mirror of https://github.com/apache/nifi.git
NIFI-9587 Added JSON format for Prometheus Flow Metrics
This closes #5673 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
d987351031
commit
ccd47de6dc
|
@ -111,6 +111,7 @@ import org.apache.nifi.web.api.entity.VersionedFlowEntity;
|
|||
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
|
||||
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataSetEntity;
|
||||
import org.apache.nifi.web.api.entity.VersionedFlowsEntity;
|
||||
import org.apache.nifi.web.api.metrics.JsonFormatPrometheusMetricsWriter;
|
||||
import org.apache.nifi.web.api.metrics.TextFormatPrometheusMetricsWriter;
|
||||
import org.apache.nifi.web.api.metrics.PrometheusMetricsWriter;
|
||||
import org.apache.nifi.web.api.request.BulletinBoardPatternParameter;
|
||||
|
@ -428,7 +429,11 @@ public class FlowResource extends ApplicationResource {
|
|||
@ApiParam(
|
||||
value = "Regular Expression Pattern to be applied against the sample label value field"
|
||||
)
|
||||
@QueryParam("sampleLabelValue") final String sampleLabelValue
|
||||
@QueryParam("sampleLabelValue") final String sampleLabelValue,
|
||||
@ApiParam(
|
||||
value = "Name of the first field of JSON object. Applicable for JSON producer only."
|
||||
)
|
||||
@QueryParam("rootFieldName") final String rootFieldName
|
||||
) {
|
||||
|
||||
authorizeFlow();
|
||||
|
@ -442,6 +447,16 @@ public class FlowResource extends ApplicationResource {
|
|||
prometheusMetricsWriter.write(registries, outputStream);
|
||||
});
|
||||
return generateOkResponse(response).type(TextFormat.CONTENT_TYPE_004).build();
|
||||
|
||||
} else if (FlowMetricsProducer.JSON.getProducer().equals(producer)) {
|
||||
final StreamingOutput output = outputStream -> {
|
||||
final JsonFormatPrometheusMetricsWriter jsonPrometheusMetricsWriter = new JsonFormatPrometheusMetricsWriter(sampleName, sampleLabelValue, rootFieldName);
|
||||
jsonPrometheusMetricsWriter.write(registries, outputStream);
|
||||
};
|
||||
return generateOkResponse(output)
|
||||
.type(MediaType.APPLICATION_JSON_TYPE)
|
||||
.build();
|
||||
|
||||
} else {
|
||||
throw new ResourceNotFoundException("The specified producer is missing or invalid.");
|
||||
}
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.web.api.metrics;
|
||||
|
||||
import io.prometheus.client.Collector;
|
||||
import io.prometheus.client.CollectorRegistry;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import java.util.Enumeration;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public abstract class AbstractPrometheusMetricsWriter implements PrometheusMetricsWriter {
|
||||
private final Pattern sampleNamePattern;
|
||||
|
||||
private final Pattern sampleLabelValuePattern;
|
||||
|
||||
private final boolean filteringDisabled;
|
||||
|
||||
public AbstractPrometheusMetricsWriter(
|
||||
final String sampleName,
|
||||
final String sampleLabelValue
|
||||
) {
|
||||
this.sampleNamePattern = StringUtils.isBlank(sampleName) ? null : Pattern.compile(sampleName);
|
||||
this.sampleLabelValuePattern = StringUtils.isBlank(sampleLabelValue) ? null : Pattern.compile(sampleLabelValue);
|
||||
this.filteringDisabled = StringUtils.isAllBlank(sampleName, sampleLabelValue);
|
||||
}
|
||||
|
||||
Enumeration<Collector.MetricFamilySamples> getSamples(final CollectorRegistry registry) {
|
||||
final Enumeration<Collector.MetricFamilySamples> samples = registry.metricFamilySamples();
|
||||
return filteringDisabled ? samples : new FilteringMetricFamilySamplesEnumeration(
|
||||
samples,
|
||||
sampleNamePattern,
|
||||
sampleLabelValuePattern
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.web.api.metrics;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.prometheus.client.Collector;
|
||||
import io.prometheus.client.CollectorRegistry;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.Writer;
|
||||
import java.util.Collection;
|
||||
import java.util.Enumeration;
|
||||
|
||||
/**
|
||||
* Prometheus Metrics Writer with Json output format and optional filtering
|
||||
*/
|
||||
public class JsonFormatPrometheusMetricsWriter extends AbstractPrometheusMetricsWriter {
|
||||
private final String rootFieldName;
|
||||
|
||||
public JsonFormatPrometheusMetricsWriter(final String sampleName, final String sampleLabelValue, final String rootFieldName) {
|
||||
super(sampleName, sampleLabelValue);
|
||||
this.rootFieldName = rootFieldName == null ? "samples" : rootFieldName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(final Collection<CollectorRegistry> registries, final OutputStream outputStream) throws IOException {
|
||||
JsonFactory factory = new JsonFactory();
|
||||
try (final Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream));
|
||||
final JsonGenerator generator = factory.createGenerator(writer)) {
|
||||
generator.setCodec(new ObjectMapper());
|
||||
generator.writeStartObject();
|
||||
generator.writeFieldName(rootFieldName);
|
||||
generator.writeStartArray();
|
||||
for (final CollectorRegistry collectorRegistry : registries) {
|
||||
final Enumeration<Collector.MetricFamilySamples> samples = getSamples(collectorRegistry);
|
||||
while (samples.hasMoreElements()) {
|
||||
final Collector.MetricFamilySamples samples2 = samples.nextElement();
|
||||
for (Collector.MetricFamilySamples.Sample sample : samples2.samples) {
|
||||
generator.writeObject(sample);
|
||||
generator.flush();
|
||||
}
|
||||
generator.flush();
|
||||
}
|
||||
}
|
||||
generator.writeEndArray();
|
||||
generator.writeEndObject();
|
||||
generator.flush();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,7 +19,6 @@ package org.apache.nifi.web.api.metrics;
|
|||
import io.prometheus.client.Collector;
|
||||
import io.prometheus.client.CollectorRegistry;
|
||||
import io.prometheus.client.exporter.common.TextFormat;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
|
@ -28,25 +27,14 @@ import java.io.OutputStreamWriter;
|
|||
import java.io.Writer;
|
||||
import java.util.Collection;
|
||||
import java.util.Enumeration;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Prometheus Metrics Writer supporting Prometheus Text Version 0.0.4 with optional filtering
|
||||
*/
|
||||
public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWriter {
|
||||
private final Pattern sampleNamePattern;
|
||||
public class TextFormatPrometheusMetricsWriter extends AbstractPrometheusMetricsWriter {
|
||||
|
||||
private final Pattern sampleLabelValuePattern;
|
||||
|
||||
private final boolean filteringDisabled;
|
||||
|
||||
public TextFormatPrometheusMetricsWriter(
|
||||
final String sampleName,
|
||||
final String sampleLabelValue
|
||||
) {
|
||||
this.sampleNamePattern = StringUtils.isBlank(sampleName) ? null : Pattern.compile(sampleName);
|
||||
this.sampleLabelValuePattern = StringUtils.isBlank(sampleLabelValue) ? null : Pattern.compile(sampleLabelValue);
|
||||
this.filteringDisabled = StringUtils.isAllBlank(sampleName, sampleLabelValue);
|
||||
public TextFormatPrometheusMetricsWriter(final String sampleName, final String sampleLabelValue) {
|
||||
super(sampleName, sampleLabelValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -59,13 +47,4 @@ public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWrite
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Enumeration<Collector.MetricFamilySamples> getSamples(final CollectorRegistry registry) {
|
||||
final Enumeration<Collector.MetricFamilySamples> samples = registry.metricFamilySamples();
|
||||
return filteringDisabled ? samples : new FilteringMetricFamilySamplesEnumeration(
|
||||
samples,
|
||||
sampleNamePattern,
|
||||
sampleLabelValuePattern
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,8 @@ package org.apache.nifi.web.api.request;
|
|||
* Flow Metrics Producers supported
|
||||
*/
|
||||
public enum FlowMetricsProducer {
|
||||
PROMETHEUS("prometheus");
|
||||
PROMETHEUS("prometheus"),
|
||||
JSON("json");
|
||||
|
||||
private final String producer;
|
||||
|
||||
|
|
|
@ -16,10 +16,21 @@
|
|||
*/
|
||||
package org.apache.nifi.web.api;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import io.prometheus.client.Collector.MetricFamilySamples.Sample;
|
||||
import io.prometheus.client.CollectorRegistry;
|
||||
import io.prometheus.client.exporter.common.TextFormat;
|
||||
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
|
||||
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
|
||||
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
|
||||
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
|
||||
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
|
||||
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
|
||||
import org.apache.nifi.web.NiFiServiceFacade;
|
||||
import org.apache.nifi.web.ResourceNotFoundException;
|
||||
|
@ -36,10 +47,18 @@ import javax.ws.rs.core.StreamingOutput;
|
|||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
|
@ -51,20 +70,19 @@ import static org.mockito.Mockito.when;
|
|||
@ExtendWith(MockitoExtension.class)
|
||||
public class TestFlowResource {
|
||||
private static final String LABEL_VALUE = TestFlowResource.class.getSimpleName();
|
||||
|
||||
private static final String OTHER_LABEL_VALUE = JmxJvmMetrics.class.getSimpleName();
|
||||
|
||||
private static final String THREAD_COUNT_NAME = "nifi_jvm_thread_count";
|
||||
|
||||
private static final String HEAP_USAGE_NAME = "nifi_jvm_heap_usage";
|
||||
|
||||
private static final String HEAP_USED_NAME = "nifi_jvm_heap_used";
|
||||
|
||||
private static final String HEAP_STARTS_WITH_PATTERN = "nifi_jvm_heap.*";
|
||||
|
||||
private static final String THREAD_COUNT_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", LABEL_VALUE);
|
||||
|
||||
private static final String THREAD_COUNT_OTHER_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", OTHER_LABEL_VALUE);
|
||||
private static final String ROOT_FIELD_NAME = "beans";
|
||||
private static final String SAMPLE_NAME_JVM = ".*jvm.*";
|
||||
private static final String SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP = "RootProcessGroup";
|
||||
private static final String SAMPLE_LABEL_VALUES_PROCESS_GROUP = "ProcessGroup";
|
||||
private static final String COMPONENT_TYPE_LABEL = "component_type";
|
||||
private static final int COMPONENT_TYPE_VALUE_INDEX = 1;
|
||||
|
||||
@InjectMocks
|
||||
private FlowResource resource = new FlowResource();
|
||||
|
@ -74,7 +92,7 @@ public class TestFlowResource {
|
|||
|
||||
@Test
|
||||
public void testGetFlowMetricsProducerInvalid() {
|
||||
assertThrows(ResourceNotFoundException.class, () -> resource.getFlowMetrics(String.class.toString(), Collections.emptySet(), null, null));
|
||||
assertThrows(ResourceNotFoundException.class, () -> resource.getFlowMetrics(String.class.toString(), Collections.emptySet(), null, null, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -82,7 +100,7 @@ public class TestFlowResource {
|
|||
final List<CollectorRegistry> registries = getCollectorRegistries();
|
||||
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
|
||||
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, null);
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, null, null);
|
||||
|
||||
assertNotNull(response);
|
||||
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
|
||||
|
@ -98,7 +116,7 @@ public class TestFlowResource {
|
|||
final List<CollectorRegistry> registries = getCollectorRegistries();
|
||||
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
|
||||
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, null);
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, null, null);
|
||||
|
||||
assertNotNull(response);
|
||||
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
|
||||
|
@ -114,7 +132,7 @@ public class TestFlowResource {
|
|||
final List<CollectorRegistry> registries = getCollectorRegistries();
|
||||
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
|
||||
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), HEAP_STARTS_WITH_PATTERN, null);
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), HEAP_STARTS_WITH_PATTERN, null, null);
|
||||
|
||||
assertNotNull(response);
|
||||
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
|
||||
|
@ -131,7 +149,7 @@ public class TestFlowResource {
|
|||
final List<CollectorRegistry> registries = getCollectorRegistries();
|
||||
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
|
||||
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, LABEL_VALUE);
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, LABEL_VALUE, null);
|
||||
|
||||
assertNotNull(response);
|
||||
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
|
||||
|
@ -147,7 +165,7 @@ public class TestFlowResource {
|
|||
final List<CollectorRegistry> registries = getCollectorRegistries();
|
||||
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
|
||||
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, LABEL_VALUE);
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, LABEL_VALUE, null);
|
||||
|
||||
assertNotNull(response);
|
||||
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
|
||||
|
@ -160,6 +178,110 @@ public class TestFlowResource {
|
|||
assertTrue(output.contains(HEAP_USAGE_NAME), "Heap Usage name not found");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFlowMetricsPrometheusAsJson() throws IOException {
|
||||
final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
|
||||
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
|
||||
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), null, null, ROOT_FIELD_NAME);
|
||||
|
||||
assertNotNull(response);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getMediaType());
|
||||
|
||||
final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
|
||||
assertThat(metrics.keySet(), hasSize(1));
|
||||
assertThat(metrics, hasKey(ROOT_FIELD_NAME));
|
||||
|
||||
final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
|
||||
assertThat(registryList, hasSize(9));
|
||||
|
||||
final Map<String, Long> result = getResult(registryList);
|
||||
assertThat(3L, equalTo(result.get(SAMPLE_NAME_JVM)));
|
||||
assertThat(4L, equalTo(result.get(SAMPLE_LABEL_VALUES_PROCESS_GROUP)));
|
||||
assertThat(2L, equalTo(result.get(SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFlowMetricsPrometheusAsJsonSampleName() throws IOException {
|
||||
final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
|
||||
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
|
||||
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), SAMPLE_NAME_JVM, null, ROOT_FIELD_NAME);
|
||||
assertNotNull(response);
|
||||
assertEquals(MediaType.valueOf(MediaType.APPLICATION_JSON), response.getMediaType());
|
||||
|
||||
final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
|
||||
assertThat(metrics.keySet(), hasSize(1));
|
||||
assertThat(metrics, hasKey(ROOT_FIELD_NAME));
|
||||
|
||||
final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
|
||||
assertThat(registryList, hasSize(3));
|
||||
|
||||
final Map<String, Long> result = getResult(registryList);
|
||||
assertThat(3L, equalTo(result.get(SAMPLE_NAME_JVM)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFlowMetricsPrometheusAsJsonSampleNameStartsWithPattern() throws IOException {
|
||||
final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
|
||||
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
|
||||
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), HEAP_STARTS_WITH_PATTERN, null, ROOT_FIELD_NAME);
|
||||
assertNotNull(response);
|
||||
assertEquals(MediaType.valueOf(MediaType.APPLICATION_JSON), response.getMediaType());
|
||||
|
||||
final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
|
||||
assertThat(metrics.keySet(), hasSize(1));
|
||||
assertThat(metrics, hasKey(ROOT_FIELD_NAME));
|
||||
|
||||
final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
|
||||
assertThat(registryList, hasSize(2));
|
||||
|
||||
final Map<String, Long> result = getResult(registryList);
|
||||
assertThat(2L, equalTo(result.get(SAMPLE_NAME_JVM)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFlowMetricsPrometheusAsJsonSampleLabelValue() throws IOException {
|
||||
final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
|
||||
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
|
||||
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), null, SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP, ROOT_FIELD_NAME);
|
||||
assertNotNull(response);
|
||||
assertEquals(MediaType.valueOf(MediaType.APPLICATION_JSON), response.getMediaType());
|
||||
|
||||
final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
|
||||
assertThat(metrics.keySet(), hasSize(1));
|
||||
assertThat(metrics, hasKey(ROOT_FIELD_NAME));
|
||||
|
||||
final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
|
||||
assertThat(registryList, hasSize(2));
|
||||
|
||||
final Map<String, Long> result = getResult(registryList);
|
||||
assertThat(2L, equalTo(result.get(SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFlowMetricsPrometheusAsJsonSampleNameAndSampleLabelValue() throws IOException {
|
||||
final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
|
||||
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
|
||||
|
||||
final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), SAMPLE_NAME_JVM, SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP, ROOT_FIELD_NAME);
|
||||
assertNotNull(response);
|
||||
assertEquals(MediaType.valueOf(MediaType.APPLICATION_JSON), response.getMediaType());
|
||||
|
||||
final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
|
||||
assertThat(metrics.keySet(), hasSize(1));
|
||||
assertThat(metrics, hasKey(ROOT_FIELD_NAME));
|
||||
|
||||
final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
|
||||
assertThat(registryList, hasSize(5));
|
||||
|
||||
final Map<String, Long> result = getResult(registryList);
|
||||
assertThat(3L, equalTo(result.get(SAMPLE_NAME_JVM)));
|
||||
assertThat(2L, equalTo(result.get(SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP)));
|
||||
}
|
||||
|
||||
private String getResponseOutput(final Response response) throws IOException {
|
||||
final StreamingOutput streamingOutput = (StreamingOutput) response.getEntity();
|
||||
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
|
@ -174,4 +296,105 @@ public class TestFlowResource {
|
|||
final CollectorRegistry otherJvmCollectorRegistry = PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), OTHER_LABEL_VALUE);
|
||||
return Arrays.asList(jvmCollectorRegistry, otherJvmCollectorRegistry);
|
||||
}
|
||||
|
||||
private Map<String, List<Sample>> convertJsonResponseToMap(final Response response) throws IOException {
|
||||
final TypeReference<HashMap<String, List<Sample>>> typeReference = new TypeReference<HashMap<String, List<Sample>>>() {};
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
final SimpleModule module = new SimpleModule();
|
||||
|
||||
module.addDeserializer(Sample.class, new SampleDeserializer());
|
||||
mapper.registerModule(module);
|
||||
|
||||
final String json = getResponseOutput(response);
|
||||
return mapper.readValue(json, typeReference);
|
||||
}
|
||||
|
||||
private Map<String, Long> getResult(final List<Sample> registries) {
|
||||
return registries.stream()
|
||||
.collect(Collectors.groupingBy(
|
||||
sample -> getResultKey(sample),
|
||||
Collectors.counting()));
|
||||
}
|
||||
|
||||
private String getResultKey(final Sample sample) {
|
||||
return sample.labelNames.contains(COMPONENT_TYPE_LABEL) ? sample.labelValues.get(COMPONENT_TYPE_VALUE_INDEX) : SAMPLE_NAME_JVM;
|
||||
}
|
||||
|
||||
private static List<CollectorRegistry> getCollectorRegistriesForJson() {
|
||||
final List<CollectorRegistry> registryList = new ArrayList<>();
|
||||
|
||||
registryList.add(getNifiMetricsRegistry());
|
||||
registryList.add(getConnectionMetricsRegistry());
|
||||
registryList.add(getJvmMetricsRegistry());
|
||||
registryList.add(getBulletinMetricsRegistry());
|
||||
|
||||
return registryList;
|
||||
|
||||
}
|
||||
|
||||
private static CollectorRegistry getNifiMetricsRegistry() {
|
||||
final NiFiMetricsRegistry registry = new NiFiMetricsRegistry();
|
||||
|
||||
registry.setDataPoint(136, "TOTAL_BYTES_READ",
|
||||
"RootPGId", SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP, "rootPGName", "", "");
|
||||
registry.setDataPoint(136, "TOTAL_BYTES_READ",
|
||||
"PGId", SAMPLE_LABEL_VALUES_PROCESS_GROUP, "PGName", "RootPGId", "");
|
||||
|
||||
return registry.getRegistry();
|
||||
}
|
||||
|
||||
private static CollectorRegistry getConnectionMetricsRegistry() {
|
||||
final ConnectionAnalyticsMetricsRegistry connectionMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
|
||||
|
||||
connectionMetricsRegistry.setDataPoint(1.0,
|
||||
"TIME_TO_BYTES_BACKPRESSURE_PREDICTION",
|
||||
"PGId", SAMPLE_LABEL_VALUES_PROCESS_GROUP, "success", "connComponentId",
|
||||
"RootPGId", "sourceId", "sourceName", "destinationId", "destinationName");
|
||||
connectionMetricsRegistry.setDataPoint(1.0,
|
||||
"TIME_TO_BYTES_BACKPRESSURE_PREDICTION",
|
||||
"RootPGId", SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP, "rootPGName", "", "", "", "", "", "");
|
||||
|
||||
return connectionMetricsRegistry.getRegistry();
|
||||
}
|
||||
|
||||
private static CollectorRegistry getJvmMetricsRegistry() {
|
||||
final JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
|
||||
|
||||
jvmMetricsRegistry.setDataPoint(4.0, "JVM_HEAP_USED", "instanceId");
|
||||
jvmMetricsRegistry.setDataPoint(6.0, "JVM_HEAP_USAGE", "instanceId");
|
||||
jvmMetricsRegistry.setDataPoint(10.0, "JVM_THREAD_COUNT", "instanceId");
|
||||
|
||||
return jvmMetricsRegistry.getRegistry();
|
||||
}
|
||||
|
||||
private static CollectorRegistry getBulletinMetricsRegistry() {
|
||||
final BulletinMetricsRegistry bulletinMetricsRegistry = new BulletinMetricsRegistry();
|
||||
|
||||
bulletinMetricsRegistry.setDataPoint(1, "BULLETIN", "B1Id", SAMPLE_LABEL_VALUES_PROCESS_GROUP, "PGId", "RootPGId",
|
||||
"nodeAddress", "category", "sourceName", "sourceId", "level");
|
||||
bulletinMetricsRegistry.setDataPoint(1, "BULLETIN", "B2Id", SAMPLE_LABEL_VALUES_PROCESS_GROUP, "PGId", "RootPGId",
|
||||
"nodeAddress", "category", "sourceName", "sourceId", "level");
|
||||
|
||||
return bulletinMetricsRegistry.getRegistry();
|
||||
}
|
||||
|
||||
private static class SampleDeserializer extends StdDeserializer<Sample> {
|
||||
protected SampleDeserializer() {
|
||||
super(Sample.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sample deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
|
||||
final JsonNode node = jsonParser.getCodec().readTree(jsonParser);
|
||||
|
||||
final String name = node.get("name").asText();
|
||||
final List<String> labelNames = new ArrayList<>();
|
||||
node.get("labelNames").elements().forEachRemaining(e -> labelNames.add(e.asText()));
|
||||
final List<String> labelValues = new ArrayList<>();
|
||||
node.get("labelValues").elements().forEachRemaining(e -> labelValues.add(e.asText()));
|
||||
final double value = node.get("value").asDouble();
|
||||
|
||||
return new Sample(name, labelNames, labelValues, value);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue