NIFI-2194 This closes #659. Caching metrics in AmbariReportingTask so each iteration sends last iteration's metrics

This commit is contained in:
Bryan Bende 2016-07-13 17:53:54 -04:00 committed by joewitt
parent 54574e3889
commit afc038d2c0
1 changed files with 28 additions and 17 deletions

View File

@ -47,7 +47,10 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
@Tags({"reporting", "ambari", "metrics"})
@CapabilityDescription("Publishes metrics from NiFi to Ambari")
@CapabilityDescription("Publishes metrics from NiFi to Ambari Metrics Service (AMS). Due to how the Ambari Metrics Service " +
"works, this reporting task should be scheduled to run every 60 seconds. Each iteration it will send the metrics " +
"from the previous iteration, and calculate the current metrics to be sent on next iteration. Scheduling this reporting " +
"task at a frequency other than 60 seconds may produce unexpected results.")
public class AmbariReportingTask extends AbstractReportingTask {
static final PropertyDescriptor METRICS_COLLECTOR_URL = new PropertyDescriptor.Builder()
@ -80,6 +83,7 @@ public class AmbariReportingTask extends AbstractReportingTask {
private volatile Client client;
private volatile JsonBuilderFactory factory;
private volatile VirtualMachineMetrics virtualMachineMetrics;
private volatile JsonObject previousMetrics = null;
private final MetricsService metricsService = new MetricsService();
@ -98,6 +102,7 @@ public class AmbariReportingTask extends AbstractReportingTask {
factory = Json.createBuilderFactory(config);
client = createClient();
virtualMachineMetrics = VirtualMachineMetrics.getInstance();
previousMetrics = null;
}
// used for testing to allow tests to override the client
@ -107,8 +112,6 @@ public class AmbariReportingTask extends AbstractReportingTask {
@Override
public void onTrigger(final ReportingContext context) {
final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL)
.evaluateAttributeExpressions().getValue();
final String applicationId = context.getProperty(APPLICATION_ID)
@ -118,6 +121,27 @@ public class AmbariReportingTask extends AbstractReportingTask {
final long start = System.currentTimeMillis();
// send the metrics from last execution
if (previousMetrics != null) {
final WebTarget metricsTarget = client.target(metricsCollectorUrl);
final Invocation.Builder invocation = metricsTarget.request();
final Entity<String> entity = Entity.json(previousMetrics.toString());
getLogger().debug("Sending metrics {} to Ambari", new Object[]{entity.getEntity()});
final Response response = invocation.post(entity);
if (response.getStatus() == Response.Status.OK.getStatusCode()) {
final long completedMillis = TimeUnit.NANOSECONDS.toMillis(System.currentTimeMillis() - start);
getLogger().info("Successfully sent metrics to Ambari in {} ms", new Object[]{completedMillis});
} else {
final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error";
getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity});
}
}
// calculate the current metrics, but store them to be sent next time
final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
final Map<String,String> statusMetrics = metricsService.getMetrics(status);
final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
@ -132,20 +156,7 @@ public class AmbariReportingTask extends AbstractReportingTask {
.addAllMetrics(jvmMetrics)
.build();
final WebTarget metricsTarget = client.target(metricsCollectorUrl);
final Invocation.Builder invocation = metricsTarget.request();
final Entity<String> entity = Entity.json(metricsObject.toString());
getLogger().debug("Sending metrics {} to Ambari", new Object[]{entity.getEntity()});
final Response response = invocation.post(entity);
if (response.getStatus() == Response.Status.OK.getStatusCode()) {
final long completedMillis = TimeUnit.NANOSECONDS.toMillis(System.currentTimeMillis() - start);
getLogger().info("Successfully sent metrics to Ambari in {} ms", new Object[] {completedMillis});
} else {
final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error";
getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity});
}
previousMetrics = metricsObject;
}
}