NIFI-8536: PrometheusReportingTask and PrometheusRecordSink do not support variable registry for port property

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5075
This commit is contained in:
Mohammed Nadeem 2021-05-13 05:16:51 +05:30 committed by Matthew Burgess
parent 97ed0efbc2
commit 73a2283c27
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
3 changed files with 10 additions and 6 deletions

View File

@ -93,7 +93,7 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
public void onScheduled(final ConfigurationContext context) { public void onScheduled(final ConfigurationContext context) {
RECORD_REGISTRY.clear(); RECORD_REGISTRY.clear();
SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).getValue(); final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).evaluateAttributeExpressions().getValue();
try { try {
List<Function<ReportingContext, CollectorRegistry>> metricsCollectors = new ArrayList<>(); List<Function<ReportingContext, CollectorRegistry>> metricsCollectors = new ArrayList<>();
@ -119,7 +119,7 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
metricsCollectors.add(nifiMetrics); metricsCollectors.add(nifiMetrics);
prometheusServer.setMetricsCollectors(metricsCollectors); prometheusServer.setMetricsCollectors(metricsCollectors);
getLogger().info("Started JETTY server"); getLogger().info("Started Jetty server");
} catch (Exception e) { } catch (Exception e) {
// Don't allow this to finish successfully, onTrigger should not be called if the Jetty server wasn't started // Don't allow this to finish successfully, onTrigger should not be called if the Jetty server wasn't started
throw new ProcessException("Failed to start Jetty server", e); throw new ProcessException("Failed to start Jetty server", e);

View File

@ -109,7 +109,7 @@ public class PrometheusReportingTask extends AbstractReportingTask {
@OnScheduled @OnScheduled
public void onScheduled(final ConfigurationContext context) { public void onScheduled(final ConfigurationContext context) {
SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).getValue(); final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).evaluateAttributeExpressions().getValue();
try { try {
List<Function<ReportingContext, CollectorRegistry>> metricsCollectors = new ArrayList<>(); List<Function<ReportingContext, CollectorRegistry>> metricsCollectors = new ArrayList<>();
@ -165,7 +165,7 @@ public class PrometheusReportingTask extends AbstractReportingTask {
metricsCollectors.add(jvmMetrics); metricsCollectors.add(jvmMetrics);
} }
this.prometheusServer.setMetricsCollectors(metricsCollectors); this.prometheusServer.setMetricsCollectors(metricsCollectors);
getLogger().info("Started JETTY server"); getLogger().info("Started Jetty server");
} catch (Exception e) { } catch (Exception e) {
// Don't allow this to finish successfully, onTrigger should not be called if the Jetty server wasn't started // Don't allow this to finish successfully, onTrigger should not be called if the Jetty server wasn't started
throw new ProcessException("Failed to start Jetty server", e); throw new ProcessException("Failed to start Jetty server", e);

View File

@ -31,6 +31,7 @@ import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil; import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.WriteResult;
@ -44,6 +45,7 @@ import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.state.MockStateManager; import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockControllerServiceInitializationContext; import org.apache.nifi.util.MockControllerServiceInitializationContext;
import org.apache.nifi.util.MockPropertyValue; import org.apache.nifi.util.MockPropertyValue;
import org.apache.nifi.util.MockVariableRegistry;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -149,9 +151,11 @@ public class TestPrometheusRecordSink {
final PrometheusRecordSink task = new PrometheusRecordSink(); final PrometheusRecordSink task = new PrometheusRecordSink();
ConfigurationContext context = mock(ConfigurationContext.class); ConfigurationContext context = mock(ConfigurationContext.class);
final StateManager stateManager = new MockStateManager(task); final StateManager stateManager = new MockStateManager(task);
final MockVariableRegistry variableRegistry = new MockVariableRegistry();
final PropertyValue pValue = mock(StandardPropertyValue.class); final PropertyValue pValue = mock(StandardPropertyValue.class);
when(context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT)).thenReturn(new MockPropertyValue(portString));
variableRegistry.setVariable(new VariableDescriptor("port"), portString);
when(context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT)).thenReturn(new MockPropertyValue("${port}", null, variableRegistry));
when(context.getProperty(PrometheusRecordSink.SSL_CONTEXT)).thenReturn(pValue); when(context.getProperty(PrometheusRecordSink.SSL_CONTEXT)).thenReturn(pValue);
when(pValue.asControllerService(SSLContextService.class)).thenReturn(null); when(pValue.asControllerService(SSLContextService.class)).thenReturn(null);