NIFI-1724 Added properties to configure log level when file not found and permission denied on FetchFile processor

NIFI-1724 Added unit test for logging with level
This closes #348
This commit is contained in:
Pierre Villard 2016-04-13 14:21:43 +02:00 committed by Oleg Zhurakousky
parent f719cbf60c
commit 5e55a543eb
6 changed files with 308 additions and 2 deletions

View File

@ -101,4 +101,12 @@ public interface ComponentLog {
void debug(String msg, Object[] os, Throwable t);
void debug(String msg);
void log(LogLevel level, String msg, Throwable t);
void log(LogLevel level, String msg, Object[] os);
void log(LogLevel level, String msg);
void log(LogLevel level, String msg, Object[] os, Throwable t);
}

View File

@ -18,6 +18,7 @@ package org.apache.nifi.util;
import java.util.List;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.ProcessorLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -310,4 +311,92 @@ public class MockProcessorLog implements ProcessorLog {
logger.debug(msg, os);
}
@Override
public void log(LogLevel level, String msg, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, t);
break;
case ERROR:
case FATAL:
error(msg, t);
break;
case INFO:
info(msg, t);
break;
case TRACE:
trace(msg, t);
break;
case WARN:
warn(msg, t);
break;
}
}
@Override
public void log(LogLevel level, String msg, Object[] os) {
switch (level) {
case DEBUG:
debug(msg, os);
break;
case ERROR:
case FATAL:
error(msg, os);
break;
case INFO:
info(msg, os);
break;
case TRACE:
trace(msg, os);
break;
case WARN:
warn(msg, os);
break;
}
}
@Override
public void log(LogLevel level, String msg) {
switch (level) {
case DEBUG:
debug(msg);
break;
case ERROR:
case FATAL:
error(msg);
break;
case INFO:
info(msg);
break;
case TRACE:
trace(msg);
break;
case WARN:
warn(msg);
break;
}
}
@Override
public void log(LogLevel level, String msg, Object[] os, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, os, t);
break;
case ERROR:
case FATAL:
error(msg, os, t);
break;
case INFO:
info(msg, os, t);
break;
case TRACE:
trace(msg, os, t);
break;
case WARN:
warn(msg, os, t);
break;
}
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.documentation.mock;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.ProcessorLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -166,4 +167,92 @@ public class MockProcessorLogger implements ProcessorLog {
public void debug(String msg) {
logger.debug(msg);
}
@Override
public void log(LogLevel level, String msg, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, t);
break;
case ERROR:
case FATAL:
error(msg, t);
break;
case INFO:
info(msg, t);
break;
case TRACE:
trace(msg, t);
break;
case WARN:
warn(msg, t);
break;
}
}
@Override
public void log(LogLevel level, String msg, Object[] os) {
switch (level) {
case DEBUG:
debug(msg, os);
break;
case ERROR:
case FATAL:
error(msg, os);
break;
case INFO:
info(msg, os);
break;
case TRACE:
trace(msg, os);
break;
case WARN:
warn(msg, os);
break;
}
}
@Override
public void log(LogLevel level, String msg) {
switch (level) {
case DEBUG:
debug(msg);
break;
case ERROR:
case FATAL:
error(msg);
break;
case INFO:
info(msg);
break;
case TRACE:
trace(msg);
break;
case WARN:
warn(msg);
break;
}
}
@Override
public void log(LogLevel level, String msg, Object[] os, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, os, t);
break;
case ERROR:
case FATAL:
error(msg, os, t);
break;
case INFO:
info(msg, os, t);
break;
case TRACE:
trace(msg, os, t);
break;
case WARN:
warn(msg, os, t);
break;
}
}
}

View File

