NIFI-3259 - Process group level option in Ambari reporting task

This closes #1360.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Pierre Villard 2016-12-27 20:16:32 +01:00 committed by Bryan Bende
parent 9b47961d1c
commit 2c0f1c348e
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
8 changed files with 114 additions and 32 deletions

View File

@ -31,6 +31,11 @@ public interface EventAccess {
*/
ProcessGroupStatus getControllerStatus();
/**
* @return the status of all components in the specified group.
*/
ProcessGroupStatus getGroupStatus(final String groupId);
/**
* Convenience method to obtain Provenance Events starting with (and
* including) the given ID. If no event exists with that ID, the first event

View File

@ -41,6 +41,11 @@ public class MockEventAccess implements EventAccess {
return processGroupStatus;
}
@Override
public ProcessGroupStatus getGroupStatus(final String groupId) {
return null;
}
@Override
public List<ProvenanceEventRecord> getProvenanceEvents(long firstEventId, int maxRecords) throws IOException {
if (firstEventId < 0 || maxRecords < 1) {
@ -65,6 +70,7 @@ public class MockEventAccess implements EventAccess {
this.provenanceRecords.add(record);
}
@Override
public ProvenanceEventRepository getProvenanceRepository() {
return null;
}

View File

@ -80,6 +80,15 @@ public class AmbariReportingTask extends AbstractReportingTask {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor PROCESS_GROUP_ID = new PropertyDescriptor.Builder()
.name("Process Group ID")
.description("If specified, the reporting task will send metrics about this process group only. If"
+ " not, the root process group is used and global metrics are sent.")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private volatile Client client;
private volatile JsonBuilderFactory factory;
private volatile VirtualMachineMetrics virtualMachineMetrics;
@ -93,6 +102,7 @@ public class AmbariReportingTask extends AbstractReportingTask {
properties.add(METRICS_COLLECTOR_URL);
properties.add(APPLICATION_ID);
properties.add(HOSTNAME);
properties.add(PROCESS_GROUP_ID);
return properties;
}
@ -112,12 +122,12 @@ public class AmbariReportingTask extends AbstractReportingTask {
@Override
public void onTrigger(final ReportingContext context) {
final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL)
.evaluateAttributeExpressions().getValue();
final String applicationId = context.getProperty(APPLICATION_ID)
.evaluateAttributeExpressions().getValue();
final String hostname = context.getProperty(HOSTNAME)
.evaluateAttributeExpressions().getValue();
final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL).evaluateAttributeExpressions().getValue();
final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue();
final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
final boolean pgIdIsSet = context.getProperty(PROCESS_GROUP_ID).isSet();
final String processGroupId = pgIdIsSet ? context.getProperty(PROCESS_GROUP_ID).evaluateAttributeExpressions().getValue() : null;
final long start = System.currentTimeMillis();
@ -140,22 +150,28 @@ public class AmbariReportingTask extends AbstractReportingTask {
}
// calculate the current metrics, but store them to be sent next time
final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
final Map<String,String> statusMetrics = metricsService.getMetrics(status);
final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
final ProcessGroupStatus status = processGroupId == null ? context.getEventAccess().getControllerStatus() : context.getEventAccess().getGroupStatus(processGroupId);
final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
if(status != null) {
final Map<String,String> statusMetrics = metricsService.getMetrics(status, pgIdIsSet);
final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
final JsonObject metricsObject = metricsBuilder
.applicationId(applicationId)
.instanceId(status.getId())
.hostname(hostname)
.timestamp(start)
.addAllMetrics(statusMetrics)
.addAllMetrics(jvmMetrics)
.build();
final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
previousMetrics = metricsObject;
final JsonObject metricsObject = metricsBuilder
.applicationId(applicationId)
.instanceId(status.getId())
.hostname(hostname)
.timestamp(start)
.addAllMetrics(statusMetrics)
.addAllMetrics(jvmMetrics)
.build();
previousMetrics = metricsObject;
} else {
getLogger().error("No process group status with ID = {}", new Object[]{processGroupId});
previousMetrics = null;
}
}
}

View File

@ -21,6 +21,9 @@ package org.apache.nifi.reporting.ambari.metrics;
*/
public interface MetricNames {
// Metric Name separator
String METRIC_NAME_SEPARATOR = ".";
// NiFi Metrics
String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
String BYTES_RECEIVED = "BytesReceivedLast5Minutes";

View File

@ -33,25 +33,26 @@ public class MetricsService {
* Generates a Map of metrics for a ProcessGroupStatus instance.
*
* @param status a ProcessGroupStatus to get metrics from
* @param appendPgId if true, the process group ID will be appended at the end of the metric name
* @return a map of metrics for the given status
*/
public Map<String,String> getMetrics(ProcessGroupStatus status) {
public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) {
final Map<String,String> metrics = new HashMap<>();
metrics.put(MetricNames.FLOW_FILES_RECEIVED, String.valueOf(status.getFlowFilesReceived()));
metrics.put(MetricNames.BYTES_RECEIVED, String.valueOf(status.getBytesReceived()));
metrics.put(MetricNames.FLOW_FILES_SENT, String.valueOf(status.getFlowFilesSent()));
metrics.put(MetricNames.BYTES_SENT, String.valueOf(status.getBytesSent()));
metrics.put(MetricNames.FLOW_FILES_QUEUED, String.valueOf(status.getQueuedCount()));
metrics.put(MetricNames.BYTES_QUEUED, String.valueOf(status.getQueuedContentSize()));
metrics.put(MetricNames.BYTES_READ, String.valueOf(status.getBytesRead()));
metrics.put(MetricNames.BYTES_WRITTEN, String.valueOf(status.getBytesWritten()));
metrics.put(MetricNames.ACTIVE_THREADS, String.valueOf(status.getActiveThreadCount()));
metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), String.valueOf(status.getFlowFilesReceived()));
metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), String.valueOf(status.getBytesReceived()));
metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), String.valueOf(status.getFlowFilesSent()));
metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), String.valueOf(status.getBytesSent()));
metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedCount()));
metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedContentSize()));
metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), String.valueOf(status.getBytesRead()));
metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), String.valueOf(status.getBytesWritten()));
metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), String.valueOf(status.getActiveThreadCount()));
final long durationNanos = calculateProcessingNanos(status);
metrics.put(MetricNames.TOTAL_TASK_DURATION_NANOS, String.valueOf(durationNanos));
metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), String.valueOf(durationNanos));
final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
metrics.put(MetricNames.TOTAL_TASK_DURATION_SECONDS, String.valueOf(durationSeconds));
metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), String.valueOf(durationSeconds));
return metrics;
}
@ -118,4 +119,13 @@ public class MetricsService {
return nanos;
}
// append the process group ID if necessary
private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) {
if(appendPgId) {
return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId();
} else {
return name;
}
}
}

