NIFI-9435 Added registries and names include parameters to Flow Metrics

- Added optional includedRegistries query parameter to Flow Metrics Resource method supporting one or more registries
- Added optional includedNames query parameter to Flow Metrics Resource method supporting one or more metric family names
- Added sampleName and sampleLabelValue optional pattern parameters
- Added FilteringMetricFamilySamplesEnumeration to support streamed filtering
- Added PrometheusMetricsWriter and TextFormat implementation

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5571.
This commit is contained in:
exceptionfactory 2021-12-03 15:30:11 -06:00 committed by Joe Gresock
parent 7c50f1429e
commit dd7131b257
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
9 changed files with 623 additions and 148 deletions

View File

@ -130,6 +130,7 @@ import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
import org.apache.nifi.web.api.request.FlowMetricsRegistry;
import java.util.Collection;
import java.util.Date;
@ -323,6 +324,14 @@ public interface NiFiServiceFacade {
*/
Collection<CollectorRegistry> generateFlowMetrics();
/**
* Generate metrics for the flow and return selected registries
*
* @param includeRegistries Set of Flow Metrics Registries to be returned
* @return Collector Registries
*/
Collection<CollectorRegistry> generateFlowMetrics(Set<FlowMetricsRegistry> includeRegistries);
/**
* Updates the configuration for this controller.
*

View File

@ -120,6 +120,7 @@ import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterReferenceManager;
import org.apache.nifi.parameter.StandardParameterContext;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
@ -308,6 +309,7 @@ import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
import org.apache.nifi.web.api.request.FlowMetricsRegistry;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.AccessPolicyDAO;
import org.apache.nifi.web.dao.ConnectionDAO;
@ -418,12 +420,19 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private AuthorizableLookup authorizableLookup;
// Prometheus Metrics objects
private NiFiMetricsRegistry nifiMetricsRegistry = new NiFiMetricsRegistry();
private JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
private ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
private BulletinMetricsRegistry bulletinMetricsRegistry = new BulletinMetricsRegistry();
private final NiFiMetricsRegistry nifiMetricsRegistry = new NiFiMetricsRegistry();
private final JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
private final ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
private final BulletinMetricsRegistry bulletinMetricsRegistry = new BulletinMetricsRegistry();
public final Collection<CollectorRegistry> ALL_REGISTRIES = Arrays.asList(
private final Collection<AbstractMetricsRegistry> configuredRegistries = Arrays.asList(
nifiMetricsRegistry,
jvmMetricsRegistry,
connectionAnalyticsMetricsRegistry,
bulletinMetricsRegistry
);
private final Collection<CollectorRegistry> metricsRegistries = Arrays.asList(
nifiMetricsRegistry.getRegistry(),
jvmMetricsRegistry.getRegistry(),
connectionAnalyticsMetricsRegistry.getRegistry(),
@ -5654,7 +5663,22 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
);
}
}
return ALL_REGISTRIES;
return metricsRegistries;
}
@Override
public Collection<CollectorRegistry> generateFlowMetrics(final Set<FlowMetricsRegistry> includeRegistries) {
final Set<FlowMetricsRegistry> selectedRegistries = includeRegistries.isEmpty() ? new HashSet<>(Arrays.asList(FlowMetricsRegistry.values())) : includeRegistries;
final Set<Class<? extends AbstractMetricsRegistry>> registryClasses = selectedRegistries.stream()
.map(FlowMetricsRegistry::getRegistryClass)
.collect(Collectors.toSet());
generateFlowMetrics();
return configuredRegistries.stream()
.filter(configuredRegistry -> registryClasses.contains(configuredRegistry.getClass()))
.map(AbstractMetricsRegistry::getRegistry)
.collect(Collectors.toList());
}
@Override

View File

@ -111,8 +111,12 @@ import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataSetEntity;
import org.apache.nifi.web.api.entity.VersionedFlowsEntity;
import org.apache.nifi.web.api.metrics.TextFormatPrometheusMetricsWriter;
import org.apache.nifi.web.api.metrics.PrometheusMetricsWriter;
import org.apache.nifi.web.api.request.BulletinBoardPatternParameter;
import org.apache.nifi.web.api.request.DateTimeParameter;
import org.apache.nifi.web.api.request.FlowMetricsProducer;
import org.apache.nifi.web.api.request.FlowMetricsRegistry;
import org.apache.nifi.web.api.request.IntegerParameter;
import org.apache.nifi.web.api.request.LongParameter;
@ -131,13 +135,10 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.text.Collator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
@ -190,9 +191,8 @@ public class FlowResource extends ApplicationResource {
* Populates the remaining fields in the specified process group.
*
* @param flow group
* @return group dto
*/
private ProcessGroupFlowDTO populateRemainingFlowContent(ProcessGroupFlowDTO flow) {
private void populateRemainingFlowContent(ProcessGroupFlowDTO flow) {
FlowDTO flowStructure = flow.getFlow();
// populate the remaining fields for the processors, connections, process group refs, remote process groups, and labels if appropriate
@ -202,14 +202,12 @@ public class FlowResource extends ApplicationResource {
// set the process group uri
flow.setUri(generateResourceUri("flow", "process-groups", flow.getId()));
return flow;
}
/**
* Populates the remaining content of the specified snippet.
*/
private FlowDTO populateRemainingFlowStructure(FlowDTO flowStructure) {
private void populateRemainingFlowStructure(FlowDTO flowStructure) {
processorResource.populateRemainingProcessorEntitiesContent(flowStructure.getProcessors());
connectionResource.populateRemainingConnectionEntitiesContent(flowStructure.getConnections());
inputPortResource.populateRemainingInputPortEntitiesContent(flowStructure.getInputPorts());
@ -226,8 +224,6 @@ public class FlowResource extends ApplicationResource {
processGroup.setContents(null);
}
}
return flowStructure;
}
/**
@ -371,8 +367,7 @@ public class FlowResource extends ApplicationResource {
)
public Response getFlow(
@ApiParam(
value = "The process group id.",
required = false
value = "The process group id."
)
@PathParam("id") final String groupId,
@QueryParam("uiOnly") @DefaultValue("false") final boolean uiOnly) {
@ -393,7 +388,6 @@ public class FlowResource extends ApplicationResource {
* Retrieves the metrics of the entire flow.
*
* @return A flowMetricsEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@ -418,30 +412,36 @@ public class FlowResource extends ApplicationResource {
public Response getFlowMetrics(
@ApiParam(
value = "The producer for flow file metrics. Each producer may have its own output format.",
required = true
required = true,
allowableValues = "prometheus"
)
@PathParam("producer") final String producer) throws InterruptedException {
@PathParam("producer") final String producer,
@ApiParam(
value = "Set of included metrics registries",
allowableValues = "NIFI,JVM,BULLETIN,CONNECTION"
)
@QueryParam("includedRegistries") final Set<FlowMetricsRegistry> includedRegistries,
@ApiParam(
value = "Regular Expression Pattern to be applied against the sample name field"
)
@QueryParam("sampleName") final String sampleName,
@ApiParam(
value = "Regular Expression Pattern to be applied against the sample label value field"
)
@QueryParam("sampleLabelValue") final String sampleLabelValue
) {
authorizeFlow();
if ("prometheus".equalsIgnoreCase(producer)) {
// get this process group flow
final Collection<CollectorRegistry> allRegistries = serviceFacade.generateFlowMetrics();
// generate a streaming response
final StreamingOutput response = output -> {
Writer writer = new BufferedWriter(new OutputStreamWriter(output));
for (CollectorRegistry collectorRegistry : allRegistries) {
TextFormat.write004(writer, collectorRegistry.metricFamilySamples());
// flush the response
output.flush();
}
writer.flush();
writer.close();
};
final Set<FlowMetricsRegistry> selectedRegistries = includedRegistries == null ? Collections.emptySet() : includedRegistries;
final Collection<CollectorRegistry> registries = serviceFacade.generateFlowMetrics(selectedRegistries);
return generateOkResponse(response)
.type(MediaType.TEXT_PLAIN_TYPE)
.build();
if (FlowMetricsProducer.PROMETHEUS.getProducer().equalsIgnoreCase(producer)) {
final StreamingOutput response = (outputStream -> {
final PrometheusMetricsWriter prometheusMetricsWriter = new TextFormatPrometheusMetricsWriter(sampleName, sampleLabelValue);
prometheusMetricsWriter.write(registries, outputStream);
});
return generateOkResponse(response).type(TextFormat.CONTENT_TYPE_004).build();
} else {
throw new ResourceNotFoundException("The specified producer is missing or invalid.");
}
@ -500,7 +500,6 @@ public class FlowResource extends ApplicationResource {
* Retrieves all the of controller services in this NiFi.
*
* @return A controllerServicesEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@ -525,7 +524,7 @@ public class FlowResource extends ApplicationResource {
@ApiParam(value = "The process group id.", required = true) @PathParam("id") String groupId,
@ApiParam("Whether or not to include parent/ancestory process groups") @QueryParam("includeAncestorGroups") @DefaultValue("true") boolean includeAncestorGroups,
@ApiParam("Whether or not to include descendant process groups") @QueryParam("includeDescendantGroups") @DefaultValue("false") boolean includeDescendantGroups
) throws InterruptedException {
) {
authorizeFlow();
@ -705,25 +704,19 @@ public class FlowResource extends ApplicationResource {
group.findAllProcessors().stream()
.filter(getProcessorFilter.get())
.filter(processor -> OperationAuthorizable.isOperationAuthorized(processor, authorizer, NiFiUserUtils.getNiFiUser()))
.forEach(processor -> {
componentIds.add(processor.getIdentifier());
});
.forEach(processor -> componentIds.add(processor.getIdentifier()));
// ensure authorized for each input port we will attempt to schedule
group.findAllInputPorts().stream()
.filter(getPortFilter.get())
.filter(inputPort -> OperationAuthorizable.isOperationAuthorized(inputPort, authorizer, NiFiUserUtils.getNiFiUser()))
.forEach(inputPort -> {
componentIds.add(inputPort.getIdentifier());
});
.forEach(inputPort -> componentIds.add(inputPort.getIdentifier()));
// ensure authorized for each output port we will attempt to schedule
group.findAllOutputPorts().stream()
.filter(getPortFilter.get())
.filter(outputPort -> OperationAuthorizable.isOperationAuthorized(outputPort, authorizer, NiFiUserUtils.getNiFiUser()))
.forEach(outputPort -> {
componentIds.add(outputPort.getIdentifier());
});
.forEach(outputPort -> componentIds.add(outputPort.getIdentifier()));
return componentIds;
});
@ -936,7 +929,6 @@ public class FlowResource extends ApplicationResource {
*
* @param value Search string
* @return A searchResultsEntity
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@ -961,7 +953,7 @@ public class FlowResource extends ApplicationResource {
public Response searchFlow(
@QueryParam("q") @DefaultValue(StringUtils.EMPTY) String value,
@QueryParam("a") @DefaultValue(StringUtils.EMPTY) String activeGroupId
) throws InterruptedException {
) {
authorizeFlow();
// query the controller
@ -1022,7 +1014,6 @@ public class FlowResource extends ApplicationResource {
* Retrieves the cluster summary for this NiFi.
*
* @return A clusterSummaryEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@ -1043,7 +1034,7 @@ public class FlowResource extends ApplicationResource {
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getClusterSummary() throws InterruptedException {
public Response getClusterSummary() {
authorizeFlow();
@ -1187,18 +1178,15 @@ public class FlowResource extends ApplicationResource {
)
public Response getProcessorTypes(
@ApiParam(
value = "If specified, will only return types that are a member of this bundle group.",
required = false
value = "If specified, will only return types that are a member of this bundle group."
)
@QueryParam("bundleGroupFilter") String bundleGroupFilter,
@ApiParam(
value = "If specified, will only return types that are a member of this bundle artifact.",
required = false
value = "If specified, will only return types that are a member of this bundle artifact."
)
@QueryParam("bundleArtifactFilter") String bundleArtifactFilter,
@ApiParam(
value = "If specified, will only return types whose fully qualified classname matches.",
required = false
value = "If specified, will only return types whose fully qualified classname matches."
)
@QueryParam("type") String typeFilter) throws InterruptedException {
@ -1245,38 +1233,31 @@ public class FlowResource extends ApplicationResource {
)
public Response getControllerServiceTypes(
@ApiParam(
value = "If specified, will only return controller services that are compatible with this type of service.",
required = false
value = "If specified, will only return controller services that are compatible with this type of service."
)
@QueryParam("serviceType") String serviceType,
@ApiParam(
value = "If serviceType specified, is the bundle group of the serviceType.",
required = false
value = "If serviceType specified, is the bundle group of the serviceType."
)
@QueryParam("serviceBundleGroup") String serviceBundleGroup,
@ApiParam(
value = "If serviceType specified, is the bundle artifact of the serviceType.",
required = false
value = "If serviceType specified, is the bundle artifact of the serviceType."
)
@QueryParam("serviceBundleArtifact") String serviceBundleArtifact,
@ApiParam(
value = "If serviceType specified, is the bundle version of the serviceType.",
required = false
value = "If serviceType specified, is the bundle version of the serviceType."
)
@QueryParam("serviceBundleVersion") String serviceBundleVersion,
@ApiParam(
value = "If specified, will only return types that are a member of this bundle group.",
required = false
value = "If specified, will only return types that are a member of this bundle group."
)
@QueryParam("bundleGroupFilter") String bundleGroupFilter,
@ApiParam(
value = "If specified, will only return types that are a member of this bundle artifact.",
required = false
value = "If specified, will only return types that are a member of this bundle artifact."
)
@QueryParam("bundleArtifactFilter") String bundleArtifactFilter,
@ApiParam(
value = "If specified, will only return types whose fully qualified classname matches.",
required = false
value = "If specified, will only return types whose fully qualified classname matches."
)
@QueryParam("typeFilter") String typeFilter) throws InterruptedException {
@ -1329,18 +1310,15 @@ public class FlowResource extends ApplicationResource {
)
public Response getReportingTaskTypes(
@ApiParam(
value = "If specified, will only return types that are a member of this bundle group.",
required = false
value = "If specified, will only return types that are a member of this bundle group."
)
@QueryParam("bundleGroupFilter") String bundleGroupFilter,
@ApiParam(
value = "If specified, will only return types that are a member of this bundle artifact.",
required = false
value = "If specified, will only return types that are a member of this bundle artifact."
)
@QueryParam("bundleArtifactFilter") String bundleArtifactFilter,
@ApiParam(
value = "If specified, will only return types whose fully qualified classname matches.",
required = false
value = "If specified, will only return types whose fully qualified classname matches."
)
@QueryParam("type") String typeFilter) throws InterruptedException {
@ -1520,12 +1498,7 @@ public class FlowResource extends ApplicationResource {
}
private SortedSet<BucketEntity> sortBuckets(final Set<BucketEntity> buckets) {
final SortedSet<BucketEntity> sortedBuckets = new TreeSet<>(new Comparator<BucketEntity>() {
@Override
public int compare(final BucketEntity entity1, final BucketEntity entity2) {
return Collator.getInstance().compare(getBucketName(entity1), getBucketName(entity2));
}
});
final SortedSet<BucketEntity> sortedBuckets = new TreeSet<>((entity1, entity2) -> Collator.getInstance().compare(getBucketName(entity1), getBucketName(entity2)));
sortedBuckets.addAll(buckets);
return sortedBuckets;
@ -1573,12 +1546,7 @@ public class FlowResource extends ApplicationResource {
}
private SortedSet<VersionedFlowEntity> sortFlows(final Set<VersionedFlowEntity> versionedFlows) {
final SortedSet<VersionedFlowEntity> sortedFlows = new TreeSet<>(new Comparator<VersionedFlowEntity>() {
@Override
public int compare(final VersionedFlowEntity entity1, final VersionedFlowEntity entity2) {
return Collator.getInstance().compare(getFlowName(entity1), getFlowName(entity2));
}
});
final SortedSet<VersionedFlowEntity> sortedFlows = new TreeSet<>((entity1, entity2) -> Collator.getInstance().compare(getFlowName(entity1), getFlowName(entity2)));
sortedFlows.addAll(versionedFlows);
return sortedFlows;
@ -1670,33 +1638,27 @@ public class FlowResource extends ApplicationResource {
)
public Response getBulletinBoard(
@ApiParam(
value = "Includes bulletins with an id after this value.",
required = false
value = "Includes bulletins with an id after this value."
)
@QueryParam("after") LongParameter after,
@ApiParam(
value = "Includes bulletins originating from this sources whose name match this regular expression.",
required = false
value = "Includes bulletins originating from this sources whose name match this regular expression."
)
@QueryParam("sourceName") BulletinBoardPatternParameter sourceName,
@ApiParam(
value = "Includes bulletins whose message that match this regular expression.",
required = false
value = "Includes bulletins whose message that match this regular expression."
)
@QueryParam("message") BulletinBoardPatternParameter message,
@ApiParam(
value = "Includes bulletins originating from this sources whose id match this regular expression.",
required = false
value = "Includes bulletins originating from this sources whose id match this regular expression."
)
@QueryParam("sourceId") BulletinBoardPatternParameter sourceId,
@ApiParam(
value = "Includes bulletins originating from this sources whose group id match this regular expression.",
required = false
value = "Includes bulletins originating from this sources whose group id match this regular expression."
)
@QueryParam("groupId") BulletinBoardPatternParameter groupId,
@ApiParam(
value = "The number of bulletins to limit the response to.",
required = false
value = "The number of bulletins to limit the response to."
)
@QueryParam("limit") IntegerParameter limit) throws InterruptedException {
@ -1773,13 +1735,11 @@ public class FlowResource extends ApplicationResource {
)
public Response getProcessorStatus(
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
value = "Whether or not to include the breakdown per node. Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the status.",
required = false
value = "The id of the node where to get the status."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
@ -1846,13 +1806,11 @@ public class FlowResource extends ApplicationResource {
)
public Response getInputPortStatus(
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
value = "Whether or not to include the breakdown per node. Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the status.",
required = false
value = "The id of the node where to get the status."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
@ -1919,13 +1877,11 @@ public class FlowResource extends ApplicationResource {
)
public Response getOutputPortStatus(
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
value = "Whether or not to include the breakdown per node. Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the status.",
required = false
value = "The id of the node where to get the status."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
@ -1992,18 +1948,15 @@ public class FlowResource extends ApplicationResource {
)
public Response getRemoteProcessGroupStatus(
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
value = "Whether or not to include the breakdown per node. Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the status.",
required = false
value = "The id of the node where to get the status."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
value = "The remote process group id.",
required = true
value = "The remote process group id."
)
@PathParam("id") String id) throws InterruptedException {
@ -2068,18 +2021,15 @@ public class FlowResource extends ApplicationResource {
)
public Response getProcessGroupStatus(
@ApiParam(
value = "Whether all descendant groups and the status of their content will be included. Optional, defaults to false",
required = false
value = "Whether all descendant groups and the status of their content will be included. Optional, defaults to false"
)
@QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean recursive,
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
value = "Whether or not to include the breakdown per node. Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the status.",
required = false
value = "The id of the node where to get the status."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
@ -2146,13 +2096,11 @@ public class FlowResource extends ApplicationResource {
)
public Response getConnectionStatus(
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
value = "Whether or not to include the breakdown per node. Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the status.",
required = false
value = "The id of the node where to get the status."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
@ -2219,13 +2167,11 @@ public class FlowResource extends ApplicationResource {
)
public Response getConnectionStatistics(
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
value = "Whether or not to include the breakdown per node. Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the statistics.",
required = false
value = "The id of the node where to get the statistics."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
@ -2550,33 +2496,27 @@ public class FlowResource extends ApplicationResource {
)
@QueryParam("count") IntegerParameter count,
@ApiParam(
value = "The field to sort on.",
required = false
value = "The field to sort on."
)
@QueryParam("sortColumn") String sortColumn,
@ApiParam(
value = "The direction to sort.",
required = false
value = "The direction to sort."
)
@QueryParam("sortOrder") String sortOrder,
@ApiParam(
value = "Include actions after this date.",
required = false
value = "Include actions after this date."
)
@QueryParam("startDate") DateTimeParameter startDate,
@ApiParam(
value = "Include actions before this date.",
required = false
value = "Include actions before this date."
)
@QueryParam("endDate") DateTimeParameter endDate,
@ApiParam(
value = "Include actions performed by this user.",
required = false
value = "Include actions performed by this user."
)
@QueryParam("userIdentity") String userIdentity,
@ApiParam(
value = "Include actions on this component.",
required = false
value = "Include actions on this component."
)
@QueryParam("sourceId") String sourceId) {

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.metrics;
import io.prometheus.client.Collector;
import java.util.Enumeration;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Enumeration wrapping Prometheus Collector Samples with filtering based on multiple patterns
*/
public class FilteringMetricFamilySamplesEnumeration implements Enumeration<Collector.MetricFamilySamples> {
private final Enumeration<Collector.MetricFamilySamples> metricFamilySamples;
private final Pattern sampleNamePattern;
private final Pattern sampleLabelValuePattern;
private Collector.MetricFamilySamples nextElement;
/**
* Filtering Metric Family Samples Enumeration with required properties
*
* @param metricFamilySamples Metric Family Samples to be filtered
* @param sampleNamePattern Pattern used to match against Sample.name field supports null values
* @param sampleLabelValuePattern Pattern used to matching against Sample.labelValues field supports null values
*/
public FilteringMetricFamilySamplesEnumeration(
final Enumeration<Collector.MetricFamilySamples> metricFamilySamples,
final Pattern sampleNamePattern,
final Pattern sampleLabelValuePattern
) {
this.metricFamilySamples = Objects.requireNonNull(metricFamilySamples);
this.sampleNamePattern = sampleNamePattern;
this.sampleLabelValuePattern = sampleLabelValuePattern;
setNextElement();
}
/**
* Has More Elements based on whether the next element is set from a previous operation
*
* @return More Elements status
*/
@Override
public boolean hasMoreElements() {
return nextElement != null;
}
/**
* Get Next Element and set next available element before returning
*
* @return Next Element based on applied filters
*/
@Override
public Collector.MetricFamilySamples nextElement() {
if (nextElement == null) {
throw new NoSuchElementException();
}
final Collector.MetricFamilySamples currentElement = nextElement;
setNextElement();
return currentElement;
}
/**
* Set Next Element based on Sample having matching properties
*/
private void setNextElement() {
nextElement = null;
while (metricFamilySamples.hasMoreElements()) {
final Collector.MetricFamilySamples possibleNextElement = metricFamilySamples.nextElement();
possibleNextElement.samples.removeIf(this::isSampleNotMatched);
if (possibleNextElement.samples.size() == 0) {
continue;
}
nextElement = possibleNextElement;
break;
}
}
private boolean isSampleNotMatched(final Collector.MetricFamilySamples.Sample sample) {
boolean notMatched = false;
if (sampleNamePattern == null) {
notMatched = isSampleLabelValueNotMatched(sample);
} else if (sampleLabelValuePattern == null) {
notMatched = isSampleNameNotMatched(sample);
} else if (isSampleNameNotMatched(sample) && isSampleLabelValueNotMatched(sample)) {
notMatched = true;
}
return notMatched;
}
private boolean isSampleNameNotMatched(final Collector.MetricFamilySamples.Sample sample) {
final Matcher sampleNameMatcher = sampleNamePattern.matcher(sample.name);
return !sampleNameMatcher.matches();
}
private boolean isSampleLabelValueNotMatched(final Collector.MetricFamilySamples.Sample sample) {
boolean notMatched = true;
for (final String labelValue : sample.labelValues) {
final Matcher sampleLabelValueMatcher = sampleLabelValuePattern.matcher(labelValue);
if (sampleLabelValueMatcher.matches()) {
notMatched = false;
break;
}
}
return notMatched;
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.metrics;
import io.prometheus.client.CollectorRegistry;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
/**
* Prometheus Metrics Writer
*/
public interface PrometheusMetricsWriter {
/**
* Write collection of metrics registries to provided stream
*
* @param registries Collector Registries
* @param outputStream Output Stream
* @throws IOException Thrown on failure to write metrics
*/
void write(Collection<CollectorRegistry> registries, OutputStream outputStream) throws IOException;
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.metrics;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Collection;
import java.util.Enumeration;
import java.util.regex.Pattern;
/**
* Prometheus Metrics Writer supporting Prometheus Text Version 0.0.4 with optional filtering
*/
public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWriter {
private final Pattern sampleNamePattern;
private final Pattern sampleLabelValuePattern;
private final boolean filteringDisabled;
public TextFormatPrometheusMetricsWriter(
final String sampleName,
final String sampleLabelValue
) {
this.sampleNamePattern = StringUtils.isBlank(sampleName) ? null : Pattern.compile(sampleName);
this.sampleLabelValuePattern = StringUtils.isBlank(sampleLabelValue) ? null : Pattern.compile(sampleLabelValue);
this.filteringDisabled = StringUtils.isAllBlank(sampleName, sampleLabelValue);
}
@Override
public void write(final Collection<CollectorRegistry> registries, final OutputStream outputStream) throws IOException {
try (final Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream))) {
for (final CollectorRegistry collectorRegistry : registries) {
final Enumeration<Collector.MetricFamilySamples> samples = getSamples(collectorRegistry);
TextFormat.write004(writer, samples);
writer.flush();
}
}
}
private Enumeration<Collector.MetricFamilySamples> getSamples(final CollectorRegistry registry) {
final Enumeration<Collector.MetricFamilySamples> samples = registry.metricFamilySamples();
return filteringDisabled ? samples : new FilteringMetricFamilySamplesEnumeration(
samples,
sampleNamePattern,
sampleLabelValuePattern
);
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.request;
/**
* Flow Metrics Producers supported
*/
public enum FlowMetricsProducer {
PROMETHEUS("prometheus");
private final String producer;
FlowMetricsProducer(final String producer) {
this.producer = producer;
}
public String getProducer() {
return producer;
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.request;
import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
/**
* Flow Metrics Registries
*/
public enum FlowMetricsRegistry {
NIFI("NIFI", NiFiMetricsRegistry.class),
JVM("JVM", JvmMetricsRegistry.class),
BULLETIN("BULLETIN", BulletinMetricsRegistry.class),
CONNECTION("CONNECTION", ConnectionAnalyticsMetricsRegistry.class);
private final String registry;
private final Class<? extends AbstractMetricsRegistry> registryClass;
FlowMetricsRegistry(final String registry, final Class<? extends AbstractMetricsRegistry> registryClass) {
this.registry = registry;
this.registryClass = registryClass;
}
public String getRegistry() {
return registry;
}
public Class<? extends AbstractMetricsRegistry> getRegistryClass() {
return registryClass;
}
}

View File

@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.request.FlowMetricsProducer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class TestFlowResource {
private static final String LABEL_VALUE = TestFlowResource.class.getSimpleName();
private static final String OTHER_LABEL_VALUE = JmxJvmMetrics.class.getSimpleName();
private static final String THREAD_COUNT_NAME = "nifi_jvm_thread_count";
private static final String HEAP_USAGE_NAME = "nifi_jvm_heap_usage";
private static final String HEAP_USED_NAME = "nifi_jvm_heap_used";
private static final String HEAP_STARTS_WITH_PATTERN = "nifi_jvm_heap.*";
private static final String THREAD_COUNT_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", LABEL_VALUE);
private static final String THREAD_COUNT_OTHER_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", OTHER_LABEL_VALUE);
@InjectMocks
private FlowResource resource = new FlowResource();
@Mock
private NiFiServiceFacade serviceFacade;
@Test
public void testGetFlowMetricsProducerInvalid() {
assertThrows(ResourceNotFoundException.class, () -> resource.getFlowMetrics(String.class.toString(), Collections.emptySet(), null, null));
}
@Test
public void testGetFlowMetricsPrometheus() throws IOException {
final List<CollectorRegistry> registries = getCollectorRegistries();
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, null);
assertNotNull(response);
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
final String output = getResponseOutput(response);
assertTrue(output.contains(THREAD_COUNT_NAME), "Thread Count name not found");
assertTrue(output.contains(HEAP_USAGE_NAME), "Heap Usage name not found");
}
@Test
public void testGetFlowMetricsPrometheusSampleName() throws IOException {
final List<CollectorRegistry> registries = getCollectorRegistries();
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, null);
assertNotNull(response);
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
final String output = getResponseOutput(response);
assertTrue(output.contains(THREAD_COUNT_NAME), "Thread Count name not found");
assertFalse(output.contains(HEAP_USAGE_NAME), "Heap Usage name not filtered");
}
@Test
public void testGetFlowMetricsPrometheusSampleNameStartsWithPattern() throws IOException {
final List<CollectorRegistry> registries = getCollectorRegistries();
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), HEAP_STARTS_WITH_PATTERN, null);
assertNotNull(response);
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
final String output = getResponseOutput(response);
assertTrue(output.contains(HEAP_USAGE_NAME), "Heap Usage name not found");
assertTrue(output.contains(HEAP_USED_NAME), "Heap Used name not found");
assertFalse(output.contains(THREAD_COUNT_NAME), "Heap Usage name not filtered");
}
@Test
public void testGetFlowMetricsPrometheusSampleLabelValue() throws IOException {
final List<CollectorRegistry> registries = getCollectorRegistries();
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, LABEL_VALUE);
assertNotNull(response);
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
final String output = getResponseOutput(response);
assertTrue(output.contains(LABEL_VALUE), "Label Value not found");
assertFalse(output.contains(OTHER_LABEL_VALUE), "Other Label Value not filtered");
}
@Test
public void testGetFlowMetricsPrometheusSampleNameAndSampleLabelValue() throws IOException {
final List<CollectorRegistry> registries = getCollectorRegistries();
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, LABEL_VALUE);
assertNotNull(response);
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
final String output = getResponseOutput(response);
assertTrue(output.contains(THREAD_COUNT_NAME), "Thread Count name not found");
assertTrue(output.contains(THREAD_COUNT_LABEL), "Thread Count with label not found");
assertTrue(output.contains(THREAD_COUNT_OTHER_LABEL), "Thread Count with other label not found");
assertTrue(output.contains(HEAP_USAGE_NAME), "Heap Usage name not found");
}
private String getResponseOutput(final Response response) throws IOException {
final StreamingOutput streamingOutput = (StreamingOutput) response.getEntity();
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
streamingOutput.write(outputStream);
final byte[] outputBytes = outputStream.toByteArray();
return new String(outputBytes, StandardCharsets.UTF_8);
}
private List<CollectorRegistry> getCollectorRegistries() {
final JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
final CollectorRegistry jvmCollectorRegistry = PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), LABEL_VALUE);
final CollectorRegistry otherJvmCollectorRegistry = PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), OTHER_LABEL_VALUE);
return Arrays.asList(jvmCollectorRegistry, otherJvmCollectorRegistry);
}
}