NIFI-2595: - Updating ReportingTasks to use ComponentLogger instead of creating Controller level bulletins.

- Making the bulletin responses consistent in that all bulletins will be included but in redacted form as appropriate.
- Fixing broken unit test.

This closes #892.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Matt Gilman 2016-08-18 14:28:25 -04:00 committed by Bryan Bende
parent f3387426a0
commit 87161ab96e
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
8 changed files with 78 additions and 116 deletions

View File

@ -16,13 +16,13 @@
*/
package org.apache.nifi.reporting;
import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerServiceLookup;
import java.util.Map;
/**
* This interface provides a bridge between the NiFi Framework and a
* {@link ReportingTask}. This context allows a ReportingTask to access
@ -59,9 +59,10 @@ public interface ReportingContext {
BulletinRepository getBulletinRepository();
/**
* Creates a system-level {@link Bulletin} with the given category, severity
* Creates a controller-level {@link Bulletin} with the given category, severity
* level, and message, so that the Bulletin can be added to the
* {@link BulletinRepository}.
* {@link BulletinRepository}. Access to this bulletin will be enforce through
* permissions on the controller.
*
* @param category of bulletin
* @param severity of bulletin
@ -72,7 +73,7 @@ public interface ReportingContext {
/**
* Creates a {@link Bulletin} for the component with the specified
* identifier
* identifier.
*
* @param componentId the ID of the component
* @param category the name of the bulletin's category

View File

@ -193,15 +193,6 @@ public class VolatileBulletinRepository implements BulletinRepository {
}
}
for (final String key : new String[] { SERVICE_BULLETIN_STORE_KEY, REPORTING_TASK_BULLETIN_STORE_KEY }) {
final ConcurrentMap<String, RingBuffer<Bulletin>> bulletinMap = bulletinStoreMap.get(key);
if (bulletinMap != null) {
for (final RingBuffer<Bulletin> buffer : bulletinMap.values()) {
controllerBulletins.addAll(buffer.getSelectedElements(filter, max));
}
}
}
// We only want the newest bulletin, so we sort based on time and take the top 'max' entries
Collections.sort(controllerBulletins);
if (controllerBulletins.size() > max) {

View File

@ -2463,48 +2463,50 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final ControllerBulletinsEntity controllerBulletinsEntity = new ControllerBulletinsEntity();
final List<BulletinEntity> controllerBulletinEntities = new ArrayList<>();
final Authorizable controllerAuthorizable = authorizableLookup.getController();
final boolean authorized = controllerAuthorizable.isAuthorized(authorizer, RequestAction.READ, user);
if (authorized) {
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController());
final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, authorized)).collect(Collectors.toList());
controllerBulletinsEntity.setBulletins(bulletinEntities);
}
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController());
controllerBulletinEntities.addAll(bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, authorized)).collect(Collectors.toList()));
// get the controller service bulletins
final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
final List<Bulletin> allControllerServiceBulletins = bulletinRepository.findBulletins(controllerServiceQuery);
final List<BulletinEntity> authorizedControllerServiceBulletinEntities = new ArrayList<>();
final List<BulletinEntity> controllerServiceBulletinEntities = new ArrayList<>();
for (final Bulletin bulletin : allControllerServiceBulletins) {
try {
final Authorizable controllerServiceAuthorizable = authorizableLookup.getControllerService(bulletin.getSourceId()).getAuthorizable();
final boolean controllerServiceAuthorized = controllerServiceAuthorizable.isAuthorized(authorizer, RequestAction.READ, user);
if (controllerServiceAuthorized) {
authorizedControllerServiceBulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), controllerServiceAuthorized));
}
final BulletinEntity controllerServiceBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), controllerServiceAuthorized);
controllerServiceBulletinEntities.add(controllerServiceBulletin);
controllerBulletinEntities.add(controllerServiceBulletin);
} catch (final ResourceNotFoundException e) {
// controller service missing.. skip
}
}
controllerBulletinsEntity.setControllerServiceBulletins(authorizedControllerServiceBulletinEntities);
controllerBulletinsEntity.setControllerServiceBulletins(controllerServiceBulletinEntities);
// get the reporting task bulletins
final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
final List<Bulletin> allReportingTaskBulletins = bulletinRepository.findBulletins(reportingTaskQuery);
final List<BulletinEntity> authorizedReportingTaskBulletinEntities = new ArrayList<>();
final List<BulletinEntity> reportingTaskBulletinEntities = new ArrayList<>();
for (final Bulletin bulletin : allReportingTaskBulletins) {
try {
final Authorizable reportingTaskAuthorizable = authorizableLookup.getReportingTask(bulletin.getSourceId()).getAuthorizable();
final boolean reportingTaskAuthorizableAuthorized = reportingTaskAuthorizable.isAuthorized(authorizer, RequestAction.READ, user);
if (reportingTaskAuthorizableAuthorized) {
authorizedReportingTaskBulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), reportingTaskAuthorizableAuthorized));
}
final BulletinEntity reportingTaskBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), reportingTaskAuthorizableAuthorized);
reportingTaskBulletinEntities.add(reportingTaskBulletin);
controllerBulletinEntities.add(reportingTaskBulletin);
} catch (final ResourceNotFoundException e) {
// reporting task missing.. skip
}
}
controllerBulletinsEntity.setReportingTaskBulletins(authorizedReportingTaskBulletinEntities);
controllerBulletinsEntity.setReportingTaskBulletins(reportingTaskBulletinEntities);
controllerBulletinsEntity.setBulletins(pruneAndSortBulletins(controllerBulletinEntities, BulletinRepository.MAX_BULLETINS_FOR_CONTROLLER));
return controllerBulletinsEntity;
}
@ -2706,6 +2708,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
bulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), authorizeBulletin(bulletin)));
}
return pruneAndSortBulletins(bulletinEntities, BulletinRepository.MAX_BULLETINS_PER_COMPONENT);
}
private List<BulletinEntity> pruneAndSortBulletins(final List<BulletinEntity> bulletinEntities, final int maxBulletins) {
// sort the bulletins
Collections.sort(bulletinEntities, new Comparator<BulletinEntity>() {
@Override
@ -2725,11 +2731,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
});
// prune the response to only include the max number of bulletins
if (bulletinEntities.size() > BulletinRepository.MAX_BULLETINS_PER_COMPONENT) {
bulletinEntities = bulletinEntities.subList(0, BulletinRepository.MAX_BULLETINS_PER_COMPONENT);
if (bulletinEntities.size() > maxBulletins) {
return bulletinEntities.subList(0, maxBulletins);
} else {
return bulletinEntities;
}
return bulletinEntities;
}
@Override

View File

@ -1195,7 +1195,7 @@ nf.Common = (function () {
if ($.isArray(bulletins) && $.isArray(otherBulletins)) {
if (bulletins.length === otherBulletins.length) {
for (var i = 0; i < bulletins.length; i++) {
if (bulletins[i].id !== otherBulletins[i].id) {
if (bulletins[i].id !== otherBulletins[i].id || bulletins[i].canRead !== otherBulletins[i].canRead) {
return true;
}
}

View File

@ -16,6 +16,15 @@
*/
package org.apache.nifi.controller;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.util.FormatUtils;
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
@ -23,26 +32,12 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Tags({"disk", "storage", "warning", "monitoring", "repo"})
@CapabilityDescription("Checks the amount of storage space available for the specified directory"
+ " and warns (via a log message and a System-Level Bulletin) if the partition on which it lives exceeds"
+ " some configurable threshold of storage space")
public class MonitorDiskUsage extends AbstractReportingTask {
private static final Logger logger = LoggerFactory.getLogger(MonitorDiskUsage.class);
private static final Pattern PERCENT_PATTERN = Pattern.compile("(\\d+{1,2})%");
public static final PropertyDescriptor DIR_THRESHOLD = new PropertyDescriptor.Builder()
@ -88,11 +83,11 @@ public class MonitorDiskUsage extends AbstractReportingTask {
final File dir = new File(context.getProperty(DIR_LOCATION).getValue());
final String dirName = context.getProperty(DIR_DISPLAY_NAME).getValue();
checkThreshold(dirName, dir.toPath(), contentRepoThreshold, context);
checkThreshold(dirName, dir.toPath(), contentRepoThreshold, getLogger());
}
static void checkThreshold(final String pathName, final Path path, final int threshold, final ReportingContext context) {
static void checkThreshold(final String pathName, final Path path, final int threshold, final ComponentLog logger) {
final File file = path.toFile();
final long totalBytes = file.getTotalSpace();
final long freeBytes = file.getFreeSpace();
@ -109,8 +104,6 @@ public class MonitorDiskUsage extends AbstractReportingTask {
final String message = String.format("%1$s exceeds configured threshold of %2$s%%, having %3$s / %4$s (%5$.2f%%) used and %6$s (%7$.2f%%) free",
pathName, threshold, usedSpace, totalSpace, usedPercent, freeSpace, freePercent);
final Bulletin bulletin = context.createBulletin("Disk Usage", Severity.WARNING, message);
context.getBulletinRepository().addBulletin(bulletin);
logger.warn(message);
}
}

View File

@ -16,15 +16,6 @@
*/
package org.apache.nifi.controller;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -36,13 +27,18 @@ import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
/**
* Reporting task used to monitor usage of memory after Garbage Collection has
@ -130,8 +126,6 @@ public class MonitorMemory extends AbstractReportingTask {
public static final Pattern DATA_SIZE_PATTERN = DataUnit.DATA_SIZE_PATTERN;
public static final Pattern TIME_PERIOD_PATTERN = FormatUtils.TIME_DURATION_PATTERN;
private static final Logger logger = LoggerFactory.getLogger(MonitorMemory.class);
private volatile MemoryPoolMXBean monitoredBean;
private volatile String threshold = "65%";
private volatile long lastReportTime;
@ -200,8 +194,8 @@ public class MonitorMemory extends AbstractReportingTask {
final MemoryUsage usage = bean.getUsage();
if (usage == null) {
logger.warn("{} could not determine memory usage for pool with name {}", this,
context.getProperty(MEMORY_POOL_PROPERTY));
getLogger().warn("{} could not determine memory usage for pool with name {}", new Object[] {this,
context.getProperty(MEMORY_POOL_PROPERTY)});
return;
}
@ -217,9 +211,7 @@ public class MonitorMemory extends AbstractReportingTask {
bean.getName(), threshold, FormatUtils.formatDataSize(usage.getUsed()),
FormatUtils.formatDataSize(usage.getMax()), percentageUsed);
logger.warn("{}", message);
final Bulletin bulletin = context.createBulletin("Memory Management", Severity.WARNING, message);
context.getBulletinRepository().addBulletin(bulletin);
getLogger().warn("{}", new Object[] {message});
} else if (lastValueWasExceeded) {
lastValueWasExceeded = false;
lastReportTime = System.currentTimeMillis();
@ -227,9 +219,7 @@ public class MonitorMemory extends AbstractReportingTask {
bean.getName(), threshold, FormatUtils.formatDataSize(usage.getUsed()),
FormatUtils.formatDataSize(usage.getMax()), percentageUsed);
logger.info("{}", message);
final Bulletin bulletin = context.createBulletin("Memory Management", Severity.INFO, message);
context.getBulletinRepository().addBulletin(bulletin);
getLogger().info("{}", new Object[] {message});
}
}

View File

@ -16,12 +16,10 @@
*/
package org.apache.nifi.reporting.ganglia;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.reporting.GangliaReporter;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -33,13 +31,12 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.reporting.GangliaReporter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Configuration of this reporting task requires a "host" property that points
@ -76,7 +73,6 @@ public class StandardGangliaReporter extends AbstractReportingTask {
.build();
public static final String METRICS_GROUP = "NiFi";
private static final Logger logger = LoggerFactory.getLogger(StandardGangliaReporter.class);
private MetricsRegistry metricsRegistry;
private GangliaReporter gangliaReporter;
@ -245,7 +241,7 @@ public class StandardGangliaReporter extends AbstractReportingTask {
this.latestStatus.set(rootGroupStatus);
gangliaReporter.run();
logger.info("{} Sent metrics to Ganglia", this);
getLogger().info("{} Sent metrics to Ganglia", new Object[] {this});
}
private long calculateProcessingNanos(final ProcessGroupStatus status) {

View File

@ -16,21 +16,14 @@
*/
package org.apache.nifi.controller;
import org.apache.nifi.controller.MonitorDiskUsage;
import static org.junit.Assert.assertEquals;
import org.apache.nifi.logging.ComponentLog;
import org.junit.Test;
import org.mockito.Mockito;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.Severity;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.junit.Assert.assertEquals;
public class TestMonitorDiskUsage {
@ -38,23 +31,15 @@ public class TestMonitorDiskUsage {
public void testGeneratesMessageIfTooFull() {
final AtomicInteger callCounter = new AtomicInteger(0);
final ReportingContext context = Mockito.mock(ReportingContext.class);
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
final String message = (String) invocation.getArguments()[2];
System.out.println(message);
callCounter.incrementAndGet();
return null;
}
final ComponentLog logger = Mockito.mock(ComponentLog.class);
Mockito.doAnswer(invocation -> {
final String message = (String) invocation.getArguments()[0];
System.out.println(message);
callCounter.incrementAndGet();
return null;
}).when(logger).warn(Mockito.anyString());
}).when(context).createBulletin(Mockito.any(String.class), Mockito.any(Severity.class), Mockito.any(String.class));
final BulletinRepository brepo = Mockito.mock(BulletinRepository.class);
Mockito.doNothing().when(brepo).addBulletin(Mockito.any(Bulletin.class));
Mockito.doReturn(brepo).when(context).getBulletinRepository();
MonitorDiskUsage.checkThreshold("Test Path", Paths.get("."), 0, context);
MonitorDiskUsage.checkThreshold("Test Path", Paths.get("."), 0, logger);
assertEquals(1, callCounter.get());
}