View File

@ -110,6 +110,8 @@ public class TestAmbariReportingTask {
.thenReturn(new MockPropertyValue(applicationId));
Mockito.when(context.getProperty(AmbariReportingTask.HOSTNAME))
.thenReturn(new MockPropertyValue(hostName));
Mockito.when(context.getProperty(AmbariReportingTask.PROCESS_GROUP_ID))
.thenReturn(new MockPropertyValue("1234"));
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
@ -122,6 +124,7 @@ public class TestAmbariReportingTask {
task.setup(configurationContext);
task.onTrigger(context);
}
// override the creation of the client to provide a mock
private class TestableAmbariReportingTask extends AmbariReportingTask {

View File

@ -60,7 +60,7 @@ public class TestMetricsService {
final MetricsService service = new MetricsService();
final Map<String,String> metrics = service.getMetrics(status);
final Map<String,String> metrics = service.getMetrics(status, false);
Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED));
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_RECEIVED));
@ -75,6 +75,44 @@ public class TestMetricsService {
Assert.assertTrue(metrics.containsKey(MetricNames.TOTAL_TASK_DURATION_NANOS));
}
@Test
public void testGetProcessGroupStatusMetricsWithID() {
ProcessGroupStatus status = new ProcessGroupStatus();
String id = "1234";
status.setId(id);
status.setFlowFilesReceived(5);
status.setBytesReceived(10000);
status.setFlowFilesSent(10);
status.setBytesSent(20000);
status.setQueuedCount(100);
status.setQueuedContentSize(1024L);
status.setBytesRead(60000L);
status.setBytesWritten(80000L);
status.setActiveThreadCount(5);
// create a processor status with processing time
ProcessorStatus procStatus = new ProcessorStatus();
procStatus.setProcessingNanos(123456789);
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
processorStatuses.add(procStatus);
status.setProcessorStatus(processorStatuses);
// create a group status with processing time
ProcessGroupStatus groupStatus = new ProcessGroupStatus();
groupStatus.setProcessorStatus(processorStatuses);
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
groupStatuses.add(groupStatus);
status.setProcessGroupStatus(groupStatuses);
final MetricsService service = new MetricsService();
final Map<String,String> metrics = service.getMetrics(status, true);
Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED + MetricNames.METRIC_NAME_SEPARATOR + id));
}
@Test
public void testGetVirtualMachineMetrics() {
final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();

View File

@ -2246,6 +2246,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @param groupId group id
* @return the component status
*/
@Override
public ProcessGroupStatus getGroupStatus(final String groupId) {
return getGroupStatus(groupId, getProcessorStats());
}