NIFI-4344: Improve bulletin messages with exception details.

This closes #5093.

Signed-off-by: Tamas Palfy <tamas.bertalan.palfy@gmail.com>
This commit is contained in:
Lehel Boér 2021-03-31 20:58:26 +02:00 committed by Tamas Palfy
parent 8e5ce46f6c
commit 0c748a5a2b
4 changed files with 44 additions and 26 deletions

View File

@ -34,8 +34,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class StandardLogRepository implements LogRepository {
public static final int DEFAULT_MAX_CAPACITY_PER_LEVEL = 10;
private final Map<LogLevel, Collection<LogObserver>> observers = new HashMap<>();
private final Map<String, LogObserver> observerLookup = new HashMap<>();
@ -82,7 +80,7 @@ public class StandardLogRepository implements LogRepository {
addLogMessage(level, formattedMessage, t);
}
private void replaceThrowablesWithMessage(Object[] params) {
private void replaceThrowablesWithMessage(final Object[] params) {
for (int i = 0; i < params.length; i++) {
if(params[i] instanceof Throwable) {
params[i] = ((Throwable) params[i]).getLocalizedMessage();

View File

@ -24,8 +24,14 @@ import org.apache.nifi.logging.LogRepositoryFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.stream.Collectors;
public class SimpleProcessLogger implements ComponentLog {
public static final String NEW_LINE_ARROW = "\u21B3";
public static final String CAUSES = NEW_LINE_ARROW + " causes: ";
private final Logger logger;
private final LogRepository logRepository;
private final Object component;
@ -58,7 +64,7 @@ public class SimpleProcessLogger implements ComponentLog {
}
msg = "{} " + msg;
final Object[] os = {component, t.toString(), t};
final Object[] os = {component, getCauses(t), t};
logger.warn(msg, os);
logRepository.addLogMessage(LogLevel.WARN, msg, os, t);
}
@ -110,7 +116,7 @@ public class SimpleProcessLogger implements ComponentLog {
}
msg = "{} " + msg;
final Object[] os = {component, t.toString(), t};
final Object[] os = {component, getCauses(t), t};
logger.trace(msg, os);
logRepository.addLogMessage(LogLevel.TRACE, msg, os, t);
}
@ -184,7 +190,7 @@ public class SimpleProcessLogger implements ComponentLog {
}
msg = "{} " + msg;
final Object[] os = {component, t.toString()};
final Object[] os = {component, getCauses(t)};
logger.info(msg, os);
if (logger.isDebugEnabled()) {
@ -245,12 +251,12 @@ public class SimpleProcessLogger implements ComponentLog {
if (t == null) {
msg = "{} " + msg;
final Object[] os = new Object[] {component};
final Object[] os = new Object[]{component};
logger.error(msg, os);
logRepository.addLogMessage(LogLevel.ERROR, msg, os);
} else {
msg = "{} " + msg + ": {}";
final Object[] os = new Object[] {component, t.toString(), t};
final Object[] os = new Object[]{component, getCauses(t), t};
logger.error(msg, os);
logRepository.addLogMessage(LogLevel.ERROR, msg, os, t);
}
@ -301,7 +307,7 @@ public class SimpleProcessLogger implements ComponentLog {
modifiedArgs = new Object[os.length + 3];
modifiedArgs[0] = component.toString();
System.arraycopy(os, 0, modifiedArgs, 1, os.length);
modifiedArgs[modifiedArgs.length - 2] = t.toString();
modifiedArgs[modifiedArgs.length - 2] = getCauses(t);
modifiedArgs[modifiedArgs.length - 1] = t;
}
@ -448,4 +454,12 @@ public class SimpleProcessLogger implements ComponentLog {
}
}
private String getCauses(final Throwable throwable) {
final LinkedList<String> causes = new LinkedList<>();
for (Throwable t = throwable; t != null; t = t.getCause()) {
causes.push(t.toString());
}
return causes.stream().collect(Collectors.joining(System.lineSeparator() + CAUSES));
}
}

View File

@ -16,6 +16,15 @@
*/
package org.apache.nifi.processor;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.reporting.ReportingTask;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import java.lang.reflect.Field;
import static org.apache.nifi.processor.SimpleProcessLogger.NEW_LINE_ARROW;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
@ -24,16 +33,13 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.lang.reflect.Field;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.reporting.ReportingTask;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
public class TestSimpleProcessLogger {
private final Exception e = new RuntimeException("intentional");
private static final String EXPECTED_CAUSES = "java.lang.RuntimeException: third" + System.lineSeparator() +
NEW_LINE_ARROW + " causes: java.lang.RuntimeException: second" + System.lineSeparator() +
NEW_LINE_ARROW + " causes: java.lang.RuntimeException: first";
private final Exception e = new RuntimeException("first", new RuntimeException("second", new RuntimeException("third")));
private ReportingTask task;
@ -68,7 +74,7 @@ public class TestSimpleProcessLogger {
@Test
public void validateDelegateLoggerReceivesThrowableToStringOnError() {
componentLog.error("Hello {}", e);
verify(logger, times(1)).error(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).error(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
}
@Test
@ -80,24 +86,24 @@ public class TestSimpleProcessLogger {
@Test
public void validateDelegateLoggerReceivesThrowableToStringOnTrace() {
componentLog.trace("Hello {}", e);
verify(logger, times(1)).trace(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).trace(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
}
@Test
public void validateDelegateLoggerReceivesThrowableToStringOnWarn() {
componentLog.warn("Hello {}", e);
verify(logger, times(1)).warn(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).warn(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
}
@Test
public void validateDelegateLoggerReceivesThrowableToStringOnLogWithLevel() {
componentLog.log(LogLevel.WARN, "Hello {}", e);
verify(logger, times(1)).warn(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).warn(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
componentLog.log(LogLevel.ERROR, "Hello {}", e);
verify(logger, times(1)).error(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).error(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
componentLog.log(LogLevel.INFO, "Hello {}", e);
verify(logger, times(1)).info(anyString(), eq(e));
componentLog.log(LogLevel.TRACE, "Hello {}", e);
verify(logger, times(1)).trace(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).trace(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
}
}

View File

@ -251,8 +251,8 @@ public class ConnectableTask {
} catch (final Throwable t) {
// Use ComponentLog to log the event so that a bulletin will be created for this processor
final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());
procLog.error("{} failed to process session due to {}; Processor Administratively Yielded for {}",
new Object[] {connectable.getRunnableComponent(), t, schedulingAgent.getAdministrativeYieldDuration()}, t);
procLog.error("Failed to process session due to {}; Processor Administratively Yielded for {}",
new Object[] {t, schedulingAgent.getAdministrativeYieldDuration()}, t);
logger.warn("Administratively Yielding {} due to uncaught Exception: {}", connectable.getRunnableComponent(), t.toString(), t);
connectable.yield(schedulingAgent.getAdministrativeYieldDuration(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);