NIFI-12506 Added Threading for Status Analytics Retrieval

This closes #8158

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Timea Barna 2023-12-14 07:29:27 +01:00 committed by exceptionfactory
parent e66da895fc
commit 9c871699c3
No known key found for this signature in database
4 changed files with 163 additions and 23 deletions

View File

@ -100,7 +100,6 @@ import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
import org.apache.nifi.diagnostics.DiagnosticLevel;
import org.apache.nifi.diagnostics.StorageUsage;
@ -366,6 +365,7 @@ import org.apache.nifi.web.revision.RevisionUpdate;
import org.apache.nifi.web.revision.StandardRevisionClaim;
import org.apache.nifi.web.revision.StandardRevisionUpdate;
import org.apache.nifi.web.revision.UpdateRevisionTask;
import org.apache.nifi.web.util.PredictionBasedParallelProcessingService;
import org.apache.nifi.web.util.SnippetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -457,6 +457,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private RuleViolationsManager ruleViolationsManager;
private PredictionBasedParallelProcessingService parallelProcessingService;
// -----------------------------------------
// Synchronization methods
// -----------------------------------------
@ -6232,28 +6234,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
PrometheusMetricsUtil.createAggregatedNifiMetrics(nifiMetricsRegistry, aggregatedMetrics, instanceId,ROOT_PROCESS_GROUP, rootPGName, rootPGId);
// Get Connection Status Analytics (predictions, e.g.)
Set<Connection> connections = controllerFacade.getFlowManager().findAllConnections();
for (Connection c : connections) {
// If a ResourceNotFoundException is thrown, analytics hasn't been enabled
try {
final StatusAnalytics statusAnalytics = controllerFacade.getConnectionStatusAnalytics(c.getIdentifier());
PrometheusMetricsUtil.createConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry,
statusAnalytics,
instanceId,
"Connection",
c.getName(),
c.getIdentifier(),
c.getProcessGroup().getIdentifier(),
c.getSource().getName(),
c.getSource().getIdentifier(),
c.getDestination().getName(),
c.getDestination().getIdentifier()
);
PrometheusMetricsUtil.aggregateConnectionPredictionMetrics(aggregatedMetrics, statusAnalytics.getPredictions());
} catch (ResourceNotFoundException rnfe) {
break;
}
}
Collection<Map<String, Long>> predictions = parallelProcessingService.createConnectionStatusAnalyticsMetricsAndCollectPredictions(
controllerFacade, connectionAnalyticsMetricsRegistry, instanceId);
predictions.forEach((prediction) -> PrometheusMetricsUtil.aggregateConnectionPredictionMetrics(aggregatedMetrics, prediction));
PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry, aggregatedMetrics, instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId);
// Create a query to get all bulletins
@ -6764,4 +6748,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
public void setRuleViolationsManager(RuleViolationsManager ruleViolationsManager) {
this.ruleViolationsManager = ruleViolationsManager;
}
public void setParallelProcessingService(PredictionBasedParallelProcessingService parallelProcessingService) {
this.parallelProcessingService = parallelProcessingService;
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.util;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.web.controller.ControllerFacade;
import java.util.Collection;
import java.util.Map;
public interface PredictionBasedParallelProcessingService {
/**
* @return a connection prediction collection
*
* @param controllerFacade controller facade
* @param connectionAnalyticsMetricsRegistry connection analytics metrics registry
* @param instanceId instance id of the flow controller
*/
Collection<Map<String, Long>> createConnectionStatusAnalyticsMetricsAndCollectPredictions(
ControllerFacade controllerFacade, ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry, String instanceId);
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.util;
import jakarta.ws.rs.WebApplicationException;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.controller.ControllerFacade;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class VirtualThreadParallelProcessingService implements PredictionBasedParallelProcessingService, Closeable {
private boolean analyticsEnabled;
private ExecutorService parallelProcessingExecutorService;
private long parallelProcessingTimeout;
public VirtualThreadParallelProcessingService(final NiFiProperties properties) {
// We need to make processing timeout shorter than the web request timeout as if they overlap Jetty may throw IllegalStateException
parallelProcessingTimeout = Math.round(FormatUtils.getPreciseTimeDuration(
properties.getProperty(NiFiProperties.WEB_REQUEST_TIMEOUT, "1 min"), TimeUnit.MILLISECONDS)) - 5000;
analyticsEnabled = Boolean.parseBoolean(
properties.getProperty(NiFiProperties.ANALYTICS_PREDICTION_ENABLED, Boolean.FALSE.toString()));
if (analyticsEnabled) {
this.parallelProcessingExecutorService = Executors.newVirtualThreadPerTaskExecutor();
}
}
@Override
public Collection<Map<String, Long>> createConnectionStatusAnalyticsMetricsAndCollectPredictions(
ControllerFacade controllerFacade, ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry, String instanceId) {
Collection<Map<String, Long>> predictions = Collections.synchronizedList(new ArrayList<>());
if (!analyticsEnabled) {
return predictions;
}
final Set<Connection> connections = controllerFacade.getFlowManager().findAllConnections();
final CountDownLatch countDownLatch = new CountDownLatch(connections.size());
try {
for (Connection c : connections) {
parallelProcessingExecutorService.execute(() -> {
try {
final StatusAnalytics statusAnalytics = controllerFacade.getConnectionStatusAnalytics(c.getIdentifier());
PrometheusMetricsUtil.createConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry,
statusAnalytics,
instanceId,
"Connection",
c.getName(),
c.getIdentifier(),
c.getProcessGroup().getIdentifier(),
c.getSource().getName(),
c.getSource().getIdentifier(),
c.getDestination().getName(),
c.getDestination().getIdentifier()
);
predictions.add(statusAnalytics.getPredictions());
} finally {
countDownLatch.countDown();
}
});
}
} finally {
try {
boolean finished = countDownLatch.await(parallelProcessingTimeout, TimeUnit.MILLISECONDS);
if (!finished) {
throw new WebApplicationException("Populating flow metrics timed out");
}
} catch (InterruptedException e) {
throw new WebApplicationException("Populating flow metrics cancelled");
}
}
return predictions;
}
@Override
public void close() throws IOException {
if (parallelProcessingExecutorService != null) {
parallelProcessingExecutorService.close();
}
}
}

View File

@ -363,6 +363,7 @@
<property name="flowRegistryDAO" ref="flowRegistryDAO" />
<property name="parameterContextDAO" ref="parameterContextDAO" />
<property name="ruleViolationsManager" ref="ruleViolationsManager" />
<property name="parallelProcessingService" ref="parallelProcessingService" />
</bean>
<!-- component ui extension configuration context -->
@ -776,4 +777,8 @@
<!-- NiFi locking -->
<bean id="serviceFacadeLock" class="org.apache.nifi.web.NiFiServiceFacadeLock"/>
<bean id="parallelProcessingService" class="org.apache.nifi.web.util.VirtualThreadParallelProcessingService">
<constructor-arg ref="nifiProperties" />
</bean>
</beans>