diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 01dd887190..2ddee1d959 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -300,9 +300,6 @@ public class NiFiProperties extends ApplicationProperties { public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME = "nifi.analytics.connection.model.score.name"; public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = "nifi.analytics.connection.model.score.threshold"; - // flow analysis properties - public static final String BACKGROUND_FLOW_ANALYSIS_SCHEDULE = "nifi.flow.analysis.background.task.schedule"; - // runtime monitoring properties public static final String MONITOR_LONG_RUNNING_TASK_SCHEDULE = "nifi.monitor.long.running.task.schedule"; public static final String MONITOR_LONG_RUNNING_TASK_THRESHOLD = "nifi.monitor.long.running.task.threshold"; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowAnalysisResultEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowAnalysisResultEntity.java index c9e7c79e2b..ef7c313763 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowAnalysisResultEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowAnalysisResultEntity.java @@ -32,9 +32,22 @@ public class FlowAnalysisResultEntity extends Entity { public FlowAnalysisResultEntity() { } + private boolean flowAnalysisPending; + private List rules = new ArrayList<>(); private List ruleViolations = new ArrayList<>(); + /** + * @return true if a flow analysis is going to be scheduled due to flow changes, false otherwise + */ + public boolean isFlowAnalysisPending() { + return flowAnalysisPending; + } + + public void setFlowAnalysisPending(boolean flowAnalysisPending) { + this.flowAnalysisPending = flowAnalysisPending; + } + /** * @return set of flow analysis rules that are being serialized */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 8721273373..c7775c99a8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -3479,6 +3479,10 @@ public final class StandardProcessGroup implements ProcessGroup { } versionControlFields.setFlowDifferences(null); + + flowManager.getFlowAnalyzer().ifPresent( + flowManager -> flowManager.setFlowAnalysisRequired(true) + ); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalyzer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalyzer.java index 1c6be53371..743b8156a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalyzer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalyzer.java @@ -24,6 +24,18 @@ import org.apache.nifi.flow.VersionedProcessGroup; * Analyzes components, parts or the entirety of the flow. */ public interface FlowAnalyzer { + + /** + * Returns whether flow analysis should be scheduled + * @return true if flow analysis should be scheduled, false otherwise + */ + boolean isFlowAnalysisRequired(); + + /** + * Sets whether flow analysis should be scheduled + */ + void setFlowAnalysisRequired(boolean flowAnalysisRequired); + /** * Analyzes a processor * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 727f71f25f..d3eb8644cc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -1281,13 +1281,11 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr private void scheduleBackgroundFlowAnalysis(Supplier rootProcessGroupSupplier) { if (flowAnalyzer != null) { try { - final long scheduleMillis = parseDurationPropertyToMillis(NiFiProperties.BACKGROUND_FLOW_ANALYSIS_SCHEDULE); - flowAnalysisThreadPool.scheduleWithFixedDelay( new TriggerFlowAnalysisTask(flowAnalyzer, rootProcessGroupSupplier), - scheduleMillis, - scheduleMillis, - TimeUnit.MILLISECONDS + 5, + 5, + TimeUnit.SECONDS ); } catch (Exception e) { LOG.warn("Could not initialize TriggerFlowAnalysisTask.", e); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java index eba4c81b05..1b8ca8e566 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java @@ -59,6 +59,8 @@ public class StandardFlowAnalyzer implements FlowAnalyzer { private ControllerServiceProvider controllerServiceProvider; + private volatile boolean flowAnalysisRequired; + public StandardFlowAnalyzer( final RuleViolationsManager ruleViolationsManager, final FlowAnalysisRuleProvider flowAnalysisRuleProvider, @@ -73,6 +75,16 @@ public class StandardFlowAnalyzer implements FlowAnalyzer { this.controllerServiceProvider = controllerServiceProvider; } + @Override + public boolean isFlowAnalysisRequired() { + return flowAnalysisRequired; + } + + @Override + public void setFlowAnalysisRequired(boolean flowAnalysisRequired) { + this.flowAnalysisRequired = flowAnalysisRequired; + } + @Override public void analyzeProcessor(ProcessorNode processorNode) { logger.debug("Running analysis on {}", processorNode); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/TriggerFlowAnalysisTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/TriggerFlowAnalysisTask.java index 56f86c3ade..97fd94ade5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/TriggerFlowAnalysisTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/TriggerFlowAnalysisTask.java @@ -36,12 +36,16 @@ public class TriggerFlowAnalysisTask implements Runnable { @Override public void run() { - try { + if (flowAnalyzer.isFlowAnalysisRequired()) { logger.debug("Triggering analysis of entire flow"); - - flowAnalyzer.analyzeProcessGroup(rootProcessGroupSupplier.get()); - } catch (final Throwable t) { - logger.error("Encountered unexpected error when attempting to analyze flow", t); + try { + flowAnalyzer.analyzeProcessGroup(rootProcessGroupSupplier.get()); + flowAnalyzer.setFlowAnalysisRequired(false); + } catch (final Throwable t) { + logger.error("Encountered unexpected error when attempting to analyze flow", t); + } + } else { + logger.trace("Flow hasn't changed, flow analysis is put on hold"); } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index 63e7308f24..49fc414acb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -253,9 +253,6 @@ rSquared .90 - - 5 mins - 1 min 5 mins diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 3147152535..f021c71a2f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -337,9 +337,6 @@ nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.mode # kubernetes # nifi.cluster.leader.election.kubernetes.lease.prefix=${nifi.cluster.leader.election.kubernetes.lease.prefix} -# flow analysis properties -nifi.flow.analysis.background.task.schedule=${nifi.flow.analysis.background.task.schedule} - # runtime monitoring properties nifi.monitor.long.running.task.schedule= nifi.monitor.long.running.task.threshold= diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 71b1e92322..282f2a8486 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -2789,14 +2789,6 @@ public interface NiFiServiceFacade { */ FlowAnalysisRuleEntity deleteFlowAnalysisRule(Revision revision, String flowAnalysisRuleId); - /** - * Analyze the flow or a part of it - * - * @param processGroupId The id of the process group representing (a part of) the flow to be analyzed. - * Recursive - all child process groups will be analyzed as well. - */ - void analyzeProcessGroup(String processGroupId); - /** * @return all current rule violations */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 96ec627f61..d2c0f1cd26 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -84,7 +84,6 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.VerifiableControllerService; import org.apache.nifi.controller.flow.FlowManager; -import org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.controller.repository.FlowFileEvent; @@ -6411,22 +6410,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return flowAnalysisRules; } - @Override - public void analyzeProcessGroup(String processGroupId) { - ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); - - NiFiRegistryFlowMapper mapper = FlowAnalysisUtil.createMapper(controllerFacade.getExtensionManager()); - - InstantiatedVersionedProcessGroup nonVersionedProcessGroup = mapper.mapNonVersionedProcessGroup( - processGroup, - controllerFacade.getControllerServiceProvider() - ); - - controllerFacade.getFlowManager().getFlowAnalyzer().ifPresent( - flowAnalyzer -> flowAnalyzer.analyzeProcessGroup(nonVersionedProcessGroup) - ); - } - @Override public FlowAnalysisResultEntity getFlowAnalysisResult() { Collection ruleViolations = ruleViolationsManager.getAllRuleViolations(); @@ -6463,6 +6446,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { public FlowAnalysisResultEntity createFlowAnalysisResultEntity(Collection ruleViolations) { FlowAnalysisResultEntity entity = new FlowAnalysisResultEntity(); + controllerFacade.getFlowManager().getFlowAnalyzer().ifPresent( + flowAnalyzer -> entity.setFlowAnalysisPending(flowAnalyzer.isFlowAnalysisRequired()) + ); + List flowAnalysisRuleDtos = flowAnalysisRuleDAO.getFlowAnalysisRules().stream() .filter(FlowAnalysisRuleNode::isEnabled) .sorted(Comparator.comparing(FlowAnalysisRuleNode::getName)) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 765a536f39..a8e993645d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -24,8 +24,6 @@ import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntr import java.io.IOException; import java.io.InputStream; import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,7 +41,6 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponses; import io.swagger.v3.oas.annotations.security.SecurityRequirement; import io.swagger.v3.oas.annotations.tags.Tag; -import jakarta.servlet.http.HttpServletRequest; import jakarta.ws.rs.Consumes; import jakarta.ws.rs.DELETE; import jakarta.ws.rs.DefaultValue; @@ -55,7 +52,6 @@ import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.QueryParam; -import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; @@ -93,13 +89,7 @@ import org.apache.nifi.util.FormatUtils; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.concurrent.AsyncRequestManager; -import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest; import org.apache.nifi.web.api.concurrent.RequestManager; -import org.apache.nifi.web.api.concurrent.StandardAsynchronousWebRequest; -import org.apache.nifi.web.api.concurrent.StandardUpdateStep; -import org.apache.nifi.web.api.concurrent.UpdateStep; -import org.apache.nifi.web.api.dto.AnalyzeFlowRequestDTO; -import org.apache.nifi.web.api.dto.AnalyzeFlowRequestUpdateStepDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.DropRequestDTO; @@ -114,7 +104,6 @@ import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; import org.apache.nifi.web.api.entity.AffectedComponentEntity; -import org.apache.nifi.web.api.entity.AnalyzeFlowRequestEntity; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionsEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; @@ -3000,256 +2989,6 @@ public class ProcessGroupResource extends FlowUpdateResource { - final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId); - processGroup.getAuthorizable().authorize(authorizer, RequestAction.READ, user); - }, - null, - (processGroupEntity) -> { - String analyzedGroupId = processGroupEntity.getId(); - - final String requestId = generateUuid(); - final AsynchronousWebRequest analyzeFlowAsyncWebRequest = new StandardAsynchronousWebRequest<>( - requestId, - analyzedGroupId, - analyzedGroupId, - user, - Collections.singletonList(new StandardUpdateStep("Analyze Process Group")) - ); - - // Submit the request to be performed in the background - final Consumer> analyzeFlowTask = asyncRequest -> { - try { - serviceFacade.analyzeProcessGroup(analyzedGroupId); - asyncRequest.markStepComplete(); - } catch (final Exception e) { - logger.error("Failed to run flow analysis on process group {}", processGroupId, e); - asyncRequest.fail("Failed to run flow analysis on process group " + processGroupId + " due to " + e); - } - }; - flowAnalysisAsyncRequestManager.submitRequest( - FLOW_ANALYSIS_REQUEST_TYPE, - requestId, - analyzeFlowAsyncWebRequest, - analyzeFlowTask - ); - - return generateOkResponse(createAnalyzeFlowRequestEntity(analyzeFlowAsyncWebRequest, requestId)).build(); - } - ); - } - - /** - * Checks the status of an outstanding request for a flow analysis. - * - * @param requestId The id of flow analysis request - * @return An analyzeFlowRequestEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces(MediaType.APPLICATION_JSON) - @Path("{id}/flow-analysis-requests/{requestId}") - @Operation( - summary = "Gets the current status of a flow analysis request.", - responses = @ApiResponse(content = @Content(schema = @Schema(implementation = AnalyzeFlowRequestEntity.class))), - security = { - @SecurityRequirement(name = "Read - /process-groups/{uuid} - For this and all encapsulated process groups") - } - ) - @ApiResponses( - value = { - @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), - @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), - @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), - @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") - } - ) - public Response getAnalyzeFlowRequest( - @Parameter( - description = "The id of the process group representing (a part of) the flow being analyzed.", - required = true - ) - @PathParam("id") final String processGroupId, - @Parameter( - description = "The id of the process group representing (a part of) the flow to be analyzed.", - required = true - ) - @PathParam("requestId") final String requestId - ) { - if (isReplicateRequest()) { - return replicate(HttpMethod.GET); - } - - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - // request manager will ensure that the current is the user that submitted this request - final AsynchronousWebRequest asyncRequest = - flowAnalysisAsyncRequestManager.getRequest(FLOW_ANALYSIS_REQUEST_TYPE, requestId, user); - - return generateOkResponse(createAnalyzeFlowRequestEntity(asyncRequest, requestId)).build(); - } - - /** - * Cancels the specified flow analysis request. - * - * @param httpServletRequest request - * @param requestId The id of the flow analysis request - * @return An analyzeFlowRequestEntity - */ - @DELETE - @Consumes(MediaType.WILDCARD) - @Produces(MediaType.APPLICATION_JSON) - @Path("{id}/flow-analysis-requests/{requestId}") - @Operation( - summary = "Cancels a flow analysis request.", - responses = @ApiResponse(content = @Content(schema = @Schema(implementation = AnalyzeFlowRequestEntity.class))), - security = { - @SecurityRequirement(name = "Read - /process-groups/{uuid} - For this and all encapsulated process groups") - } - ) - @ApiResponses( - value = { - @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), - @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), - @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), - @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") - } - ) - public Response removeAnalyzeFlowRequest( - @Parameter( - description = "The id of the process group representing (a part of) the flow being analyzed.", - required = true - ) - @PathParam("id") final String processGroupId, - @Context final HttpServletRequest httpServletRequest, - @Parameter( - description = "The id of the flow analysis request", - required = true - ) - @PathParam("requestId") final String requestId - ) { - if (isReplicateRequest()) { - return replicate(HttpMethod.DELETE); - } - - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final boolean twoPhaseRequest = isTwoPhaseRequest(httpServletRequest); - final boolean executionPhase = isExecutionPhase(httpServletRequest); - - // If this is a standalone node, or if this is the execution phase of the request, perform the actual request. - if (!twoPhaseRequest || executionPhase) { - // request manager will ensure that the current is the user that submitted this request - final AsynchronousWebRequest asyncRequest = - flowAnalysisAsyncRequestManager.removeRequest(FLOW_ANALYSIS_REQUEST_TYPE, requestId, user); - - if (asyncRequest == null) { - throw new ResourceNotFoundException("Could not find request of type " + FLOW_ANALYSIS_REQUEST_TYPE + " with ID " + requestId); - } - - if (!asyncRequest.isComplete()) { - asyncRequest.cancel(); - } - - AnalyzeFlowRequestEntity analyzeFlowRequestEntity = createAnalyzeFlowRequestEntity(asyncRequest, requestId); - return generateOkResponse(analyzeFlowRequestEntity).build(); - } - - if (isValidationPhase(httpServletRequest)) { - // Perform authorization by attempting to get the request - flowAnalysisAsyncRequestManager.getRequest(FLOW_ANALYSIS_REQUEST_TYPE, requestId, user); - return generateContinueResponse().build(); - } else if (isCancellationPhase(httpServletRequest)) { - return generateOkResponse().build(); - } else { - throw new IllegalStateException("This request does not appear to be part of the two phase commit."); - } - } - - private AnalyzeFlowRequestEntity createAnalyzeFlowRequestEntity( - final AsynchronousWebRequest asyncRequest, - final String requestId - ) { - String analyzedGroupId = asyncRequest.getRequest(); - - AnalyzeFlowRequestDTO responseDto = new AnalyzeFlowRequestDTO(); - responseDto.setProcessGroupId(analyzedGroupId); - - responseDto.setRequestId(requestId); - responseDto.setComplete(asyncRequest.isComplete()); - responseDto.setFailureReason(asyncRequest.getFailureReason()); - responseDto.setLastUpdated(asyncRequest.getLastUpdated()); - responseDto.setPercentCompleted(asyncRequest.getPercentComplete()); - responseDto.setState(asyncRequest.getState()); - responseDto.setUri(generateResourceUri("process-groups", "flow-analysis", analyzedGroupId)); - - final List updateSteps = new ArrayList<>(); - for (final UpdateStep updateStep : asyncRequest.getUpdateSteps()) { - final AnalyzeFlowRequestUpdateStepDTO updateStepDTO = new AnalyzeFlowRequestUpdateStepDTO(); - updateStepDTO.setDescription(updateStep.getDescription()); - updateStepDTO.setComplete(updateStep.isComplete()); - updateStepDTO.setFailureReason(updateStep.getFailureReason()); - updateSteps.add(updateStepDTO); - } - responseDto.setUpdateSteps(updateSteps); - - AnalyzeFlowRequestEntity analyzeFlowRequestEntity = new AnalyzeFlowRequestEntity(); - analyzeFlowRequestEntity.setAnalyzeFlowRequest(responseDto); - - return analyzeFlowRequestEntity; - } - //-- - /** * Perform actual flow update of the specified flow. This is used for the initial flow update and replication updates. */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java index 0d3365f1b2..50ad24d165 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java @@ -100,6 +100,10 @@ public class StandardFlowAnalysisRuleDAO extends ComponentDAO implements FlowAna // perform the update configureFlowAnalysisRule(flowAnalysisRule, flowAnalysisRuleDTO); + flowController.getFlowManager().getFlowAnalyzer().ifPresent( + flowAnalyzer -> flowAnalyzer.setFlowAnalysisRequired(true) + ); + return flowAnalysisRule; } catch (FlowAnalysisRuleInstantiationException rtie) { throw new NiFiCoreException(rtie.getMessage(), rtie); @@ -163,6 +167,10 @@ public class StandardFlowAnalysisRuleDAO extends ComponentDAO implements FlowAna } } + flowController.getFlowManager().getFlowAnalyzer().ifPresent( + flowAnalyzer -> flowAnalyzer.setFlowAnalysisRequired(true) + ); + return flowAnalysisRule; } @@ -326,6 +334,10 @@ public class StandardFlowAnalysisRuleDAO extends ComponentDAO implements FlowAna public void deleteFlowAnalysisRule(String flowAnalysisRuleId) { final FlowAnalysisRuleNode flowAnalysisRule = locateFlowAnalysisRule(flowAnalysisRuleId); flowAnalysisRuleProvider.removeFlowAnalysisRule(flowAnalysisRule); + + flowController.getFlowManager().getFlowAnalyzer().ifPresent( + flowAnalyzer -> flowAnalyzer.setFlowAnalysisRequired(true) + ); } @Override diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties index f9ef29179d..d914e8e643 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties +++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties @@ -253,5 +253,3 @@ nifi.kerberos.service.keytab.location= nifi.kerberos.spnego.principal= nifi.kerberos.spnego.keytab.location= nifi.kerberos.spnego.authentication.expiration=12 hours - -nifi.flow.analysis.background.task.schedule=5 mins diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties index 8b45f32b17..5bb68875e7 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties +++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties @@ -253,5 +253,3 @@ nifi.kerberos.service.keytab.location= nifi.kerberos.spnego.principal= nifi.kerberos.spnego.keytab.location= nifi.kerberos.spnego.authentication.expiration=12 hours - -nifi.flow.analysis.background.task.schedule=5 mins diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties index 4758d714dc..b2f1a05240 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties +++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties @@ -254,5 +254,3 @@ nifi.kerberos.service.keytab.location= nifi.kerberos.spnego.principal= nifi.kerberos.spnego.keytab.location= nifi.kerberos.spnego.authentication.expiration=12 hours - -nifi.flow.analysis.background.task.schedule=5 mins diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/nifi.properties b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/nifi.properties index faf92c6a96..d0efbde51d 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/nifi.properties +++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/nifi.properties @@ -262,5 +262,3 @@ nifi.kerberos.spnego.authentication.expiration=12 hours # external properties files for variable registry # supports a comma delimited list of file locations nifi.variable.registry.properties= - -nifi.flow.analysis.background.task.schedule=5 mins