diff --git a/nifi-api/src/main/java/org/apache/nifi/documentation/init/NopComponentLog.java b/nifi-api/src/main/java/org/apache/nifi/documentation/init/NopComponentLog.java index 50ef5ad254..0ecb3d93de 100644 --- a/nifi-api/src/main/java/org/apache/nifi/documentation/init/NopComponentLog.java +++ b/nifi-api/src/main/java/org/apache/nifi/documentation/init/NopComponentLog.java @@ -18,6 +18,7 @@ package org.apache.nifi.documentation.init; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.logging.LogMessage; public class NopComponentLog implements ComponentLog { @Override @@ -40,6 +41,11 @@ public class NopComponentLog implements ComponentLog { } + @Override + public void warn(LogMessage logMessage) { + + } + @Override public void trace(final String msg, final Throwable t) { @@ -60,6 +66,11 @@ public class NopComponentLog implements ComponentLog { } + @Override + public void trace(LogMessage logMessage) { + + } + @Override public boolean isWarnEnabled() { return false; @@ -105,6 +116,11 @@ public class NopComponentLog implements ComponentLog { } + @Override + public void info(LogMessage message) { + + } + @Override public String getName() { return null; @@ -130,6 +146,11 @@ public class NopComponentLog implements ComponentLog { } + @Override + public void error(LogMessage message) { + + } + @Override public void debug(final String msg, final Throwable t) { @@ -150,6 +171,11 @@ public class NopComponentLog implements ComponentLog { } + @Override + public void debug(LogMessage message) { + + } + @Override public void log(final LogLevel level, final String msg, final Throwable t) { @@ -169,4 +195,10 @@ public class NopComponentLog implements ComponentLog { public void log(final LogLevel level, final String msg, final Object[] os, final Throwable t) { } + + @Override + public void log(LogMessage message) { + + } + } diff --git a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java index f3960176e6..9cdd812a3d 100644 --- a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java +++ b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java @@ -34,7 +34,7 @@ public interface FlowFile extends Comparable { * should not be used for true universal unique type needs. For that consider * using the attribute found in the flow file's attribute map keyed by * {@link org.apache.nifi.flowfile.attributes.CoreAttributes.UUID CoreAttributes.UUID}. - * For example, by calling getAttribute(CoreAttributes.UUID.getKey()). + * For example, by calling getAttribute(CoreAttributes.UUID.key()). */ long getId(); diff --git a/nifi-api/src/main/java/org/apache/nifi/logging/ComponentLog.java b/nifi-api/src/main/java/org/apache/nifi/logging/ComponentLog.java index 819cd6ac05..30b6b510e8 100644 --- a/nifi-api/src/main/java/org/apache/nifi/logging/ComponentLog.java +++ b/nifi-api/src/main/java/org/apache/nifi/logging/ComponentLog.java @@ -46,7 +46,6 @@ package org.apache.nifi.logging; * Managers to understand that a problem exists and what the issue is. * * - * */ public interface ComponentLog { @@ -58,6 +57,10 @@ public interface ComponentLog { void warn(String msg); + default void warn(LogMessage logMessage) { + log(LogLevel.WARN, logMessage); + } + void trace(String msg, Throwable t); void trace(String msg, Object... os); @@ -66,6 +69,10 @@ public interface ComponentLog { void trace(String msg, Object[] os, Throwable t); + default void trace(LogMessage logMessage) { + log(LogLevel.TRACE, logMessage); + } + boolean isWarnEnabled(); boolean isTraceEnabled(); @@ -84,6 +91,10 @@ public interface ComponentLog { void info(String msg, Object[] os, Throwable t); + default void info(LogMessage logMessage) { + log(LogLevel.INFO, logMessage); + } + String getName(); void error(String msg, Throwable t); @@ -94,6 +105,10 @@ public interface ComponentLog { void error(String msg, Object[] os, Throwable t); + default void error(LogMessage logMessage) { + log(LogLevel.ERROR, logMessage); + } + void debug(String msg, Throwable t); void debug(String msg, Object... os); @@ -102,11 +117,128 @@ public interface ComponentLog { void debug(String msg); - void log(LogLevel level, String msg, Throwable t); + default void debug(LogMessage logMessage) { + log(LogLevel.ERROR, logMessage); + } - void log(LogLevel level, String msg, Object... os); + default void log(LogLevel level, String msg, Throwable t) { + switch (level) { + case DEBUG: + debug(msg, t); + break; + case ERROR: + case FATAL: + error(msg, t); + break; + case INFO: + info(msg, t); + break; + case TRACE: + trace(msg, t); + break; + case WARN: + warn(msg, t); + break; + } + } - void log(LogLevel level, String msg); + default void log(LogLevel level, String msg, Object... os) { + switch (level) { + case DEBUG: + debug(msg, os); + break; + case ERROR: + case FATAL: + error(msg, os); + break; + case INFO: + info(msg, os); + break; + case TRACE: + trace(msg, os); + break; + case WARN: + warn(msg, os); + break; + } + } - void log(LogLevel level, String msg, Object[] os, Throwable t); + default void log(LogLevel level, String msg) { + switch (level) { + case DEBUG: + debug(msg); + break; + case ERROR: + case FATAL: + error(msg); + break; + case INFO: + info(msg); + break; + case TRACE: + trace(msg); + break; + case WARN: + warn(msg); + break; + } + } + + default void log(LogLevel level, String msg, Object[] os, Throwable t) { + switch (level) { + case DEBUG: + debug(msg, os, t); + break; + case ERROR: + case FATAL: + error(msg, os, t); + break; + case INFO: + info(msg, os, t); + break; + case TRACE: + trace(msg, os, t); + break; + case WARN: + warn(msg, os, t); + break; + } + } + + default void log(LogMessage message) { + switch (message.getLogLevel()) { + case DEBUG: + debug(message); + break; + case ERROR: + case FATAL: + error(message); + break; + case INFO: + info(message); + break; + case TRACE: + trace(message); + break; + case WARN: + warn(message); + break; + } + } + + default void log(LogLevel level, LogMessage logMessage) { + String msg = logMessage.getMessage(); + Throwable t = logMessage.getThrowable(); + Object[] os = logMessage.getObjects(); + + if (os != null && t != null) { + log(level, msg, os, t); + } else if (os != null) { + log(level, msg, os); + } else if (t != null) { + log(level, msg, t); + } else { + log(level, msg); + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogMessage.java b/nifi-api/src/main/java/org/apache/nifi/logging/LogMessage.java similarity index 52% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogMessage.java rename to nifi-api/src/main/java/org/apache/nifi/logging/LogMessage.java index 2ac89deaaf..664bc60a13 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogMessage.java +++ b/nifi-api/src/main/java/org/apache/nifi/logging/LogMessage.java @@ -25,22 +25,65 @@ import java.util.Locale; public class LogMessage { - private final String message; - private final LogLevel level; - private final Throwable throwable; private final long time; + private final String message; + private final LogLevel logLevel; + private final Throwable throwable; + private final String flowFileUuid; + private final Object[] objects; public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; public static final String TO_STRING_FORMAT = "%1$s %2$s - %3$s"; - public LogMessage(final long millisSinceEpoch, final LogLevel level, final String message, final Throwable throwable) { - this.level = level; - this.throwable = throwable; - this.message = message; - this.time = millisSinceEpoch; + public static class Builder { + + private final long time; + private final LogLevel logLevel; + private String message; + private Throwable throwable; + private String flowFileUuid; + private Object[] objects; + + public Builder(final long time, final LogLevel logLevel) { + this.time = time; + this.logLevel = logLevel; + } + + public Builder message(String message) { + this.message = message; + return this; + } + + public Builder throwable(Throwable throwable) { + this.throwable = throwable; + return this; + } + + public Builder flowFileUuid(String flowFileUuid) { + this.flowFileUuid = flowFileUuid; + return this; + } + + public Builder objects(Object[] objects) { + this.objects = objects; + return this; + } + + public LogMessage createLogMessage() { + return new LogMessage(time, logLevel, message, throwable, flowFileUuid, objects); + } } - public long getMillisSinceEpoch() { + private LogMessage(final long time, final LogLevel logLevel, final String message, final Throwable throwable, final String flowFileUuid, final Object[] objects) { + this.logLevel = logLevel; + this.throwable = throwable; + this.message = message; + this.time = time; + this.flowFileUuid = flowFileUuid; + this.objects = objects; + } + + public long getTime() { return time; } @@ -48,25 +91,33 @@ public class LogMessage { return message; } - public LogLevel getLevel() { - return level; + public LogLevel getLogLevel() { + return logLevel; } public Throwable getThrowable() { return throwable; } + public String getFlowFileUuid() { + return flowFileUuid; + } + + public Object[] getObjects() { + return objects; + } + @Override public String toString() { final DateFormat dateFormat = new SimpleDateFormat(DATE_TIME_FORMAT, Locale.US); final String formattedTime = dateFormat.format(new Date(time)); - String formattedMsg = String.format(TO_STRING_FORMAT, formattedTime, level.toString(), message); + String formattedMsg = String.format(TO_STRING_FORMAT, formattedTime, logLevel.toString(), message); if (throwable != null) { final StringWriter sw = new StringWriter(); final PrintWriter pw = new PrintWriter(sw); throwable.printStackTrace(pw); - formattedMsg += "\n" + sw.toString(); + formattedMsg += System.lineSeparator() + sw; } return formattedMsg; diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java b/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java index 3f160aae6b..e00ad3b5a0 100644 --- a/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java @@ -37,6 +37,7 @@ public abstract class Bulletin implements Comparable { private String sourceId; private String sourceName; private ComponentType sourceType; + private String flowFileUuid; protected Bulletin(final long id) { this.timestamp = new Date(); @@ -131,6 +132,14 @@ public abstract class Bulletin implements Comparable { this.sourceType = sourceType; } + public String getFlowFileUuid() { + return flowFileUuid; + } + + public void setFlowFileUuid(String flowFileUuid) { + this.flowFileUuid = flowFileUuid; + } + @Override public String toString() { return "Bulletin{" + "id=" + id + ", message=" + message + ", sourceName=" + sourceName + ", sourceType=" + sourceType + '}'; diff --git a/nifi-mock/src/main/java/org/apache/nifi/reporting/BulletinFactory.java b/nifi-mock/src/main/java/org/apache/nifi/reporting/BulletinFactory.java index ca1ef6c3e0..ea6dbfda6f 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/reporting/BulletinFactory.java +++ b/nifi-mock/src/main/java/org/apache/nifi/reporting/BulletinFactory.java @@ -1,71 +1,97 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting; - -import java.util.concurrent.atomic.AtomicLong; - -public class BulletinFactory { - - private static final AtomicLong currentId = new AtomicLong(0); - - public static Bulletin createBulletin(final String groupId, final String sourceId, final String sourceName, final String category, final String severity, final String message) { - final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement()); - bulletin.setGroupId(groupId); - bulletin.setSourceId(sourceId); - bulletin.setSourceName(sourceName); - bulletin.setCategory(category); - bulletin.setLevel(severity); - bulletin.setMessage(message); - return bulletin; - } - - public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final String sourceName, - final String category, final String severity, final String message) { - final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement()); - bulletin.setGroupId(groupId); - bulletin.setGroupName(groupName); - bulletin.setSourceId(sourceId); - bulletin.setSourceName(sourceName); - bulletin.setCategory(category); - bulletin.setLevel(severity); - bulletin.setMessage(message); - return bulletin; - } - - public static Bulletin createBulletin(final String category, final String severity, final String message) { - final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement()); - bulletin.setCategory(category); - bulletin.setLevel(severity); - bulletin.setMessage(message); - return bulletin; - } - - public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final ComponentType sourceType, - final String sourceName, final String category, final String severity, final String message, final String groupPath) { - final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement()); - bulletin.setGroupId(groupId); - bulletin.setGroupName(groupName); - bulletin.setGroupPath(groupPath); - bulletin.setSourceId(sourceId); - bulletin.setSourceType(sourceType); - bulletin.setSourceName(sourceName); - bulletin.setCategory(category); - bulletin.setLevel(severity); - bulletin.setMessage(message); - return bulletin; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting; + +import java.util.concurrent.atomic.AtomicLong; + +public class BulletinFactory { + + private static final AtomicLong currentId = new AtomicLong(0); + + public static Bulletin createBulletin(final String groupId, final String sourceId, final String sourceName, final String category, final String severity, final String message) { + final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement()); + bulletin.setGroupId(groupId); + bulletin.setSourceId(sourceId); + bulletin.setSourceName(sourceName); + bulletin.setCategory(category); + bulletin.setLevel(severity); + bulletin.setMessage(message); + return bulletin; + } + + public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final String sourceName, + final String category, final String severity, final String message) { + final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement()); + bulletin.setGroupId(groupId); + bulletin.setGroupName(groupName); + bulletin.setSourceId(sourceId); + bulletin.setSourceName(sourceName); + bulletin.setCategory(category); + bulletin.setLevel(severity); + bulletin.setMessage(message); + return bulletin; + } + + public static Bulletin createBulletin(final String category, final String severity, final String message) { + final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement()); + bulletin.setCategory(category); + bulletin.setLevel(severity); + bulletin.setMessage(message); + return bulletin; + } + + public static Bulletin createBulletin(final String category, final String severity, final String message, final String flowFileUUID) { + final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement()); + bulletin.setCategory(category); + bulletin.setLevel(severity); + bulletin.setMessage(message); + bulletin.setFlowFileUuid(flowFileUUID); + return bulletin; + } + + public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final ComponentType sourceType, + final String sourceName, final String category, final String severity, final String message, final String groupPath) { + final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement()); + bulletin.setGroupId(groupId); + bulletin.setGroupName(groupName); + bulletin.setGroupPath(groupPath); + bulletin.setSourceId(sourceId); + bulletin.setSourceType(sourceType); + bulletin.setSourceName(sourceName); + bulletin.setCategory(category); + bulletin.setLevel(severity); + bulletin.setMessage(message); + return bulletin; + } + + public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final ComponentType sourceType, + final String sourceName, final String category, final String severity, final String message, + final String groupPath, final String flowFileUUID) { + final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement()); + bulletin.setGroupId(groupId); + bulletin.setGroupName(groupName); + bulletin.setGroupPath(groupPath); + bulletin.setSourceId(sourceId); + bulletin.setSourceType(sourceType); + bulletin.setSourceName(sourceName); + bulletin.setCategory(category); + bulletin.setLevel(severity); + bulletin.setMessage(message); + bulletin.setFlowFileUuid(flowFileUUID); + return bulletin; + } +} diff --git a/nifi-mock/src/main/java/org/apache/nifi/reporting/MockBulletin.java b/nifi-mock/src/main/java/org/apache/nifi/reporting/MockBulletin.java index 652fd5112c..acb2e4518d 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/reporting/MockBulletin.java +++ b/nifi-mock/src/main/java/org/apache/nifi/reporting/MockBulletin.java @@ -1,24 +1,24 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting; - -public class MockBulletin extends Bulletin { - - protected MockBulletin(long id) { - super(id); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting; + +public class MockBulletin extends Bulletin { + + protected MockBulletin(long id) { + super(id); + } +} diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockComponentLog.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockComponentLog.java index 4bb655ea68..ebdfcabf88 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockComponentLog.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockComponentLog.java @@ -16,12 +16,12 @@ */ package org.apache.nifi.util; -import java.util.List; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.logging.LogLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + public class MockComponentLog implements ComponentLog { private final CapturingLogger logger; @@ -74,9 +74,7 @@ public class MockComponentLog implements ComponentLog { private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t) { final Object[] modifiedArgs = new Object[os.length + 2]; modifiedArgs[0] = component.toString(); - for (int i = 0; i < os.length; i++) { - modifiedArgs[i + 1] = os[i]; - } + System.arraycopy(os, 0, modifiedArgs, 1, os.length); modifiedArgs[modifiedArgs.length - 1] = t.toString(); return modifiedArgs; @@ -89,9 +87,7 @@ public class MockComponentLog implements ComponentLog { final Object[] modifiedArgs = new Object[os.length + 3]; modifiedArgs[0] = component.toString(); - for (int i = 0; i < os.length; i++) { - modifiedArgs[i + 1] = os[i]; - } + System.arraycopy(os, 0, modifiedArgs, 1, os.length); modifiedArgs[modifiedArgs.length - 2] = t.toString(); modifiedArgs[modifiedArgs.length - 1] = t; @@ -322,93 +318,4 @@ public class MockComponentLog implements ComponentLog { logger.debug(msg, os); } - - @Override - public void log(LogLevel level, String msg, Throwable t) { - switch (level) { - case DEBUG: - debug(msg, t); - break; - case ERROR: - case FATAL: - error(msg, t); - break; - case INFO: - info(msg, t); - break; - case TRACE: - trace(msg, t); - break; - case WARN: - warn(msg, t); - break; - } - } - - @Override - public void log(LogLevel level, String msg, Object[] os) { - switch (level) { - case DEBUG: - debug(msg, os); - break; - case ERROR: - case FATAL: - error(msg, os); - break; - case INFO: - info(msg, os); - break; - case TRACE: - trace(msg, os); - break; - case WARN: - warn(msg, os); - break; - } - } - - @Override - public void log(LogLevel level, String msg) { - switch (level) { - case DEBUG: - debug(msg); - break; - case ERROR: - case FATAL: - error(msg); - break; - case INFO: - info(msg); - break; - case TRACE: - trace(msg); - break; - case WARN: - warn(msg); - break; - } - } - - @Override - public void log(LogLevel level, String msg, Object[] os, Throwable t) { - switch (level) { - case DEBUG: - debug(msg, os, t); - break; - case ERROR: - case FATAL: - error(msg, os, t); - break; - case INFO: - info(msg, os, t); - break; - case TRACE: - trace(msg, os, t); - break; - case WARN: - warn(msg, os, t); - break; - } - } - } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java index 5a0fd846b1..5185b9802c 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java @@ -100,11 +100,7 @@ public class MockReportingContext extends MockControllerServiceLookup implements @Override public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) { final Bulletin bulletin = BulletinFactory.createBulletin(null, null, componentId, "test processor", category, severity.name(), message); - List bulletins = componentBulletinsCreated.get(componentId); - if (bulletins == null) { - bulletins = new ArrayList<>(); - componentBulletinsCreated.put(componentId, bulletins); - } + List bulletins = componentBulletinsCreated.computeIfAbsent(componentId, k -> new ArrayList<>()); bulletins.add(bulletin); return bulletin; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java index b245904a6f..18be3fe594 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java @@ -38,7 +38,7 @@ public class ControllerServiceLogObserver implements LogObserver { public void onLogMessage(final LogMessage message) { // Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever // the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are). - final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString(); + final String bulletinLevel = message.getLogLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLogLevel().toString(); final ProcessGroup pg = serviceNode.getProcessGroup(); final String groupId = pg == null ? null : pg.getIdentifier(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/ProcessorLogObserver.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/ProcessorLogObserver.java index 62b90d6342..14d5d285f1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/ProcessorLogObserver.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/ProcessorLogObserver.java @@ -40,9 +40,8 @@ public class ProcessorLogObserver implements LogObserver { public void onLogMessage(final LogMessage message) { // Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever // the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are). - final String bulletinLevel = (message.getLevel() == LogLevel.WARN) ? Severity.WARNING.name() : message.getLevel().toString(); - - bulletinRepository.addBulletin(BulletinFactory.createBulletin(processorNode, CATEGORY, bulletinLevel, message.getMessage())); + final String bulletinLevel = (message.getLogLevel() == LogLevel.WARN) ? Severity.WARNING.name() : message.getLogLevel().toString(); + bulletinRepository.addBulletin(BulletinFactory.createBulletin(processorNode, CATEGORY, bulletinLevel, message.getMessage(), message.getFlowFileUuid())); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java index f52bc1ccd6..9328f1e167 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java @@ -36,7 +36,7 @@ public class ReportingTaskLogObserver implements LogObserver { public void onLogMessage(final LogMessage message) { // Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever // the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are). - final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString(); + final String bulletinLevel = message.getLogLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLogLevel().toString(); final Bulletin bulletin = BulletinFactory.createBulletin(null, taskNode.getIdentifier(), ComponentType.REPORTING_TASK, taskNode.getName(), "Log Message", bulletinLevel, message.getMessage()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/repository/NopLogRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/repository/NopLogRepository.java index 9ad7993c00..fa8dd8222b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/repository/NopLogRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/repository/NopLogRepository.java @@ -19,6 +19,7 @@ package org.apache.nifi.logging.repository; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.logging.LogMessage; import org.apache.nifi.logging.LogObserver; import org.apache.nifi.logging.LogRepository; @@ -30,11 +31,7 @@ public class NopLogRepository implements LogRepository { private volatile ComponentLog logger; @Override - public void addLogMessage(final LogLevel level, final String message) { - } - - @Override - public void addLogMessage(final LogLevel level, final String message, final Throwable t) { + public void addLogMessage(LogMessage logMessage) { } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java index 687cfec236..d2859f6381 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.logging.repository; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.logging.LogMessage; @@ -27,14 +29,16 @@ import org.slf4j.helpers.MessageFormatter; import java.util.ArrayList; import java.util.Collection; +import java.util.EnumMap; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class StandardLogRepository implements LogRepository { - private final Map> observers = new HashMap<>(); + private final Map> observers = new EnumMap<>(LogLevel.class); private final Map observerLookup = new HashMap<>(); private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); @@ -46,43 +50,64 @@ public class StandardLogRepository implements LogRepository { private volatile ComponentLog componentLogger; @Override - public void addLogMessage(final LogLevel level, final String message) { - addLogMessage(level, message, (Throwable) null); - } + public void addLogMessage(LogMessage logMessage) { + LogLevel logLevel = logMessage.getLogLevel(); - @Override - public void addLogMessage(final LogLevel level, final String message, final Throwable t) { - final LogMessage logMessage = new LogMessage(System.currentTimeMillis(), level, message, t); - - final Collection logObservers = observers.get(level); + final Collection logObservers = observers.get(logLevel); if (logObservers != null) { for (LogObserver observer : logObservers) { try { observer.onLogMessage(logMessage); - } catch (final Throwable observerThrowable) { + } catch (final Exception observerThrowable) { logger.error("Failed to pass log message to Observer {} due to {}", observer, observerThrowable.toString()); } } } + } @Override public void addLogMessage(final LogLevel level, final String format, final Object[] params) { replaceThrowablesWithMessage(params); + final Optional flowFileUuid = getFirstFlowFileUuidFromObjects(params); final String formattedMessage = MessageFormatter.arrayFormat(format, params).getMessage(); - addLogMessage(level, formattedMessage); + final LogMessage logMessage = new LogMessage.Builder(System.currentTimeMillis(), level) + .message(formattedMessage) + .flowFileUuid(flowFileUuid.orElse(null)) + .createLogMessage(); + addLogMessage(logMessage); } @Override public void addLogMessage(final LogLevel level, final String format, final Object[] params, final Throwable t) { replaceThrowablesWithMessage(params); + final Optional flowFileUuid = getFirstFlowFileUuidFromObjects(params); final String formattedMessage = MessageFormatter.arrayFormat(format, params, t).getMessage(); - addLogMessage(level, formattedMessage, t); + final LogMessage logMessage = new LogMessage.Builder(System.currentTimeMillis(), level) + .message(formattedMessage) + .throwable(t) + .flowFileUuid(flowFileUuid.orElse(null)) + .createLogMessage(); + addLogMessage(logMessage); + } + + private Optional getFirstFlowFileUuidFromObjects(Object[] params) { + int flowFileCount = 0; + FlowFile flowFileFound = null; + for (final Object param : params) { + if (param instanceof FlowFile) { + if (++flowFileCount > 1) { + return Optional.empty(); + } + flowFileFound = (FlowFile) param; + } + } + return Optional.ofNullable(flowFileFound).map(ff -> ff.getAttribute(CoreAttributes.UUID.key())); } private void replaceThrowablesWithMessage(final Object[] params) { for (int i = 0; i < params.length; i++) { - if(params[i] instanceof Throwable) { + if (params[i] instanceof Throwable) { params[i] = ((Throwable) params[i]).getLocalizedMessage(); } } @@ -139,11 +164,7 @@ public class StandardLogRepository implements LogRepository { for (int i = minimumLevel.ordinal(); i < allLevels.length; i++) { // no need to register an observer for NONE since that level will never be logged to by a component if (i != LogLevel.NONE.ordinal()) { - Collection collection = observers.get(allLevels[i]); - if (collection == null) { - collection = new ArrayList<>(); - observers.put(allLevels[i], collection); - } + Collection collection = observers.computeIfAbsent(allLevels[i], k -> new ArrayList<>()); collection.add(observer); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java index 85447d1e00..f3151a0cc2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java @@ -19,6 +19,7 @@ package org.apache.nifi.processor; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.logging.LogMessage; import org.apache.nifi.logging.LogRepository; import org.apache.nifi.logging.LogRepositoryFactory; import org.slf4j.Logger; @@ -113,6 +114,14 @@ public class SimpleProcessLogger implements ComponentLog { logRepository.addLogMessage(LogLevel.WARN, msg, os); } + @Override + public void warn(LogMessage logMessage) { + if (isWarnEnabled()) { + log(LogLevel.WARN, logMessage); + logRepository.addLogMessage(logMessage); + } + } + @Override public void trace(String msg, Throwable t) { if (!isTraceEnabled()) { @@ -162,6 +171,14 @@ public class SimpleProcessLogger implements ComponentLog { logRepository.addLogMessage(LogLevel.TRACE, msg, os, t); } + @Override + public void trace(LogMessage logMessage) { + if (isTraceEnabled()) { + log(LogLevel.TRACE, logMessage); + logRepository.addLogMessage(logMessage); + } + } + @Override public boolean isTraceEnabled() { return logger.isTraceEnabled(); @@ -242,6 +259,14 @@ public class SimpleProcessLogger implements ComponentLog { logRepository.addLogMessage(LogLevel.INFO, msg, os, t); } + @Override + public void info(LogMessage logMessage) { + if (isInfoEnabled()) { + log(LogLevel.INFO, logMessage); + logRepository.addLogMessage(logMessage); + } + } + @Override public String getName() { return logger.getName(); @@ -300,6 +325,14 @@ public class SimpleProcessLogger implements ComponentLog { logRepository.addLogMessage(LogLevel.ERROR, msg, os, t); } + @Override + public void error(LogMessage logMessage) { + if (isErrorEnabled()) { + log(LogLevel.ERROR, logMessage); + logRepository.addLogMessage(logMessage); + } + } + private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t) { final Object[] modifiedArgs; if (t == null) { @@ -370,6 +403,14 @@ public class SimpleProcessLogger implements ComponentLog { logRepository.addLogMessage(LogLevel.DEBUG, msg, os); } + @Override + public void debug(LogMessage logMessage) { + if (isDebugEnabled()) { + log(LogLevel.DEBUG, logMessage); + logRepository.addLogMessage(logMessage); + } + } + @Override public void log(LogLevel level, String msg, Throwable t) { switch (level) { @@ -458,6 +499,28 @@ public class SimpleProcessLogger implements ComponentLog { } } + @Override + public void log(LogMessage message) { + switch (message.getLogLevel()) { + case DEBUG: + debug(message); + break; + case ERROR: + case FATAL: + error(message); + break; + case INFO: + info(message); + break; + case TRACE: + trace(message); + break; + case WARN: + warn(message); + break; + } + } + private String getCauses(final Throwable throwable) { final LinkedList causes = new LinkedList<>(); for (Throwable t = throwable; t != null; t = t.getCause()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/logging/TestStandardLogRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/logging/TestStandardLogRepository.java index 6017220cfb..7a4db9dceb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/logging/TestStandardLogRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/logging/TestStandardLogRepository.java @@ -17,14 +17,17 @@ package org.apache.nifi.logging; -import static org.junit.Assert.assertEquals; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.repository.StandardLogRepository; +import org.apache.nifi.util.MockFlowFile; +import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.nifi.logging.repository.StandardLogRepository; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class TestStandardLogRepository { @@ -39,23 +42,45 @@ public class TestStandardLogRepository { repo.addLogMessage(LogLevel.DEBUG, "Testing {} to get exception message <{}>", new Object[]{observer.getClass().getName(), exception}); repo.addLogMessage(LogLevel.DEBUG, "Testing {} to get exception message", new Object[]{observer.getClass().getName()}, exception); - assertEquals(observer.getMessages().get(0), "Testing org.apache.nifi.logging.TestStandardLogRepository$MockLogObserver to get exception message "); - assertEquals(observer.getMessages().get(1), "Testing org.apache.nifi.logging.TestStandardLogRepository$MockLogObserver to get exception message"); + assertEquals("Testing org.apache.nifi.logging.TestStandardLogRepository$MockLogObserver to get exception message ", observer.getMessages().get(0).getMessage()); + assertEquals("Testing org.apache.nifi.logging.TestStandardLogRepository$MockLogObserver to get exception message", observer.getMessages().get(1).getMessage()); } - private class MockLogObserver implements LogObserver { + @Test + public void testLogRepositoryLogsFirstFlowFileUuid() { + StandardLogRepository repo = new StandardLogRepository(); + MockLogObserver observer = new MockLogObserver(); + repo.addObserver("mock", LogLevel.DEBUG, observer); + MockFlowFile mockFlowFile = new MockFlowFile(1L); - private List messages = new ArrayList(); + repo.addLogMessage(LogLevel.INFO, "Testing {} being shown in exception message", new Object[]{mockFlowFile}); + + assertEquals(mockFlowFile.getAttribute(CoreAttributes.UUID.key()), observer.getMessages().get(0).getFlowFileUuid()); + } + + @Test + public void testLogRepositoryDoesntLogMultipleFlowFileUuids() { + StandardLogRepository repo = new StandardLogRepository(); + MockLogObserver observer = new MockLogObserver(); + repo.addObserver("mock", LogLevel.DEBUG, observer); + MockFlowFile mockFlowFile1 = new MockFlowFile(1L); + MockFlowFile mockFlowFile2 = new MockFlowFile(2L); + + repo.addLogMessage(LogLevel.INFO, "Testing {} {} flowfiles are not being shown in exception message", new Object[]{mockFlowFile1, mockFlowFile2}); + + assertNull(observer.getMessages().get(0).getFlowFileUuid()); + } + + private static class MockLogObserver implements LogObserver { + private final List messages = new ArrayList<>(); @Override public void onLogMessage(LogMessage message) { - messages.add(message.getMessage()); + messages.add(message); } - public List getMessages() { + public List getMessages() { return messages; } - } - } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/TerminationAwareLogger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/TerminationAwareLogger.java index 0a4fbd78f2..76677d3f09 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/TerminationAwareLogger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/TerminationAwareLogger.java @@ -21,8 +21,9 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; public class TerminationAwareLogger implements ComponentLog { + + private static final String TERMINATED_TASK_PREFIX = "[Terminated Process] - "; private final ComponentLog logger; - private final String TERMINATED_TASK_PREFIX = "[Terminated Process] - "; private volatile boolean terminated = false; public TerminationAwareLogger(final ComponentLog logger) { @@ -41,6 +42,7 @@ public class TerminationAwareLogger implements ComponentLog { return TERMINATED_TASK_PREFIX + logLevel.name() + " - " + originalMessage; } + @Override public void warn(String msg, Throwable t) { if (isTerminated()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java index 1d9f8cf057..c8df2cbb97 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java @@ -27,30 +27,27 @@ public final class BulletinFactory { private static final AtomicLong currentId = new AtomicLong(0); + private BulletinFactory() { + } + public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message) { - final ComponentType type; - switch (connectable.getConnectableType()) { - case REMOTE_INPUT_PORT: - case REMOTE_OUTPUT_PORT: - type = ComponentType.REMOTE_PROCESS_GROUP; - break; - case INPUT_PORT: - type = ComponentType.INPUT_PORT; - break; - case OUTPUT_PORT: - type = ComponentType.OUTPUT_PORT; - break; - case PROCESSOR: - default: - type = ComponentType.PROCESSOR; - break; - } + final ComponentType type = getComponentType(connectable); final ProcessGroup group = connectable.getProcessGroup(); final String groupId = connectable.getProcessGroupIdentifier(); final String groupName = group == null ? null : group.getName(); final String groupPath = buildGroupPath(group); - return BulletinFactory.createBulletin(groupId, groupName, connectable.getIdentifier(), type, connectable.getName(), category, severity, message, groupPath); + return BulletinFactory.createBulletin(groupId, groupName, connectable.getIdentifier(), type, connectable.getName(), category, severity, message, groupPath, null); + } + + public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message, final String flowFileUUID) { + final ComponentType type = getComponentType(connectable); + + final ProcessGroup group = connectable.getProcessGroup(); + final String groupId = connectable.getProcessGroupIdentifier(); + final String groupName = group == null ? null : group.getName(); + final String groupPath = buildGroupPath(group); + return BulletinFactory.createBulletin(groupId, groupName, connectable.getIdentifier(), type, connectable.getName(), category, severity, message, groupPath, flowFileUUID); } private static String buildGroupPath(ProcessGroup group) { @@ -95,7 +92,7 @@ public final class BulletinFactory { } public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final ComponentType sourceType, - final String sourceName, final String category, final String severity, final String message, final String groupPath) { + final String sourceName, final String category, final String severity, final String message, final String groupPath, final String flowFileUUID) { final Bulletin bulletin = new ComponentBulletin(currentId.getAndIncrement()); bulletin.setGroupId(groupId); bulletin.setGroupName(groupName); @@ -106,6 +103,7 @@ public final class BulletinFactory { bulletin.setCategory(category); bulletin.setLevel(severity); bulletin.setMessage(message); + bulletin.setFlowFileUuid(flowFileUUID); return bulletin; } @@ -117,4 +115,25 @@ public final class BulletinFactory { bulletin.setSourceType(ComponentType.FLOW_CONTROLLER); return bulletin; } + + private static ComponentType getComponentType(final Connectable connectable) { + final ComponentType type; + switch (connectable.getConnectableType()) { + case REMOTE_INPUT_PORT: + case REMOTE_OUTPUT_PORT: + type = ComponentType.REMOTE_PROCESS_GROUP; + break; + case INPUT_PORT: + type = ComponentType.INPUT_PORT; + break; + case OUTPUT_PORT: + type = ComponentType.OUTPUT_PORT; + break; + case PROCESSOR: + default: + type = ComponentType.PROCESSOR; + break; + } + return type; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java index 23c4cdb0d9..706538941a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java @@ -1,30 +1,30 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.events; - -import org.apache.nifi.reporting.Bulletin; - -/** - * - */ -public class ComponentBulletin extends Bulletin { - - ComponentBulletin(final long id) { - super(id); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.events; + +import org.apache.nifi.reporting.Bulletin; + +/** + * + */ +public class ComponentBulletin extends Bulletin { + + ComponentBulletin(final long id) { + super(id); + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java index 3359e7e5ac..9e2b3a3339 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java @@ -1,32 +1,32 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.events; - -import org.apache.nifi.reporting.Bulletin; -import org.apache.nifi.reporting.ComponentType; - -/** - * - */ -public class SystemBulletin extends Bulletin { - - SystemBulletin(final long id) { - super(id); - setSourceType(ComponentType.FLOW_CONTROLLER); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.events; + +import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.ComponentType; + +/** + * + */ +public class SystemBulletin extends Bulletin { + + SystemBulletin(final long id) { + super(id); + setSourceType(ComponentType.FLOW_CONTROLLER); + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java index 06ddab4ea1..d348dfb989 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java @@ -18,9 +18,7 @@ package org.apache.nifi.logging; public interface LogRepository { - void addLogMessage(LogLevel level, String message); - - void addLogMessage(LogLevel level, String message, Throwable t); + void addLogMessage(LogMessage logMessage); void addLogMessage(LogLevel level, String messageFormat, Object[] params); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockComponentLogger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockComponentLogger.java index 920d7eb08f..31ce0ef428 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockComponentLogger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockComponentLogger.java @@ -17,14 +17,12 @@ package org.apache.nifi.mock; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.logging.LogLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Stubs out the functionality of a ComponentLog so that it can * be used during initialization of a component. - * */ public class MockComponentLogger implements ComponentLog { @@ -116,9 +114,8 @@ public class MockComponentLogger implements ComponentLog { @Override public void info(String msg, Object[] os, Throwable t) { - logger.trace(msg, os); - logger.trace("", t); - + logger.info(msg, os); + logger.info("", t); } @Override @@ -167,92 +164,4 @@ public class MockComponentLogger implements ComponentLog { public void debug(String msg) { logger.debug(msg); } - - @Override - public void log(LogLevel level, String msg, Throwable t) { - switch (level) { - case DEBUG: - debug(msg, t); - break; - case ERROR: - case FATAL: - error(msg, t); - break; - case INFO: - info(msg, t); - break; - case TRACE: - trace(msg, t); - break; - case WARN: - warn(msg, t); - break; - } - } - - @Override - public void log(LogLevel level, String msg, Object[] os) { - switch (level) { - case DEBUG: - debug(msg, os); - break; - case ERROR: - case FATAL: - error(msg, os); - break; - case INFO: - info(msg, os); - break; - case TRACE: - trace(msg, os); - break; - case WARN: - warn(msg, os); - break; - } - } - - @Override - public void log(LogLevel level, String msg) { - switch (level) { - case DEBUG: - debug(msg); - break; - case ERROR: - case FATAL: - error(msg); - break; - case INFO: - info(msg); - break; - case TRACE: - trace(msg); - break; - case WARN: - warn(msg); - break; - } - } - - @Override - public void log(LogLevel level, String msg, Object[] os, Throwable t) { - switch (level) { - case DEBUG: - debug(msg, os, t); - break; - case ERROR: - case FATAL: - error(msg, os, t); - break; - case INFO: - info(msg, os, t); - break; - case TRACE: - trace(msg, os, t); - break; - case WARN: - warn(msg, os, t); - break; - } - } } diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/MockComponentLog.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/MockComponentLog.java index 9d5781f2d9..e31c35d6c2 100644 --- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/MockComponentLog.java +++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/MockComponentLog.java @@ -18,6 +18,7 @@ package org.apache.nifi.rules.handlers; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.logging.LogMessage; public class MockComponentLog implements ComponentLog { @@ -55,6 +56,11 @@ public class MockComponentLog implements ComponentLog { warnMessage = msg; } + @Override + public void warn(LogMessage logMessage) { + warnMessage = logMessage.getMessage(); + } + @Override public void trace(String msg, Throwable t) { trace(msg); @@ -75,6 +81,11 @@ public class MockComponentLog implements ComponentLog { trace(convertMessage(msg,os)); } + @Override + public void trace(LogMessage logMessage) { + traceMessage = logMessage.getMessage(); + } + @Override public boolean isWarnEnabled() { return true; @@ -120,6 +131,11 @@ public class MockComponentLog implements ComponentLog { info(convertMessage(msg,os)); } + @Override + public void info(LogMessage message) { + infoMessage = message.getMessage(); + } + @Override public String getName() { return null; @@ -145,6 +161,11 @@ public class MockComponentLog implements ComponentLog { error(msg); } + @Override + public void error(LogMessage message) { + errorMessage = message.getMessage(); + } + @Override public void debug(String msg, Throwable t) { debug(msg); @@ -165,6 +186,11 @@ public class MockComponentLog implements ComponentLog { debugMessage = msg; } + @Override + public void debug(LogMessage message) { + debugMessage = message.getMessage(); + } + @Override public void log(LogLevel level, String msg, Throwable t) { @@ -175,33 +201,6 @@ public class MockComponentLog implements ComponentLog { } - @Override - public void log(LogLevel level, String msg) { - switch (level) { - case WARN: - warn(msg); - break; - case DEBUG: - debug(msg); - break; - case INFO: - info(msg); - break; - case ERROR: - error(msg); - break; - case TRACE: - trace(msg); - break; - case FATAL: - error(msg); - break; - case NONE: - info(msg); - break; - } - } - @Override public void log(LogLevel level, String msg, Object[] os, Throwable t) { diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java index 8fe220c016..24e0f6096c 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java @@ -17,27 +17,6 @@ package org.apache.nifi.reporting; -import java.io.IOException; -import java.io.InputStream; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.OptionalLong; -import java.util.TimeZone; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import javax.json.Json; -import javax.json.JsonArray; -import javax.json.JsonArrayBuilder; -import javax.json.JsonBuilderFactory; -import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; - import org.apache.avro.Schema; import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.Restriction; @@ -53,6 +32,26 @@ import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.reporting.s2s.SiteToSiteUtils; import org.apache.nifi.scheduling.SchedulingStrategy; +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonArrayBuilder; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import java.io.IOException; +import java.io.InputStream; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + @Tags({"bulletin", "site", "site to site"}) @CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to " + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins " @@ -89,24 +88,24 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting final String nodeId = context.getClusterNodeIdentifier(); if (nodeId == null && isClustered) { getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. " - + "Will wait for Node Identifier to be established."); + + "Will wait for Node Identifier to be established."); return; } final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build(); final List bulletins = context.getBulletinRepository().findBulletins(bulletinQuery); - if(bulletins == null || bulletins.isEmpty()) { + if (bulletins == null || bulletins.isEmpty()) { getLogger().debug("No events to send because no events are stored in the repository."); return; } - final OptionalLong opMaxId = bulletins.stream().mapToLong(t -> t.getId()).max(); - final Long currMaxId = opMaxId.isPresent() ? opMaxId.getAsLong() : -1; + final OptionalLong opMaxId = bulletins.stream().mapToLong(Bulletin::getId).max(); + final long currMaxId = opMaxId.isPresent() ? opMaxId.getAsLong() : -1; - if(currMaxId < lastSentBulletinId){ + if (currMaxId < lastSentBulletinId) { getLogger().warn("Current bulletin max id is {} which is less than what was stored in state as the last queried event, which was {}. " - + "This means the bulletins repository restarted its ids. Restarting querying from the beginning.", new Object[]{currMaxId, lastSentBulletinId}); + + "This means the bulletins repository restarted its ids. Restarting querying from the beginning.", currMaxId, lastSentBulletinId); lastSentBulletinId = -1; } @@ -130,8 +129,8 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting // Create a JSON array of all the events in the current batch final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); for (final Bulletin bulletin : bulletins) { - if(bulletin.getId() > lastSentBulletinId) { - arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId, allowNullValues)); + if (bulletin.getId() > lastSentBulletinId) { + arrayBuilder.add(serialize(builder, bulletin, df, platform, nodeId, allowNullValues)); } } final JsonArray jsonArray = arrayBuilder.build(); @@ -162,7 +161,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); getLogger().info("Successfully sent {} Bulletins to destination in {} ms; Transaction ID = {}; First Event ID = {}", - new Object[]{bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId()}); + bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId()); } catch (final Exception e) { if (transaction != null) { transaction.error(); @@ -177,8 +176,8 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting lastSentBulletinId = currMaxId; } - private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df, - final String platform, final String nodeIdentifier, Boolean allowNullValues) { + private JsonObject serialize(final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df, + final String platform, final String nodeIdentifier, Boolean allowNullValues) { addField(builder, "objectId", UUID.randomUUID().toString(), allowNullValues); addField(builder, "platform", platform, allowNullValues); @@ -195,6 +194,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting addField(builder, "bulletinSourceName", bulletin.getSourceName(), allowNullValues); addField(builder, "bulletinSourceType", bulletin.getSourceType() == null ? null : bulletin.getSourceType().name(), allowNullValues); addField(builder, "bulletinTimestamp", df.format(bulletin.getTimestamp()), allowNullValues); + addField(builder, "bulletinFlowFileUuid", bulletin.getFlowFileUuid(), allowNullValues); return builder.build(); } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteBulletinReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteBulletinReportingTask/additionalDetails.html index 13ced0b0e2..3b693a9744 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteBulletinReportingTask/additionalDetails.html +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteBulletinReportingTask/additionalDetails.html @@ -56,7 +56,8 @@ { "name" : "bulletinSourceId", "type" : ["string", "null"] }, { "name" : "bulletinSourceName", "type" : ["string", "null"] }, { "name" : "bulletinSourceType", "type" : ["string", "null"] }, - { "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" } + { "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" }, + { "name" : "bulletinFlowFileUuid", "type" : ["string", "null"] } ] } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-bulletins.avsc b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-bulletins.avsc index f681169103..0290e487db 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-bulletins.avsc +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-bulletins.avsc @@ -17,6 +17,7 @@ { "name" : "bulletinSourceId", "type" : ["string", "null"] }, { "name" : "bulletinSourceName", "type" : ["string", "null"] }, { "name" : "bulletinSourceType", "type" : ["string", "null"] }, - { "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" } + { "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" }, + { "name" : "bulletinFlowFileUuid", "type" : ["string", "null"] } ] } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java index 6acf3e8693..3daaa62ac1 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java @@ -32,7 +32,6 @@ import org.apache.nifi.util.MockPropertyValue; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import javax.json.Json; @@ -56,14 +55,11 @@ import static org.mockito.Mockito.when; public class TestSiteToSiteBulletinReportingTask { @Test - public void testUrls() throws IOException { + public void testUrls() { final ValidationContext context = Mockito.mock(ValidationContext.class); - Mockito.when(context.newPropertyValue(Mockito.anyString())).then(new Answer() { - @Override - public PropertyValue answer(InvocationOnMock invocation) throws Throwable { - String value = (String) invocation.getArguments()[0]; - return new StandardPropertyValue(value, null, null); - } + Mockito.when(context.newPropertyValue(Mockito.anyString())).then((Answer) invocation -> { + String value = (String) invocation.getArguments()[0]; + return new StandardPropertyValue(value, null, null); }); assertTrue(new NiFiUrlValidator().validate("url", "http://localhost:8080/nifi", context).isValid()); @@ -80,8 +76,8 @@ public class TestSiteToSiteBulletinReportingTask { @Test public void testSerializedForm() throws IOException, InitializationException { // creating the list of bulletins - final List bulletins = new ArrayList(); - bulletins.add(BulletinFactory.createBulletin("group-id", "group-name", "source-id", ComponentType.PROCESSOR, "source-name", "category", "severity", "message", "group-path")); + final List bulletins = new ArrayList<>(); + bulletins.add(BulletinFactory.createBulletin("group-id", "group-name", "source-id", ComponentType.PROCESSOR, "source-name", "category", "severity", "message", "group-path", "flowFileUuid")); // mock the access to the list of bulletins final ReportingContext context = Mockito.mock(ReportingContext.class); @@ -100,12 +96,9 @@ public class TestSiteToSiteBulletinReportingTask { properties.put(SiteToSiteUtils.BATCH_SIZE, "1000"); properties.put(SiteToSiteUtils.PLATFORM, "nifi"); - Mockito.doAnswer(new Answer() { - @Override - public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { - final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class); - return new MockPropertyValue(properties.get(descriptor)); - } + Mockito.doAnswer((Answer) invocation -> { + final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class); + return new MockPropertyValue(properties.get(descriptor)); }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); // setup the mock initialization context @@ -125,12 +118,13 @@ public class TestSiteToSiteBulletinReportingTask { assertEquals("message", bulletinJson.getString("bulletinMessage")); assertEquals("group-name", bulletinJson.getString("bulletinGroupName")); assertEquals("group-path", bulletinJson.getString("bulletinGroupPath")); + assertEquals("flowFileUuid", bulletinJson.getString("bulletinFlowFileUuid")); } @Test public void testSerializedFormWithNullValues() throws IOException, InitializationException { // creating the list of bulletins - final List bulletins = new ArrayList(); + final List bulletins = new ArrayList<>(); bulletins.add(BulletinFactory.createBulletin("group-id", "group-name", "source-id", "source-name", "category", "severity", "message")); // mock the access to the list of bulletins @@ -151,12 +145,9 @@ public class TestSiteToSiteBulletinReportingTask { properties.put(SiteToSiteUtils.PLATFORM, "nifi"); properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES, "true"); - Mockito.doAnswer(new Answer() { - @Override - public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { - final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class); - return new MockPropertyValue(properties.get(descriptor)); - } + Mockito.doAnswer((Answer) invocation -> { + final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class); + return new MockPropertyValue(properties.get(descriptor)); }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); // setup the mock initialization context diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/bulletins/BulletinEnumerator.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/bulletins/BulletinEnumerator.java index 8074bcadbf..3abc022162 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/bulletins/BulletinEnumerator.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/bulletins/BulletinEnumerator.java @@ -105,7 +105,8 @@ public class BulletinEnumerator implements Enumerator { bulletin.getSourceId(), bulletin.getSourceName(), bulletin.getSourceType() == null ? null : bulletin.getSourceType().name(), - bulletin.getTimestamp() == null ? null : bulletin.getTimestamp().getTime() + bulletin.getTimestamp() == null ? null : bulletin.getTimestamp().getTime(), + bulletin.getFlowFileUuid() }; // If we want no fields just return null diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/bulletins/BulletinTable.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/bulletins/BulletinTable.java index 0d6a36b023..42a086461b 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/bulletins/BulletinTable.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/bulletins/BulletinTable.java @@ -164,7 +164,8 @@ public class BulletinTable extends AbstractTable implements QueryableTable, Tran "bulletinSourceId", "bulletinSourceName", "bulletinSourceType", - "bulletinTimestamp" + "bulletinTimestamp", + "bulletinFlowFileUuid" ); final List types = Arrays.asList( typeFactory.createJavaType(long.class), @@ -179,7 +180,8 @@ public class BulletinTable extends AbstractTable implements QueryableTable, Tran typeFactory.createJavaType(String.class), typeFactory.createJavaType(String.class), typeFactory.createJavaType(String.class), - typeFactory.createJavaType(long.class) + typeFactory.createJavaType(long.class), + typeFactory.createJavaType(String.class) ); relDataType = typeFactory.createStructType(Pair.zip(names, types)); diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html index a56324d673..33baa0a60d 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html @@ -111,6 +111,7 @@ bulletinSourceNameString bulletinSourceTypeString bulletinTimestampDate + bulletinFlowFileUuidString

PROCESS_GROUP_STATUS

diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java index 84c061f7f9..0bf7aad366 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java @@ -307,22 +307,26 @@ public class TestQueryNiFiReportingTask { reportingTask = initTask(properties); reportingTask.onTrigger(context); - List> rows = mockRecordSinkService.getRows(); + final List> rows = mockRecordSinkService.getRows(); + final String flowFileUuid = "testFlowFileUuid"; assertEquals(3, rows.size()); // Validate the first row Map row = rows.get(0); - assertEquals(13, row.size()); + assertEquals(14, row.size()); assertNotNull(row.get("bulletinId")); assertEquals("controller", row.get("bulletinCategory")); assertEquals("WARN", row.get("bulletinLevel")); + assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid")); // Validate the second row row = rows.get(1); assertEquals("processor", row.get("bulletinCategory")); assertEquals("INFO", row.get("bulletinLevel")); + assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid")); // Validate the third row row = rows.get(2); assertEquals("controller service", row.get("bulletinCategory")); assertEquals("ERROR", row.get("bulletinLevel")); + assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid")); } private MockQueryNiFiReportingTask initTask(Map customProperties) throws InitializationException, IOException { @@ -409,9 +413,9 @@ public class TestQueryNiFiReportingTask { Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository); MockBulletinRepository bulletinRepository = new MockQueryBulletinRepository(); - bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller", "WARN", "test bulletin 2")); - bulletinRepository.addBulletin(BulletinFactory.createBulletin("processor", "INFO", "test bulletin 1")); - bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller service", "ERROR", "test bulletin 2")); + bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller", "WARN", "test bulletin 2", "testFlowFileUuid")); + bulletinRepository.addBulletin(BulletinFactory.createBulletin("processor", "INFO", "test bulletin 1", "testFlowFileUuid")); + bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller service", "ERROR", "test bulletin 2", "testFlowFileUuid")); Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository); return reportingTask;