NIFI-8385: Add FlowFiles from logging to bulletins (#4952)

This commit is contained in:
Lehel Boér 2021-10-27 22:06:30 +02:00 committed by GitHub
parent 104078868e
commit 059f14fd62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 711 additions and 525 deletions

View File

@ -18,6 +18,7 @@ package org.apache.nifi.documentation.init;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogMessage;
public class NopComponentLog implements ComponentLog {
@Override
@ -40,6 +41,11 @@ public class NopComponentLog implements ComponentLog {
}
@Override
public void warn(LogMessage logMessage) {
}
@Override
public void trace(final String msg, final Throwable t) {
@ -60,6 +66,11 @@ public class NopComponentLog implements ComponentLog {
}
@Override
public void trace(LogMessage logMessage) {
}
@Override
public boolean isWarnEnabled() {
return false;
@ -105,6 +116,11 @@ public class NopComponentLog implements ComponentLog {
}
@Override
public void info(LogMessage message) {
}
@Override
public String getName() {
return null;
@ -130,6 +146,11 @@ public class NopComponentLog implements ComponentLog {
}
@Override
public void error(LogMessage message) {
}
@Override
public void debug(final String msg, final Throwable t) {
@ -150,6 +171,11 @@ public class NopComponentLog implements ComponentLog {
}
@Override
public void debug(LogMessage message) {
}
@Override
public void log(final LogLevel level, final String msg, final Throwable t) {
@ -169,4 +195,10 @@ public class NopComponentLog implements ComponentLog {
public void log(final LogLevel level, final String msg, final Object[] os, final Throwable t) {
}
@Override
public void log(LogMessage message) {
}
}

View File

@ -34,7 +34,7 @@ public interface FlowFile extends Comparable<FlowFile> {
* should not be used for true universal unique type needs. For that consider
* using the attribute found in the flow file's attribute map keyed by
* {@link org.apache.nifi.flowfile.attributes.CoreAttributes.UUID CoreAttributes.UUID}.
* For example, by calling getAttribute(CoreAttributes.UUID.getKey()).
* For example, by calling getAttribute(CoreAttributes.UUID.key()).
*/
long getId();

View File

@ -46,7 +46,6 @@ package org.apache.nifi.logging;
* Managers to understand that a problem exists and what the issue is.
* </li>
* </ul>
*
*/
public interface ComponentLog {
@ -58,6 +57,10 @@ public interface ComponentLog {
void warn(String msg);
default void warn(LogMessage logMessage) {
log(LogLevel.WARN, logMessage);
}
void trace(String msg, Throwable t);
void trace(String msg, Object... os);
@ -66,6 +69,10 @@ public interface ComponentLog {
void trace(String msg, Object[] os, Throwable t);
default void trace(LogMessage logMessage) {
log(LogLevel.TRACE, logMessage);
}
boolean isWarnEnabled();
boolean isTraceEnabled();
@ -84,6 +91,10 @@ public interface ComponentLog {
void info(String msg, Object[] os, Throwable t);
default void info(LogMessage logMessage) {
log(LogLevel.INFO, logMessage);
}
String getName();
void error(String msg, Throwable t);
@ -94,6 +105,10 @@ public interface ComponentLog {
void error(String msg, Object[] os, Throwable t);
default void error(LogMessage logMessage) {
log(LogLevel.ERROR, logMessage);
}
void debug(String msg, Throwable t);
void debug(String msg, Object... os);
@ -102,11 +117,128 @@ public interface ComponentLog {
void debug(String msg);
void log(LogLevel level, String msg, Throwable t);
default void debug(LogMessage logMessage) {
log(LogLevel.ERROR, logMessage);
}
void log(LogLevel level, String msg, Object... os);
default void log(LogLevel level, String msg, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, t);
break;
case ERROR:
case FATAL:
error(msg, t);
break;
case INFO:
info(msg, t);
break;
case TRACE:
trace(msg, t);
break;
case WARN:
warn(msg, t);
break;
}
}
void log(LogLevel level, String msg);
default void log(LogLevel level, String msg, Object... os) {
switch (level) {
case DEBUG:
debug(msg, os);
break;
case ERROR:
case FATAL:
error(msg, os);
break;
case INFO:
info(msg, os);
break;
case TRACE:
trace(msg, os);
break;
case WARN:
warn(msg, os);
break;
}
}
void log(LogLevel level, String msg, Object[] os, Throwable t);
default void log(LogLevel level, String msg) {
switch (level) {
case DEBUG:
debug(msg);
break;
case ERROR:
case FATAL:
error(msg);
break;
case INFO:
info(msg);
break;
case TRACE:
trace(msg);
break;
case WARN:
warn(msg);
break;
}
}
default void log(LogLevel level, String msg, Object[] os, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, os, t);
break;
case ERROR:
case FATAL:
error(msg, os, t);
break;
case INFO:
info(msg, os, t);
break;
case TRACE:
trace(msg, os, t);
break;
case WARN:
warn(msg, os, t);
break;
}
}
default void log(LogMessage message) {
switch (message.getLogLevel()) {
case DEBUG:
debug(message);
break;
case ERROR:
case FATAL:
error(message);
break;
case INFO:
info(message);
break;
case TRACE:
trace(message);
break;
case WARN:
warn(message);
break;
}
}
default void log(LogLevel level, LogMessage logMessage) {
String msg = logMessage.getMessage();
Throwable t = logMessage.getThrowable();
Object[] os = logMessage.getObjects();
if (os != null && t != null) {
log(level, msg, os, t);
} else if (os != null) {
log(level, msg, os);
} else if (t != null) {
log(level, msg, t);
} else {
log(level, msg);
}
}
}

View File

@ -25,22 +25,65 @@ import java.util.Locale;
public class LogMessage {
private final String message;
private final LogLevel level;
private final Throwable throwable;
private final long time;
private final String message;
private final LogLevel logLevel;
private final Throwable throwable;
private final String flowFileUuid;
private final Object[] objects;
public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
public static final String TO_STRING_FORMAT = "%1$s %2$s - %3$s";
public LogMessage(final long millisSinceEpoch, final LogLevel level, final String message, final Throwable throwable) {
this.level = level;
this.throwable = throwable;
this.message = message;
this.time = millisSinceEpoch;
public static class Builder {
private final long time;
private final LogLevel logLevel;
private String message;
private Throwable throwable;
private String flowFileUuid;
private Object[] objects;
public Builder(final long time, final LogLevel logLevel) {
this.time = time;
this.logLevel = logLevel;
}
public Builder message(String message) {
this.message = message;
return this;
}
public Builder throwable(Throwable throwable) {
this.throwable = throwable;
return this;
}
public Builder flowFileUuid(String flowFileUuid) {
this.flowFileUuid = flowFileUuid;
return this;
}
public Builder objects(Object[] objects) {
this.objects = objects;
return this;
}
public LogMessage createLogMessage() {
return new LogMessage(time, logLevel, message, throwable, flowFileUuid, objects);
}
}
public long getMillisSinceEpoch() {
private LogMessage(final long time, final LogLevel logLevel, final String message, final Throwable throwable, final String flowFileUuid, final Object[] objects) {
this.logLevel = logLevel;
this.throwable = throwable;
this.message = message;
this.time = time;
this.flowFileUuid = flowFileUuid;
this.objects = objects;
}
public long getTime() {
return time;
}
@ -48,25 +91,33 @@ public class LogMessage {
return message;
}
public LogLevel getLevel() {
return level;
public LogLevel getLogLevel() {
return logLevel;
}
public Throwable getThrowable() {
return throwable;
}
public String getFlowFileUuid() {
return flowFileUuid;
}
public Object[] getObjects() {
return objects;
}
@Override
public String toString() {
final DateFormat dateFormat = new SimpleDateFormat(DATE_TIME_FORMAT, Locale.US);
final String formattedTime = dateFormat.format(new Date(time));
String formattedMsg = String.format(TO_STRING_FORMAT, formattedTime, level.toString(), message);
String formattedMsg = String.format(TO_STRING_FORMAT, formattedTime, logLevel.toString(), message);
if (throwable != null) {
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw);
throwable.printStackTrace(pw);
formattedMsg += "\n" + sw.toString();
formattedMsg += System.lineSeparator() + sw;
}
return formattedMsg;

View File

@ -37,6 +37,7 @@ public abstract class Bulletin implements Comparable<Bulletin> {
private String sourceId;
private String sourceName;
private ComponentType sourceType;
private String flowFileUuid;
protected Bulletin(final long id) {
this.timestamp = new Date();
@ -131,6 +132,14 @@ public abstract class Bulletin implements Comparable<Bulletin> {
this.sourceType = sourceType;
}
public String getFlowFileUuid() {
return flowFileUuid;
}
public void setFlowFileUuid(String flowFileUuid) {
this.flowFileUuid = flowFileUuid;
}
@Override
public String toString() {
return "Bulletin{" + "id=" + id + ", message=" + message + ", sourceName=" + sourceName + ", sourceType=" + sourceType + '}';

View File

@ -1,71 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
import java.util.concurrent.atomic.AtomicLong;
public class BulletinFactory {
private static final AtomicLong currentId = new AtomicLong(0);
public static Bulletin createBulletin(final String groupId, final String sourceId, final String sourceName, final String category, final String severity, final String message) {
final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement());
bulletin.setGroupId(groupId);
bulletin.setSourceId(sourceId);
bulletin.setSourceName(sourceName);
bulletin.setCategory(category);
bulletin.setLevel(severity);
bulletin.setMessage(message);
return bulletin;
}
public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final String sourceName,
final String category, final String severity, final String message) {
final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement());
bulletin.setGroupId(groupId);
bulletin.setGroupName(groupName);
bulletin.setSourceId(sourceId);
bulletin.setSourceName(sourceName);
bulletin.setCategory(category);
bulletin.setLevel(severity);
bulletin.setMessage(message);
return bulletin;
}
public static Bulletin createBulletin(final String category, final String severity, final String message) {
final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement());
bulletin.setCategory(category);
bulletin.setLevel(severity);
bulletin.setMessage(message);
return bulletin;
}
public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final ComponentType sourceType,
final String sourceName, final String category, final String severity, final String message, final String groupPath) {
final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement());
bulletin.setGroupId(groupId);
bulletin.setGroupName(groupName);
bulletin.setGroupPath(groupPath);
bulletin.setSourceId(sourceId);
bulletin.setSourceType(sourceType);
bulletin.setSourceName(sourceName);
bulletin.setCategory(category);
bulletin.setLevel(severity);
bulletin.setMessage(message);
return bulletin;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
import java.util.concurrent.atomic.AtomicLong;
public class BulletinFactory {
private static final AtomicLong currentId = new AtomicLong(0);
public static Bulletin createBulletin(final String groupId, final String sourceId, final String sourceName, final String category, final String severity, final String message) {
final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement());
bulletin.setGroupId(groupId);
bulletin.setSourceId(sourceId);
bulletin.setSourceName(sourceName);
bulletin.setCategory(category);
bulletin.setLevel(severity);
bulletin.setMessage(message);
return bulletin;
}
public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final String sourceName,
final String category, final String severity, final String message) {
final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement());
bulletin.setGroupId(groupId);
bulletin.setGroupName(groupName);
bulletin.setSourceId(sourceId);
bulletin.setSourceName(sourceName);
bulletin.setCategory(category);
bulletin.setLevel(severity);
bulletin.setMessage(message);
return bulletin;
}
public static Bulletin createBulletin(final String category, final String severity, final String message) {
final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement());
bulletin.setCategory(category);
bulletin.setLevel(severity);
bulletin.setMessage(message);
return bulletin;
}
public static Bulletin createBulletin(final String category, final String severity, final String message, final String flowFileUUID) {
final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement());
bulletin.setCategory(category);
bulletin.setLevel(severity);
bulletin.setMessage(message);
bulletin.setFlowFileUuid(flowFileUUID);
return bulletin;
}
public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final ComponentType sourceType,
final String sourceName, final String category, final String severity, final String message, final String groupPath) {
final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement());
bulletin.setGroupId(groupId);
bulletin.setGroupName(groupName);
bulletin.setGroupPath(groupPath);
bulletin.setSourceId(sourceId);
bulletin.setSourceType(sourceType);
bulletin.setSourceName(sourceName);
bulletin.setCategory(category);
bulletin.setLevel(severity);
bulletin.setMessage(message);
return bulletin;
}
public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final ComponentType sourceType,
final String sourceName, final String category, final String severity, final String message,
final String groupPath, final String flowFileUUID) {
final Bulletin bulletin = new MockBulletin(currentId.getAndIncrement());
bulletin.setGroupId(groupId);
bulletin.setGroupName(groupName);
bulletin.setGroupPath(groupPath);
bulletin.setSourceId(sourceId);
bulletin.setSourceType(sourceType);
bulletin.setSourceName(sourceName);
bulletin.setCategory(category);
bulletin.setLevel(severity);
bulletin.setMessage(message);
bulletin.setFlowFileUuid(flowFileUUID);
return bulletin;
}
}

View File

@ -1,24 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
public class MockBulletin extends Bulletin {
protected MockBulletin(long id) {
super(id);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
public class MockBulletin extends Bulletin {
protected MockBulletin(long id) {
super(id);
}
}

View File

@ -16,12 +16,12 @@
*/
package org.apache.nifi.util;
import java.util.List;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class MockComponentLog implements ComponentLog {
private final CapturingLogger logger;
@ -74,9 +74,7 @@ public class MockComponentLog implements ComponentLog {
private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t) {
final Object[] modifiedArgs = new Object[os.length + 2];
modifiedArgs[0] = component.toString();
for (int i = 0; i < os.length; i++) {
modifiedArgs[i + 1] = os[i];
}
System.arraycopy(os, 0, modifiedArgs, 1, os.length);
modifiedArgs[modifiedArgs.length - 1] = t.toString();
return modifiedArgs;
@ -89,9 +87,7 @@ public class MockComponentLog implements ComponentLog {
final Object[] modifiedArgs = new Object[os.length + 3];
modifiedArgs[0] = component.toString();
for (int i = 0; i < os.length; i++) {
modifiedArgs[i + 1] = os[i];
}
System.arraycopy(os, 0, modifiedArgs, 1, os.length);
modifiedArgs[modifiedArgs.length - 2] = t.toString();
modifiedArgs[modifiedArgs.length - 1] = t;
@ -322,93 +318,4 @@ public class MockComponentLog implements ComponentLog {
logger.debug(msg, os);
}
@Override
public void log(LogLevel level, String msg, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, t);
break;
case ERROR:
case FATAL:
error(msg, t);
break;
case INFO:
info(msg, t);
break;
case TRACE:
trace(msg, t);
break;
case WARN:
warn(msg, t);
break;
}
}
@Override
public void log(LogLevel level, String msg, Object[] os) {
switch (level) {
case DEBUG:
debug(msg, os);
break;
case ERROR:
case FATAL:
error(msg, os);
break;
case INFO:
info(msg, os);
break;
case TRACE:
trace(msg, os);
break;
case WARN:
warn(msg, os);
break;
}
}
@Override
public void log(LogLevel level, String msg) {
switch (level) {
case DEBUG:
debug(msg);
break;
case ERROR:
case FATAL:
error(msg);
break;
case INFO:
info(msg);
break;
case TRACE:
trace(msg);
break;
case WARN:
warn(msg);
break;
}
}
@Override
public void log(LogLevel level, String msg, Object[] os, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, os, t);
break;
case ERROR:
case FATAL:
error(msg, os, t);
break;
case INFO:
info(msg, os, t);
break;
case TRACE:
trace(msg, os, t);
break;
case WARN:
warn(msg, os, t);
break;
}
}
}

View File

@ -100,11 +100,7 @@ public class MockReportingContext extends MockControllerServiceLookup implements
@Override
public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) {
final Bulletin bulletin = BulletinFactory.createBulletin(null, null, componentId, "test processor", category, severity.name(), message);
List<Bulletin> bulletins = componentBulletinsCreated.get(componentId);
if (bulletins == null) {
bulletins = new ArrayList<>();
componentBulletinsCreated.put(componentId, bulletins);
}
List<Bulletin> bulletins = componentBulletinsCreated.computeIfAbsent(componentId, k -> new ArrayList<>());
bulletins.add(bulletin);
return bulletin;
}

View File

@ -38,7 +38,7 @@ public class ControllerServiceLogObserver implements LogObserver {
public void onLogMessage(final LogMessage message) {
// Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever
// the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are).
final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString();
final String bulletinLevel = message.getLogLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLogLevel().toString();
final ProcessGroup pg = serviceNode.getProcessGroup();
final String groupId = pg == null ? null : pg.getIdentifier();

View File

@ -40,9 +40,8 @@ public class ProcessorLogObserver implements LogObserver {
public void onLogMessage(final LogMessage message) {
// Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever
// the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are).
final String bulletinLevel = (message.getLevel() == LogLevel.WARN) ? Severity.WARNING.name() : message.getLevel().toString();
bulletinRepository.addBulletin(BulletinFactory.createBulletin(processorNode, CATEGORY, bulletinLevel, message.getMessage()));
final String bulletinLevel = (message.getLogLevel() == LogLevel.WARN) ? Severity.WARNING.name() : message.getLogLevel().toString();
bulletinRepository.addBulletin(BulletinFactory.createBulletin(processorNode, CATEGORY, bulletinLevel, message.getMessage(), message.getFlowFileUuid()));
}
}

View File

@ -36,7 +36,7 @@ public class ReportingTaskLogObserver implements LogObserver {
public void onLogMessage(final LogMessage message) {
// Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever
// the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are).
final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString();
final String bulletinLevel = message.getLogLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLogLevel().toString();
final Bulletin bulletin = BulletinFactory.createBulletin(null, taskNode.getIdentifier(), ComponentType.REPORTING_TASK,
taskNode.getName(), "Log Message", bulletinLevel, message.getMessage());

View File

@ -19,6 +19,7 @@ package org.apache.nifi.logging.repository;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogMessage;
import org.apache.nifi.logging.LogObserver;
import org.apache.nifi.logging.LogRepository;
@ -30,11 +31,7 @@ public class NopLogRepository implements LogRepository {
private volatile ComponentLog logger;
@Override
public void addLogMessage(final LogLevel level, final String message) {
}
@Override
public void addLogMessage(final LogLevel level, final String message, final Throwable t) {
public void addLogMessage(LogMessage logMessage) {
}
@Override

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.logging.repository;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogMessage;
@ -27,14 +29,16 @@ import org.slf4j.helpers.MessageFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class StandardLogRepository implements LogRepository {
private final Map<LogLevel, Collection<LogObserver>> observers = new HashMap<>();
private final Map<LogLevel, Collection<LogObserver>> observers = new EnumMap<>(LogLevel.class);
private final Map<String, LogObserver> observerLookup = new HashMap<>();
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@ -46,43 +50,64 @@ public class StandardLogRepository implements LogRepository {
private volatile ComponentLog componentLogger;
@Override
public void addLogMessage(final LogLevel level, final String message) {
addLogMessage(level, message, (Throwable) null);
}
public void addLogMessage(LogMessage logMessage) {
LogLevel logLevel = logMessage.getLogLevel();
@Override
public void addLogMessage(final LogLevel level, final String message, final Throwable t) {
final LogMessage logMessage = new LogMessage(System.currentTimeMillis(), level, message, t);
final Collection<LogObserver> logObservers = observers.get(level);
final Collection<LogObserver> logObservers = observers.get(logLevel);
if (logObservers != null) {
for (LogObserver observer : logObservers) {
try {
observer.onLogMessage(logMessage);
} catch (final Throwable observerThrowable) {
} catch (final Exception observerThrowable) {
logger.error("Failed to pass log message to Observer {} due to {}", observer, observerThrowable.toString());
}
}
}
}
@Override
public void addLogMessage(final LogLevel level, final String format, final Object[] params) {
replaceThrowablesWithMessage(params);
final Optional<String> flowFileUuid = getFirstFlowFileUuidFromObjects(params);
final String formattedMessage = MessageFormatter.arrayFormat(format, params).getMessage();
addLogMessage(level, formattedMessage);
final LogMessage logMessage = new LogMessage.Builder(System.currentTimeMillis(), level)
.message(formattedMessage)
.flowFileUuid(flowFileUuid.orElse(null))
.createLogMessage();
addLogMessage(logMessage);
}
@Override
public void addLogMessage(final LogLevel level, final String format, final Object[] params, final Throwable t) {
replaceThrowablesWithMessage(params);
final Optional<String> flowFileUuid = getFirstFlowFileUuidFromObjects(params);
final String formattedMessage = MessageFormatter.arrayFormat(format, params, t).getMessage();
addLogMessage(level, formattedMessage, t);
final LogMessage logMessage = new LogMessage.Builder(System.currentTimeMillis(), level)
.message(formattedMessage)
.throwable(t)
.flowFileUuid(flowFileUuid.orElse(null))
.createLogMessage();
addLogMessage(logMessage);
}
private Optional<String> getFirstFlowFileUuidFromObjects(Object[] params) {
int flowFileCount = 0;
FlowFile flowFileFound = null;
for (final Object param : params) {
if (param instanceof FlowFile) {
if (++flowFileCount > 1) {
return Optional.empty();
}
flowFileFound = (FlowFile) param;
}
}
return Optional.ofNullable(flowFileFound).map(ff -> ff.getAttribute(CoreAttributes.UUID.key()));
}
private void replaceThrowablesWithMessage(final Object[] params) {
for (int i = 0; i < params.length; i++) {
if(params[i] instanceof Throwable) {
if (params[i] instanceof Throwable) {
params[i] = ((Throwable) params[i]).getLocalizedMessage();
}
}
@ -139,11 +164,7 @@ public class StandardLogRepository implements LogRepository {
for (int i = minimumLevel.ordinal(); i < allLevels.length; i++) {
// no need to register an observer for NONE since that level will never be logged to by a component
if (i != LogLevel.NONE.ordinal()) {
Collection<LogObserver> collection = observers.get(allLevels[i]);
if (collection == null) {
collection = new ArrayList<>();
observers.put(allLevels[i], collection);
}
Collection<LogObserver> collection = observers.computeIfAbsent(allLevels[i], k -> new ArrayList<>());
collection.add(observer);
}
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processor;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogMessage;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.slf4j.Logger;
@ -113,6 +114,14 @@ public class SimpleProcessLogger implements ComponentLog {
logRepository.addLogMessage(LogLevel.WARN, msg, os);
}
@Override
public void warn(LogMessage logMessage) {
if (isWarnEnabled()) {
log(LogLevel.WARN, logMessage);
logRepository.addLogMessage(logMessage);
}
}
@Override
public void trace(String msg, Throwable t) {
if (!isTraceEnabled()) {
@ -162,6 +171,14 @@ public class SimpleProcessLogger implements ComponentLog {
logRepository.addLogMessage(LogLevel.TRACE, msg, os, t);
}
@Override
public void trace(LogMessage logMessage) {
if (isTraceEnabled()) {
log(LogLevel.TRACE, logMessage);
logRepository.addLogMessage(logMessage);
}
}
@Override
public boolean isTraceEnabled() {
return logger.isTraceEnabled();
@ -242,6 +259,14 @@ public class SimpleProcessLogger implements ComponentLog {
logRepository.addLogMessage(LogLevel.INFO, msg, os, t);
}
@Override
public void info(LogMessage logMessage) {
if (isInfoEnabled()) {
log(LogLevel.INFO, logMessage);
logRepository.addLogMessage(logMessage);
}
}
@Override
public String getName() {
return logger.getName();
@ -300,6 +325,14 @@ public class SimpleProcessLogger implements ComponentLog {
logRepository.addLogMessage(LogLevel.ERROR, msg, os, t);
}
@Override
public void error(LogMessage logMessage) {
if (isErrorEnabled()) {
log(LogLevel.ERROR, logMessage);
logRepository.addLogMessage(logMessage);
}
}
private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t) {
final Object[] modifiedArgs;
if (t == null) {
@ -370,6 +403,14 @@ public class SimpleProcessLogger implements ComponentLog {
logRepository.addLogMessage(LogLevel.DEBUG, msg, os);
}
@Override
public void debug(LogMessage logMessage) {
if (isDebugEnabled()) {
log(LogLevel.DEBUG, logMessage);
logRepository.addLogMessage(logMessage);
}
}
@Override
public void log(LogLevel level, String msg, Throwable t) {
switch (level) {
@ -458,6 +499,28 @@ public class SimpleProcessLogger implements ComponentLog {
}
}
@Override
public void log(LogMessage message) {
switch (message.getLogLevel()) {
case DEBUG:
debug(message);
break;
case ERROR:
case FATAL:
error(message);
break;
case INFO:
info(message);
break;
case TRACE:
trace(message);
break;
case WARN:
warn(message);
break;
}
}
private String getCauses(final Throwable throwable) {
final LinkedList<String> causes = new LinkedList<>();
for (Throwable t = throwable; t != null; t = t.getCause()) {

View File

@ -17,14 +17,17 @@
package org.apache.nifi.logging;
import static org.junit.Assert.assertEquals;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.repository.StandardLogRepository;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.logging.repository.StandardLogRepository;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class TestStandardLogRepository {
@ -39,23 +42,45 @@ public class TestStandardLogRepository {
repo.addLogMessage(LogLevel.DEBUG, "Testing {} to get exception message <{}>", new Object[]{observer.getClass().getName(), exception});
repo.addLogMessage(LogLevel.DEBUG, "Testing {} to get exception message", new Object[]{observer.getClass().getName()}, exception);
assertEquals(observer.getMessages().get(0), "Testing org.apache.nifi.logging.TestStandardLogRepository$MockLogObserver to get exception message <exception>");
assertEquals(observer.getMessages().get(1), "Testing org.apache.nifi.logging.TestStandardLogRepository$MockLogObserver to get exception message");
assertEquals("Testing org.apache.nifi.logging.TestStandardLogRepository$MockLogObserver to get exception message <exception>", observer.getMessages().get(0).getMessage());
assertEquals("Testing org.apache.nifi.logging.TestStandardLogRepository$MockLogObserver to get exception message", observer.getMessages().get(1).getMessage());
}
private class MockLogObserver implements LogObserver {
@Test
public void testLogRepositoryLogsFirstFlowFileUuid() {
StandardLogRepository repo = new StandardLogRepository();
MockLogObserver observer = new MockLogObserver();
repo.addObserver("mock", LogLevel.DEBUG, observer);
MockFlowFile mockFlowFile = new MockFlowFile(1L);
private List<String> messages = new ArrayList<String>();
repo.addLogMessage(LogLevel.INFO, "Testing {} being shown in exception message", new Object[]{mockFlowFile});
assertEquals(mockFlowFile.getAttribute(CoreAttributes.UUID.key()), observer.getMessages().get(0).getFlowFileUuid());
}
@Test
public void testLogRepositoryDoesntLogMultipleFlowFileUuids() {
StandardLogRepository repo = new StandardLogRepository();
MockLogObserver observer = new MockLogObserver();
repo.addObserver("mock", LogLevel.DEBUG, observer);
MockFlowFile mockFlowFile1 = new MockFlowFile(1L);
MockFlowFile mockFlowFile2 = new MockFlowFile(2L);
repo.addLogMessage(LogLevel.INFO, "Testing {} {} flowfiles are not being shown in exception message", new Object[]{mockFlowFile1, mockFlowFile2});
assertNull(observer.getMessages().get(0).getFlowFileUuid());
}
private static class MockLogObserver implements LogObserver {
private final List<LogMessage> messages = new ArrayList<>();
@Override
public void onLogMessage(LogMessage message) {
messages.add(message.getMessage());
messages.add(message);
}
public List<String> getMessages() {
public List<LogMessage> getMessages() {
return messages;
}
}
}

View File

@ -21,8 +21,9 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
public class TerminationAwareLogger implements ComponentLog {
private static final String TERMINATED_TASK_PREFIX = "[Terminated Process] - ";
private final ComponentLog logger;
private final String TERMINATED_TASK_PREFIX = "[Terminated Process] - ";
private volatile boolean terminated = false;
public TerminationAwareLogger(final ComponentLog logger) {
@ -41,6 +42,7 @@ public class TerminationAwareLogger implements ComponentLog {
return TERMINATED_TASK_PREFIX + logLevel.name() + " - " + originalMessage;
}
@Override
public void warn(String msg, Throwable t) {
if (isTerminated()) {

View File

@ -27,30 +27,27 @@ public final class BulletinFactory {
private static final AtomicLong currentId = new AtomicLong(0);
private BulletinFactory() {
}
public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message) {
final ComponentType type;
switch (connectable.getConnectableType()) {
case REMOTE_INPUT_PORT:
case REMOTE_OUTPUT_PORT:
type = ComponentType.REMOTE_PROCESS_GROUP;
break;
case INPUT_PORT:
type = ComponentType.INPUT_PORT;
break;
case OUTPUT_PORT:
type = ComponentType.OUTPUT_PORT;
break;
case PROCESSOR:
default:
type = ComponentType.PROCESSOR;
break;
}
final ComponentType type = getComponentType(connectable);
final ProcessGroup group = connectable.getProcessGroup();
final String groupId = connectable.getProcessGroupIdentifier();
final String groupName = group == null ? null : group.getName();
final String groupPath = buildGroupPath(group);
return BulletinFactory.createBulletin(groupId, groupName, connectable.getIdentifier(), type, connectable.getName(), category, severity, message, groupPath);
return BulletinFactory.createBulletin(groupId, groupName, connectable.getIdentifier(), type, connectable.getName(), category, severity, message, groupPath, null);
}
public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message, final String flowFileUUID) {
final ComponentType type = getComponentType(connectable);
final ProcessGroup group = connectable.getProcessGroup();
final String groupId = connectable.getProcessGroupIdentifier();
final String groupName = group == null ? null : group.getName();
final String groupPath = buildGroupPath(group);
return BulletinFactory.createBulletin(groupId, groupName, connectable.getIdentifier(), type, connectable.getName(), category, severity, message, groupPath, flowFileUUID);
}
private static String buildGroupPath(ProcessGroup group) {
@ -95,7 +92,7 @@ public final class BulletinFactory {
}
public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final ComponentType sourceType,
final String sourceName, final String category, final String severity, final String message, final String groupPath) {
final String sourceName, final String category, final String severity, final String message, final String groupPath, final String flowFileUUID) {
final Bulletin bulletin = new ComponentBulletin(currentId.getAndIncrement());
bulletin.setGroupId(groupId);
bulletin.setGroupName(groupName);
@ -106,6 +103,7 @@ public final class BulletinFactory {
bulletin.setCategory(category);
bulletin.setLevel(severity);
bulletin.setMessage(message);
bulletin.setFlowFileUuid(flowFileUUID);
return bulletin;
}
@ -117,4 +115,25 @@ public final class BulletinFactory {
bulletin.setSourceType(ComponentType.FLOW_CONTROLLER);
return bulletin;
}
private static ComponentType getComponentType(final Connectable connectable) {
final ComponentType type;
switch (connectable.getConnectableType()) {
case REMOTE_INPUT_PORT:
case REMOTE_OUTPUT_PORT:
type = ComponentType.REMOTE_PROCESS_GROUP;
break;
case INPUT_PORT:
type = ComponentType.INPUT_PORT;
break;
case OUTPUT_PORT:
type = ComponentType.OUTPUT_PORT;
break;
case PROCESSOR:
default:
type = ComponentType.PROCESSOR;
break;
}
return type;
}
}

View File

@ -1,30 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.events;
import org.apache.nifi.reporting.Bulletin;
/**
*
*/
public class ComponentBulletin extends Bulletin {
ComponentBulletin(final long id) {
super(id);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.events;
import org.apache.nifi.reporting.Bulletin;
/**
*
*/
public class ComponentBulletin extends Bulletin {
ComponentBulletin(final long id) {
super(id);
}
}

View File

@ -1,32 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.events;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.ComponentType;
/**
*
*/
public class SystemBulletin extends Bulletin {
SystemBulletin(final long id) {
super(id);
setSourceType(ComponentType.FLOW_CONTROLLER);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.events;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.ComponentType;
/**
*
*/
public class SystemBulletin extends Bulletin {
SystemBulletin(final long id) {
super(id);
setSourceType(ComponentType.FLOW_CONTROLLER);
}
}

View File

@ -18,9 +18,7 @@ package org.apache.nifi.logging;
public interface LogRepository {
void addLogMessage(LogLevel level, String message);
void addLogMessage(LogLevel level, String message, Throwable t);
void addLogMessage(LogMessage logMessage);
void addLogMessage(LogLevel level, String messageFormat, Object[] params);

View File

@ -17,14 +17,12 @@
package org.apache.nifi.mock;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Stubs out the functionality of a ComponentLog so that it can
* be used during initialization of a component.
*
*/
public class MockComponentLogger implements ComponentLog {
@ -116,9 +114,8 @@ public class MockComponentLogger implements ComponentLog {
@Override
public void info(String msg, Object[] os, Throwable t) {
logger.trace(msg, os);
logger.trace("", t);
logger.info(msg, os);
logger.info("", t);
}
@Override
@ -167,92 +164,4 @@ public class MockComponentLogger implements ComponentLog {
public void debug(String msg) {
logger.debug(msg);
}
@Override
public void log(LogLevel level, String msg, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, t);
break;
case ERROR:
case FATAL:
error(msg, t);
break;
case INFO:
info(msg, t);
break;
case TRACE:
trace(msg, t);
break;
case WARN:
warn(msg, t);
break;
}
}
@Override
public void log(LogLevel level, String msg, Object[] os) {
switch (level) {
case DEBUG:
debug(msg, os);
break;
case ERROR:
case FATAL:
error(msg, os);
break;
case INFO:
info(msg, os);
break;
case TRACE:
trace(msg, os);
break;
case WARN:
warn(msg, os);
break;
}
}
@Override
public void log(LogLevel level, String msg) {
switch (level) {
case DEBUG:
debug(msg);
break;
case ERROR:
case FATAL:
error(msg);
break;
case INFO:
info(msg);
break;
case TRACE:
trace(msg);
break;
case WARN:
warn(msg);
break;
}
}
@Override
public void log(LogLevel level, String msg, Object[] os, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, os, t);
break;
case ERROR:
case FATAL:
error(msg, os, t);
break;
case INFO:
info(msg, os, t);
break;
case TRACE:
trace(msg, os, t);
break;
case WARN:
warn(msg, os, t);
break;
}
}
}

View File

@ -18,6 +18,7 @@ package org.apache.nifi.rules.handlers;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogMessage;
public class MockComponentLog implements ComponentLog {
@ -55,6 +56,11 @@ public class MockComponentLog implements ComponentLog {
warnMessage = msg;
}
@Override
public void warn(LogMessage logMessage) {
warnMessage = logMessage.getMessage();
}
@Override
public void trace(String msg, Throwable t) {
trace(msg);
@ -75,6 +81,11 @@ public class MockComponentLog implements ComponentLog {
trace(convertMessage(msg,os));
}
@Override
public void trace(LogMessage logMessage) {
traceMessage = logMessage.getMessage();
}
@Override
public boolean isWarnEnabled() {
return true;
@ -120,6 +131,11 @@ public class MockComponentLog implements ComponentLog {
info(convertMessage(msg,os));
}
@Override
public void info(LogMessage message) {
infoMessage = message.getMessage();
}
@Override
public String getName() {
return null;
@ -145,6 +161,11 @@ public class MockComponentLog implements ComponentLog {
error(msg);
}
@Override
public void error(LogMessage message) {
errorMessage = message.getMessage();
}
@Override
public void debug(String msg, Throwable t) {
debug(msg);
@ -165,6 +186,11 @@ public class MockComponentLog implements ComponentLog {
debugMessage = msg;
}
@Override
public void debug(LogMessage message) {
debugMessage = message.getMessage();
}
@Override
public void log(LogLevel level, String msg, Throwable t) {
@ -175,33 +201,6 @@ public class MockComponentLog implements ComponentLog {
}
@Override
public void log(LogLevel level, String msg) {
switch (level) {
case WARN:
warn(msg);
break;
case DEBUG:
debug(msg);
break;
case INFO:
info(msg);
break;
case ERROR:
error(msg);
break;
case TRACE:
trace(msg);
break;
case FATAL:
error(msg);
break;
case NONE:
info(msg);
break;
}
}
@Override
public void log(LogLevel level, String msg, Object[] os, Throwable t) {

View File

@ -17,27 +17,6 @@
package org.apache.nifi.reporting;
import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import org.apache.avro.Schema;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
@ -53,6 +32,26 @@ import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.reporting.s2s.SiteToSiteUtils;
import org.apache.nifi.scheduling.SchedulingStrategy;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Tags({"bulletin", "site", "site to site"})
@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
+ "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
@ -89,24 +88,24 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
final String nodeId = context.getClusterNodeIdentifier();
if (nodeId == null && isClustered) {
getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
+ "Will wait for Node Identifier to be established.");
+ "Will wait for Node Identifier to be established.");
return;
}
final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build();
final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery);
if(bulletins == null || bulletins.isEmpty()) {
if (bulletins == null || bulletins.isEmpty()) {
getLogger().debug("No events to send because no events are stored in the repository.");
return;
}
final OptionalLong opMaxId = bulletins.stream().mapToLong(t -> t.getId()).max();
final Long currMaxId = opMaxId.isPresent() ? opMaxId.getAsLong() : -1;
final OptionalLong opMaxId = bulletins.stream().mapToLong(Bulletin::getId).max();
final long currMaxId = opMaxId.isPresent() ? opMaxId.getAsLong() : -1;
if(currMaxId < lastSentBulletinId){
if (currMaxId < lastSentBulletinId) {
getLogger().warn("Current bulletin max id is {} which is less than what was stored in state as the last queried event, which was {}. "
+ "This means the bulletins repository restarted its ids. Restarting querying from the beginning.", new Object[]{currMaxId, lastSentBulletinId});
+ "This means the bulletins repository restarted its ids. Restarting querying from the beginning.", currMaxId, lastSentBulletinId);
lastSentBulletinId = -1;
}
@ -130,8 +129,8 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
// Create a JSON array of all the events in the current batch
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
for (final Bulletin bulletin : bulletins) {
if(bulletin.getId() > lastSentBulletinId) {
arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId, allowNullValues));
if (bulletin.getId() > lastSentBulletinId) {
arrayBuilder.add(serialize(builder, bulletin, df, platform, nodeId, allowNullValues));
}
}
final JsonArray jsonArray = arrayBuilder.build();
@ -162,7 +161,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().info("Successfully sent {} Bulletins to destination in {} ms; Transaction ID = {}; First Event ID = {}",
new Object[]{bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId()});
bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId());
} catch (final Exception e) {
if (transaction != null) {
transaction.error();
@ -177,8 +176,8 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
lastSentBulletinId = currMaxId;
}
private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
final String platform, final String nodeIdentifier, Boolean allowNullValues) {
private JsonObject serialize(final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
final String platform, final String nodeIdentifier, Boolean allowNullValues) {
addField(builder, "objectId", UUID.randomUUID().toString(), allowNullValues);
addField(builder, "platform", platform, allowNullValues);
@ -195,6 +194,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
addField(builder, "bulletinSourceName", bulletin.getSourceName(), allowNullValues);
addField(builder, "bulletinSourceType", bulletin.getSourceType() == null ? null : bulletin.getSourceType().name(), allowNullValues);
addField(builder, "bulletinTimestamp", df.format(bulletin.getTimestamp()), allowNullValues);
addField(builder, "bulletinFlowFileUuid", bulletin.getFlowFileUuid(), allowNullValues);
return builder.build();
}

View File

@ -56,7 +56,8 @@
{ "name" : "bulletinSourceId", "type" : ["string", "null"] },
{ "name" : "bulletinSourceName", "type" : ["string", "null"] },
{ "name" : "bulletinSourceType", "type" : ["string", "null"] },
{ "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" }
{ "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" },
{ "name" : "bulletinFlowFileUuid", "type" : ["string", "null"] }
]
}
</code>

View File

@ -17,6 +17,7 @@
{ "name" : "bulletinSourceId", "type" : ["string", "null"] },
{ "name" : "bulletinSourceName", "type" : ["string", "null"] },
{ "name" : "bulletinSourceType", "type" : ["string", "null"] },
{ "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" }
{ "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" },
{ "name" : "bulletinFlowFileUuid", "type" : ["string", "null"] }
]
}

View File

@ -32,7 +32,6 @@ import org.apache.nifi.util.MockPropertyValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import javax.json.Json;
@ -56,14 +55,11 @@ import static org.mockito.Mockito.when;
public class TestSiteToSiteBulletinReportingTask {
@Test
public void testUrls() throws IOException {
public void testUrls() {
final ValidationContext context = Mockito.mock(ValidationContext.class);
Mockito.when(context.newPropertyValue(Mockito.anyString())).then(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(InvocationOnMock invocation) throws Throwable {
String value = (String) invocation.getArguments()[0];
return new StandardPropertyValue(value, null, null);
}
Mockito.when(context.newPropertyValue(Mockito.anyString())).then((Answer<PropertyValue>) invocation -> {
String value = (String) invocation.getArguments()[0];
return new StandardPropertyValue(value, null, null);
});
assertTrue(new NiFiUrlValidator().validate("url", "http://localhost:8080/nifi", context).isValid());
@ -80,8 +76,8 @@ public class TestSiteToSiteBulletinReportingTask {
@Test
public void testSerializedForm() throws IOException, InitializationException {
// creating the list of bulletins
final List<Bulletin> bulletins = new ArrayList<Bulletin>();
bulletins.add(BulletinFactory.createBulletin("group-id", "group-name", "source-id", ComponentType.PROCESSOR, "source-name", "category", "severity", "message", "group-path"));
final List<Bulletin> bulletins = new ArrayList<>();
bulletins.add(BulletinFactory.createBulletin("group-id", "group-name", "source-id", ComponentType.PROCESSOR, "source-name", "category", "severity", "message", "group-path", "flowFileUuid"));
// mock the access to the list of bulletins
final ReportingContext context = Mockito.mock(ReportingContext.class);
@ -100,12 +96,9 @@ public class TestSiteToSiteBulletinReportingTask {
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteUtils.PLATFORM, "nifi");
Mockito.doAnswer(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}
Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
// setup the mock initialization context
@ -125,12 +118,13 @@ public class TestSiteToSiteBulletinReportingTask {
assertEquals("message", bulletinJson.getString("bulletinMessage"));
assertEquals("group-name", bulletinJson.getString("bulletinGroupName"));
assertEquals("group-path", bulletinJson.getString("bulletinGroupPath"));
assertEquals("flowFileUuid", bulletinJson.getString("bulletinFlowFileUuid"));
}
@Test
public void testSerializedFormWithNullValues() throws IOException, InitializationException {
// creating the list of bulletins
final List<Bulletin> bulletins = new ArrayList<Bulletin>();
final List<Bulletin> bulletins = new ArrayList<>();
bulletins.add(BulletinFactory.createBulletin("group-id", "group-name", "source-id", "source-name", "category", "severity", "message"));
// mock the access to the list of bulletins
@ -151,12 +145,9 @@ public class TestSiteToSiteBulletinReportingTask {
properties.put(SiteToSiteUtils.PLATFORM, "nifi");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES, "true");
Mockito.doAnswer(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}
Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
// setup the mock initialization context

View File

@ -105,7 +105,8 @@ public class BulletinEnumerator implements Enumerator<Object> {
bulletin.getSourceId(),
bulletin.getSourceName(),
bulletin.getSourceType() == null ? null : bulletin.getSourceType().name(),
bulletin.getTimestamp() == null ? null : bulletin.getTimestamp().getTime()
bulletin.getTimestamp() == null ? null : bulletin.getTimestamp().getTime(),
bulletin.getFlowFileUuid()
};
// If we want no fields just return null

View File

@ -164,7 +164,8 @@ public class BulletinTable extends AbstractTable implements QueryableTable, Tran
"bulletinSourceId",
"bulletinSourceName",
"bulletinSourceType",
"bulletinTimestamp"
"bulletinTimestamp",
"bulletinFlowFileUuid"
);
final List<RelDataType> types = Arrays.asList(
typeFactory.createJavaType(long.class),
@ -179,7 +180,8 @@ public class BulletinTable extends AbstractTable implements QueryableTable, Tran
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(long.class)
typeFactory.createJavaType(long.class),
typeFactory.createJavaType(String.class)
);
relDataType = typeFactory.createStructType(Pair.zip(names, types));

View File

@ -111,6 +111,7 @@
<tr><td>bulletinSourceName</td><td>String</td></tr>
<tr><td>bulletinSourceType</td><td>String</td></tr>
<tr><td>bulletinTimestamp</td><td>Date</td></tr>
<tr><td>bulletinFlowFileUuid</td><td>String</td></tr>
</table>
<br/>
<h3>PROCESS_GROUP_STATUS</h3>

View File

@ -307,22 +307,26 @@ public class TestQueryNiFiReportingTask {
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
final List<Map<String, Object>> rows = mockRecordSinkService.getRows();
final String flowFileUuid = "testFlowFileUuid";
assertEquals(3, rows.size());
// Validate the first row
Map<String, Object> row = rows.get(0);
assertEquals(13, row.size());
assertEquals(14, row.size());
assertNotNull(row.get("bulletinId"));
assertEquals("controller", row.get("bulletinCategory"));
assertEquals("WARN", row.get("bulletinLevel"));
assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
// Validate the second row
row = rows.get(1);
assertEquals("processor", row.get("bulletinCategory"));
assertEquals("INFO", row.get("bulletinLevel"));
assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
// Validate the third row
row = rows.get(2);
assertEquals("controller service", row.get("bulletinCategory"));
assertEquals("ERROR", row.get("bulletinLevel"));
assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
}
private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
@ -409,9 +413,9 @@ public class TestQueryNiFiReportingTask {
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
MockBulletinRepository bulletinRepository = new MockQueryBulletinRepository();
bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller", "WARN", "test bulletin 2"));
bulletinRepository.addBulletin(BulletinFactory.createBulletin("processor", "INFO", "test bulletin 1"));
bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller service", "ERROR", "test bulletin 2"));
bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller", "WARN", "test bulletin 2", "testFlowFileUuid"));
bulletinRepository.addBulletin(BulletinFactory.createBulletin("processor", "INFO", "test bulletin 1", "testFlowFileUuid"));
bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller service", "ERROR", "test bulletin 2", "testFlowFileUuid"));
Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository);
return reportingTask;