diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java b/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java index 70dedfaf5d..c219032a0a 100644 --- a/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java @@ -31,6 +31,11 @@ public interface EventAccess { */ ProcessGroupStatus getControllerStatus(); + /** + * @return the status of all components in the specified group. + */ + ProcessGroupStatus getGroupStatus(final String groupId); + /** * Convenience method to obtain Provenance Events starting with (and * including) the given ID. If no event exists with that ID, the first event diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java index 5c9a0ed887..2a2aab2f52 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java @@ -41,6 +41,11 @@ public class MockEventAccess implements EventAccess { return processGroupStatus; } + @Override + public ProcessGroupStatus getGroupStatus(final String groupId) { + return null; + } + @Override public List getProvenanceEvents(long firstEventId, int maxRecords) throws IOException { if (firstEventId < 0 || maxRecords < 1) { @@ -65,6 +70,7 @@ public class MockEventAccess implements EventAccess { this.provenanceRecords.add(record); } + @Override public ProvenanceEventRepository getProvenanceRepository() { return null; } diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java index 5080583ff2..0ad434536d 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java @@ -80,6 +80,15 @@ public class AmbariReportingTask extends AbstractReportingTask { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final PropertyDescriptor PROCESS_GROUP_ID = new PropertyDescriptor.Builder() + .name("Process Group ID") + .description("If specified, the reporting task will send metrics about this process group only. If" + + " not, the root process group is used and global metrics are sent.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + private volatile Client client; private volatile JsonBuilderFactory factory; private volatile VirtualMachineMetrics virtualMachineMetrics; @@ -93,6 +102,7 @@ public class AmbariReportingTask extends AbstractReportingTask { properties.add(METRICS_COLLECTOR_URL); properties.add(APPLICATION_ID); properties.add(HOSTNAME); + properties.add(PROCESS_GROUP_ID); return properties; } @@ -112,12 +122,12 @@ public class AmbariReportingTask extends AbstractReportingTask { @Override public void onTrigger(final ReportingContext context) { - final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL) - .evaluateAttributeExpressions().getValue(); - final String applicationId = context.getProperty(APPLICATION_ID) - .evaluateAttributeExpressions().getValue(); - final String hostname = context.getProperty(HOSTNAME) - .evaluateAttributeExpressions().getValue(); + final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL).evaluateAttributeExpressions().getValue(); + final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue(); + final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); + + final boolean pgIdIsSet = context.getProperty(PROCESS_GROUP_ID).isSet(); + final String processGroupId = pgIdIsSet ? context.getProperty(PROCESS_GROUP_ID).evaluateAttributeExpressions().getValue() : null; final long start = System.currentTimeMillis(); @@ -140,22 +150,28 @@ public class AmbariReportingTask extends AbstractReportingTask { } // calculate the current metrics, but store them to be sent next time - final ProcessGroupStatus status = context.getEventAccess().getControllerStatus(); - final Map statusMetrics = metricsService.getMetrics(status); - final Map jvmMetrics = metricsService.getMetrics(virtualMachineMetrics); + final ProcessGroupStatus status = processGroupId == null ? context.getEventAccess().getControllerStatus() : context.getEventAccess().getGroupStatus(processGroupId); - final MetricsBuilder metricsBuilder = new MetricsBuilder(factory); + if(status != null) { + final Map statusMetrics = metricsService.getMetrics(status, pgIdIsSet); + final Map jvmMetrics = metricsService.getMetrics(virtualMachineMetrics); - final JsonObject metricsObject = metricsBuilder - .applicationId(applicationId) - .instanceId(status.getId()) - .hostname(hostname) - .timestamp(start) - .addAllMetrics(statusMetrics) - .addAllMetrics(jvmMetrics) - .build(); + final MetricsBuilder metricsBuilder = new MetricsBuilder(factory); - previousMetrics = metricsObject; + final JsonObject metricsObject = metricsBuilder + .applicationId(applicationId) + .instanceId(status.getId()) + .hostname(hostname) + .timestamp(start) + .addAllMetrics(statusMetrics) + .addAllMetrics(jvmMetrics) + .build(); + + previousMetrics = metricsObject; + } else { + getLogger().error("No process group status with ID = {}", new Object[]{processGroupId}); + previousMetrics = null; + } } } diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java index 4b626686bf..20cfa4e282 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java @@ -21,6 +21,9 @@ package org.apache.nifi.reporting.ambari.metrics; */ public interface MetricNames { + // Metric Name separator + String METRIC_NAME_SEPARATOR = "."; + // NiFi Metrics String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes"; String BYTES_RECEIVED = "BytesReceivedLast5Minutes"; diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java index 0671dabb16..cef257dad9 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java @@ -33,25 +33,26 @@ public class MetricsService { * Generates a Map of metrics for a ProcessGroupStatus instance. * * @param status a ProcessGroupStatus to get metrics from + * @param appendPgId if true, the process group ID will be appended at the end of the metric name * @return a map of metrics for the given status */ - public Map getMetrics(ProcessGroupStatus status) { + public Map getMetrics(ProcessGroupStatus status, boolean appendPgId) { final Map metrics = new HashMap<>(); - metrics.put(MetricNames.FLOW_FILES_RECEIVED, String.valueOf(status.getFlowFilesReceived())); - metrics.put(MetricNames.BYTES_RECEIVED, String.valueOf(status.getBytesReceived())); - metrics.put(MetricNames.FLOW_FILES_SENT, String.valueOf(status.getFlowFilesSent())); - metrics.put(MetricNames.BYTES_SENT, String.valueOf(status.getBytesSent())); - metrics.put(MetricNames.FLOW_FILES_QUEUED, String.valueOf(status.getQueuedCount())); - metrics.put(MetricNames.BYTES_QUEUED, String.valueOf(status.getQueuedContentSize())); - metrics.put(MetricNames.BYTES_READ, String.valueOf(status.getBytesRead())); - metrics.put(MetricNames.BYTES_WRITTEN, String.valueOf(status.getBytesWritten())); - metrics.put(MetricNames.ACTIVE_THREADS, String.valueOf(status.getActiveThreadCount())); + metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), String.valueOf(status.getFlowFilesReceived())); + metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), String.valueOf(status.getBytesReceived())); + metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), String.valueOf(status.getFlowFilesSent())); + metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), String.valueOf(status.getBytesSent())); + metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedCount())); + metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedContentSize())); + metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), String.valueOf(status.getBytesRead())); + metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), String.valueOf(status.getBytesWritten())); + metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), String.valueOf(status.getActiveThreadCount())); final long durationNanos = calculateProcessingNanos(status); - metrics.put(MetricNames.TOTAL_TASK_DURATION_NANOS, String.valueOf(durationNanos)); + metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), String.valueOf(durationNanos)); final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS); - metrics.put(MetricNames.TOTAL_TASK_DURATION_SECONDS, String.valueOf(durationSeconds)); + metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), String.valueOf(durationSeconds)); return metrics; } @@ -118,4 +119,13 @@ public class MetricsService { return nanos; } + // append the process group ID if necessary + private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) { + if(appendPgId) { + return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId(); + } else { + return name; + } + } + } diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java index 5e23bf7636..8cf02122ed 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java @@ -110,6 +110,8 @@ public class TestAmbariReportingTask { .thenReturn(new MockPropertyValue(applicationId)); Mockito.when(context.getProperty(AmbariReportingTask.HOSTNAME)) .thenReturn(new MockPropertyValue(hostName)); + Mockito.when(context.getProperty(AmbariReportingTask.PROCESS_GROUP_ID)) + .thenReturn(new MockPropertyValue("1234")); final EventAccess eventAccess = Mockito.mock(EventAccess.class); @@ -122,6 +124,7 @@ public class TestAmbariReportingTask { task.setup(configurationContext); task.onTrigger(context); } + // override the creation of the client to provide a mock private class TestableAmbariReportingTask extends AmbariReportingTask { diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java index e8cc79244c..93224eb190 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java @@ -60,7 +60,7 @@ public class TestMetricsService { final MetricsService service = new MetricsService(); - final Map metrics = service.getMetrics(status); + final Map metrics = service.getMetrics(status, false); Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED)); Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_RECEIVED)); @@ -75,6 +75,44 @@ public class TestMetricsService { Assert.assertTrue(metrics.containsKey(MetricNames.TOTAL_TASK_DURATION_NANOS)); } + @Test + public void testGetProcessGroupStatusMetricsWithID() { + ProcessGroupStatus status = new ProcessGroupStatus(); + String id = "1234"; + status.setId(id); + status.setFlowFilesReceived(5); + status.setBytesReceived(10000); + status.setFlowFilesSent(10); + status.setBytesSent(20000); + status.setQueuedCount(100); + status.setQueuedContentSize(1024L); + status.setBytesRead(60000L); + status.setBytesWritten(80000L); + status.setActiveThreadCount(5); + + // create a processor status with processing time + ProcessorStatus procStatus = new ProcessorStatus(); + procStatus.setProcessingNanos(123456789); + + Collection processorStatuses = new ArrayList<>(); + processorStatuses.add(procStatus); + status.setProcessorStatus(processorStatuses); + + // create a group status with processing time + ProcessGroupStatus groupStatus = new ProcessGroupStatus(); + groupStatus.setProcessorStatus(processorStatuses); + + Collection groupStatuses = new ArrayList<>(); + groupStatuses.add(groupStatus); + status.setProcessGroupStatus(groupStatuses); + + final MetricsService service = new MetricsService(); + + final Map metrics = service.getMetrics(status, true); + + Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED + MetricNames.METRIC_NAME_SEPARATOR + id)); + } + @Test public void testGetVirtualMachineMetrics() { final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 59ea8fda02..7fd85b9640 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2246,6 +2246,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @param groupId group id * @return the component status */ + @Override public ProcessGroupStatus getGroupStatus(final String groupId) { return getGroupStatus(groupId, getProcessorStats()); }