diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java index cff0b48924..7092dd08c4 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java @@ -47,7 +47,10 @@ import java.util.Map; import java.util.concurrent.TimeUnit; @Tags({"reporting", "ambari", "metrics"}) -@CapabilityDescription("Publishes metrics from NiFi to Ambari") +@CapabilityDescription("Publishes metrics from NiFi to Ambari Metrics Service (AMS). Due to how the Ambari Metrics Service " + + "works, this reporting task should be scheduled to run every 60 seconds. Each iteration it will send the metrics " + + "from the previous iteration, and calculate the current metrics to be sent on next iteration. Scheduling this reporting " + + "task at a frequency other than 60 seconds may produce unexpected results.") public class AmbariReportingTask extends AbstractReportingTask { static final PropertyDescriptor METRICS_COLLECTOR_URL = new PropertyDescriptor.Builder() @@ -80,6 +83,7 @@ public class AmbariReportingTask extends AbstractReportingTask { private volatile Client client; private volatile JsonBuilderFactory factory; private volatile VirtualMachineMetrics virtualMachineMetrics; + private volatile JsonObject previousMetrics = null; private final MetricsService metricsService = new MetricsService(); @@ -98,6 +102,7 @@ public class AmbariReportingTask extends AbstractReportingTask { factory = Json.createBuilderFactory(config); client = createClient(); virtualMachineMetrics = VirtualMachineMetrics.getInstance(); + previousMetrics = null; } // used for testing to allow tests to override the client @@ -107,8 +112,6 @@ public class AmbariReportingTask extends AbstractReportingTask { @Override public void onTrigger(final ReportingContext context) { - final ProcessGroupStatus status = context.getEventAccess().getControllerStatus(); - final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL) .evaluateAttributeExpressions().getValue(); final String applicationId = context.getProperty(APPLICATION_ID) @@ -118,6 +121,27 @@ public class AmbariReportingTask extends AbstractReportingTask { final long start = System.currentTimeMillis(); + // send the metrics from last execution + if (previousMetrics != null) { + final WebTarget metricsTarget = client.target(metricsCollectorUrl); + final Invocation.Builder invocation = metricsTarget.request(); + + final Entity entity = Entity.json(previousMetrics.toString()); + getLogger().debug("Sending metrics {} to Ambari", new Object[]{entity.getEntity()}); + + final Response response = invocation.post(entity); + if (response.getStatus() == Response.Status.OK.getStatusCode()) { + final long completedMillis = TimeUnit.NANOSECONDS.toMillis(System.currentTimeMillis() - start); + getLogger().info("Successfully sent metrics to Ambari in {} ms", new Object[]{completedMillis}); + } else { + final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error"; + getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity}); + } + + } + + // calculate the current metrics, but store them to be sent next time + final ProcessGroupStatus status = context.getEventAccess().getControllerStatus(); final Map statusMetrics = metricsService.getMetrics(status); final Map jvmMetrics = metricsService.getMetrics(virtualMachineMetrics); @@ -132,20 +156,7 @@ public class AmbariReportingTask extends AbstractReportingTask { .addAllMetrics(jvmMetrics) .build(); - final WebTarget metricsTarget = client.target(metricsCollectorUrl); - final Invocation.Builder invocation = metricsTarget.request(); - - final Entity entity = Entity.json(metricsObject.toString()); - getLogger().debug("Sending metrics {} to Ambari", new Object[]{entity.getEntity()}); - - final Response response = invocation.post(entity); - if (response.getStatus() == Response.Status.OK.getStatusCode()) { - final long completedMillis = TimeUnit.NANOSECONDS.toMillis(System.currentTimeMillis() - start); - getLogger().info("Successfully sent metrics to Ambari in {} ms", new Object[] {completedMillis}); - } else { - final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error"; - getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity}); - } + previousMetrics = metricsObject; } }