@ -373,4 +373,92 @@ public class SimpleProcessLogger implements ProcessorLog {
logRepository.addLogMessage(LogLevel.DEBUG, msg, os);
}
@Override
public void log(LogLevel level, String msg, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, t);
break;
case ERROR:
case FATAL:
error(msg, t);
break;
case INFO:
info(msg, t);
break;
case TRACE:
trace(msg, t);
break;
case WARN:
warn(msg, t);
break;
}
}
@Override
public void log(LogLevel level, String msg, Object[] os) {
switch (level) {
case DEBUG:
debug(msg, os);
break;
case ERROR:
case FATAL:
error(msg, os);
break;
case INFO:
info(msg, os);
break;
case TRACE:
trace(msg, os);
break;
case WARN:
warn(msg, os);
break;
}
}
@Override
public void log(LogLevel level, String msg) {
switch (level) {
case DEBUG:
debug(msg);
break;
case ERROR:
case FATAL:
error(msg);
break;
case INFO:
info(msg);
break;
case TRACE:
trace(msg);
break;
case WARN:
warn(msg);
break;
}
}
@Override
public void log(LogLevel level, String msg, Object[] os, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, os, t);
break;
case ERROR:
case FATAL:
error(msg, os, t);
break;
case INFO:
info(msg, os, t);
break;
case TRACE:
trace(msg, os, t);
break;
case WARN:
warn(msg, os, t);
break;
}
}
}

View File

@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
import java.lang.reflect.Field;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.reporting.ReportingTask;
import org.junit.Before;
import org.junit.Test;
@ -91,6 +92,18 @@ public class TestSimpleProcessLogger {
verify(logger, times(1)).warn(anyString(), argThat(new MyVarargMatcher()));
}
@Test
public void validateDelegateLoggerReceivesThrowableToStringOnLogWithLevel() {
componentLog.log(LogLevel.WARN, "Hello {}", e);
verify(logger, times(1)).warn(anyString(), argThat(new MyVarargMatcher()));
componentLog.log(LogLevel.ERROR, "Hello {}", e);
verify(logger, times(1)).error(anyString(), argThat(new MyVarargMatcher()));
componentLog.log(LogLevel.INFO, "Hello {}", e);
verify(logger, times(1)).info(anyString(), argThat(new MyVarargMatcher()));
componentLog.log(LogLevel.TRACE, "Hello {}", e);
verify(logger, times(1)).trace(anyString(), argThat(new MyVarargMatcher()));
}
/**
*
*/

View File

@ -43,6 +43,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -100,6 +101,20 @@ public class FetchFile extends AbstractProcessor {
.defaultValue(CONFLICT_RENAME.getValue())
.required(true)
.build();
static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder()
.name("Log level when file not found")
.description("Log level to use in case the file does not exist when the processor is trigerred")
.allowableValues(LogLevel.values())
.defaultValue(LogLevel.ERROR.toString())
.required(true)
.build();
static final PropertyDescriptor PERM_DENIED_LOG_LEVEL = new PropertyDescriptor.Builder()
.name("Log level when permission denied")
.description("Log level to use in case user " + System.getProperty("user.name") + " does not have sufficient permissions to read the file")
.allowableValues(LogLevel.values())
.defaultValue(LogLevel.ERROR.toString())
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -126,6 +141,8 @@ public class FetchFile extends AbstractProcessor {
properties.add(COMPLETION_STRATEGY);
properties.add(MOVE_DESTINATION_DIR);
properties.add(CONFLICT_STRATEGY);
properties.add(FILE_NOT_FOUND_LOG_LEVEL);
properties.add(PERM_DENIED_LOG_LEVEL);
return properties;
}
@ -162,11 +179,13 @@ public class FetchFile extends AbstractProcessor {
final StopWatch stopWatch = new StopWatch(true);
final String filename = context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
final LogLevel levelFileNotFound = LogLevel.valueOf(context.getProperty(FILE_NOT_FOUND_LOG_LEVEL).getValue());
final LogLevel levelPermDenied = LogLevel.valueOf(context.getProperty(PERM_DENIED_LOG_LEVEL).getValue());
final File file = new File(filename);
// Verify that file exists
if (!file.exists()) {
getLogger().error("Could not fetch file {} from file system for {} because the file does not exist; routing to not.found", new Object[] {file, flowFile});
getLogger().log(levelFileNotFound, "Could not fetch file {} from file system for {} because the file does not exist; routing to not.found", new Object[] {file, flowFile});
session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
return;
@ -175,7 +194,7 @@ public class FetchFile extends AbstractProcessor {
// Verify read permission on file
final String user = System.getProperty("user.name");
if (!isReadable(file)) {
getLogger().error("Could not fetch file {} from file system for {} due to user {} not having sufficient permissions to read the file; routing to permission.denied",
getLogger().log(levelPermDenied, "Could not fetch file {} from file system for {} due to user {} not having sufficient permissions to read the file; routing to permission.denied",
new Object[] {file, flowFile, user});
session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED);
session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED);