NIFI-12924 Flow Analysis no longer done regularly or on-demand, but automatically when a change occurs that could result in the change of violations.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8537.
This commit is contained in:
tpalfy 2024-03-20 19:21:10 +01:00 committed by Pierre Villard
parent e5827c1026
commit c3eff68f92
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
17 changed files with 69 additions and 313 deletions

View File

@ -300,9 +300,6 @@ public class NiFiProperties extends ApplicationProperties {
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME = "nifi.analytics.connection.model.score.name"; public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME = "nifi.analytics.connection.model.score.name";
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = "nifi.analytics.connection.model.score.threshold"; public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = "nifi.analytics.connection.model.score.threshold";
// flow analysis properties
public static final String BACKGROUND_FLOW_ANALYSIS_SCHEDULE = "nifi.flow.analysis.background.task.schedule";
// runtime monitoring properties // runtime monitoring properties
public static final String MONITOR_LONG_RUNNING_TASK_SCHEDULE = "nifi.monitor.long.running.task.schedule"; public static final String MONITOR_LONG_RUNNING_TASK_SCHEDULE = "nifi.monitor.long.running.task.schedule";
public static final String MONITOR_LONG_RUNNING_TASK_THRESHOLD = "nifi.monitor.long.running.task.threshold"; public static final String MONITOR_LONG_RUNNING_TASK_THRESHOLD = "nifi.monitor.long.running.task.threshold";

View File

@ -32,9 +32,22 @@ public class FlowAnalysisResultEntity extends Entity {
public FlowAnalysisResultEntity() { public FlowAnalysisResultEntity() {
} }
private boolean flowAnalysisPending;
private List<FlowAnalysisRuleDTO> rules = new ArrayList<>(); private List<FlowAnalysisRuleDTO> rules = new ArrayList<>();
private List<FlowAnalysisRuleViolationDTO> ruleViolations = new ArrayList<>(); private List<FlowAnalysisRuleViolationDTO> ruleViolations = new ArrayList<>();
/**
* @return true if a flow analysis is going to be scheduled due to flow changes, false otherwise
*/
public boolean isFlowAnalysisPending() {
return flowAnalysisPending;
}
public void setFlowAnalysisPending(boolean flowAnalysisPending) {
this.flowAnalysisPending = flowAnalysisPending;
}
/** /**
* @return set of flow analysis rules that are being serialized * @return set of flow analysis rules that are being serialized
*/ */

View File

@ -3479,6 +3479,10 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
versionControlFields.setFlowDifferences(null); versionControlFields.setFlowDifferences(null);
flowManager.getFlowAnalyzer().ifPresent(
flowManager -> flowManager.setFlowAnalysisRequired(true)
);
} }
@Override @Override

View File

@ -24,6 +24,18 @@ import org.apache.nifi.flow.VersionedProcessGroup;
* Analyzes components, parts or the entirety of the flow. * Analyzes components, parts or the entirety of the flow.
*/ */
public interface FlowAnalyzer { public interface FlowAnalyzer {
/**
* Returns whether flow analysis should be scheduled
* @return true if flow analysis should be scheduled, false otherwise
*/
boolean isFlowAnalysisRequired();
/**
* Sets whether flow analysis should be scheduled
*/
void setFlowAnalysisRequired(boolean flowAnalysisRequired);
/** /**
* Analyzes a processor * Analyzes a processor
* *

View File

@ -1281,13 +1281,11 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
private void scheduleBackgroundFlowAnalysis(Supplier<VersionedProcessGroup> rootProcessGroupSupplier) { private void scheduleBackgroundFlowAnalysis(Supplier<VersionedProcessGroup> rootProcessGroupSupplier) {
if (flowAnalyzer != null) { if (flowAnalyzer != null) {
try { try {
final long scheduleMillis = parseDurationPropertyToMillis(NiFiProperties.BACKGROUND_FLOW_ANALYSIS_SCHEDULE);
flowAnalysisThreadPool.scheduleWithFixedDelay( flowAnalysisThreadPool.scheduleWithFixedDelay(
new TriggerFlowAnalysisTask(flowAnalyzer, rootProcessGroupSupplier), new TriggerFlowAnalysisTask(flowAnalyzer, rootProcessGroupSupplier),
scheduleMillis, 5,
scheduleMillis, 5,
TimeUnit.MILLISECONDS TimeUnit.SECONDS
); );
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Could not initialize TriggerFlowAnalysisTask.", e); LOG.warn("Could not initialize TriggerFlowAnalysisTask.", e);

View File

@ -59,6 +59,8 @@ public class StandardFlowAnalyzer implements FlowAnalyzer {
private ControllerServiceProvider controllerServiceProvider; private ControllerServiceProvider controllerServiceProvider;
private volatile boolean flowAnalysisRequired;
public StandardFlowAnalyzer( public StandardFlowAnalyzer(
final RuleViolationsManager ruleViolationsManager, final RuleViolationsManager ruleViolationsManager,
final FlowAnalysisRuleProvider flowAnalysisRuleProvider, final FlowAnalysisRuleProvider flowAnalysisRuleProvider,
@ -73,6 +75,16 @@ public class StandardFlowAnalyzer implements FlowAnalyzer {
this.controllerServiceProvider = controllerServiceProvider; this.controllerServiceProvider = controllerServiceProvider;
} }
@Override
public boolean isFlowAnalysisRequired() {
return flowAnalysisRequired;
}
@Override
public void setFlowAnalysisRequired(boolean flowAnalysisRequired) {
this.flowAnalysisRequired = flowAnalysisRequired;
}
@Override @Override
public void analyzeProcessor(ProcessorNode processorNode) { public void analyzeProcessor(ProcessorNode processorNode) {
logger.debug("Running analysis on {}", processorNode); logger.debug("Running analysis on {}", processorNode);

View File

@ -36,12 +36,16 @@ public class TriggerFlowAnalysisTask implements Runnable {
@Override @Override
public void run() { public void run() {
try { if (flowAnalyzer.isFlowAnalysisRequired()) {
logger.debug("Triggering analysis of entire flow"); logger.debug("Triggering analysis of entire flow");
try {
flowAnalyzer.analyzeProcessGroup(rootProcessGroupSupplier.get()); flowAnalyzer.analyzeProcessGroup(rootProcessGroupSupplier.get());
} catch (final Throwable t) { flowAnalyzer.setFlowAnalysisRequired(false);
logger.error("Encountered unexpected error when attempting to analyze flow", t); } catch (final Throwable t) {
logger.error("Encountered unexpected error when attempting to analyze flow", t);
}
} else {
logger.trace("Flow hasn't changed, flow analysis is put on hold");
} }
} }
} }

View File

@ -253,9 +253,6 @@
<nifi.analytics.connection.model.score.name>rSquared</nifi.analytics.connection.model.score.name> <nifi.analytics.connection.model.score.name>rSquared</nifi.analytics.connection.model.score.name>
<nifi.analytics.connection.model.score.threshold>.90</nifi.analytics.connection.model.score.threshold> <nifi.analytics.connection.model.score.threshold>.90</nifi.analytics.connection.model.score.threshold>
<!-- nifi.properties: flow analysis properties -->
<nifi.flow.analysis.background.task.schedule>5 mins</nifi.flow.analysis.background.task.schedule>
<!-- nifi.properties: runtime monitoring properties --> <!-- nifi.properties: runtime monitoring properties -->
<nifi.monitor.long.running.task.schedule>1 min</nifi.monitor.long.running.task.schedule> <nifi.monitor.long.running.task.schedule>1 min</nifi.monitor.long.running.task.schedule>
<nifi.monitor.long.running.task.threshold>5 mins</nifi.monitor.long.running.task.threshold> <nifi.monitor.long.running.task.threshold>5 mins</nifi.monitor.long.running.task.threshold>

View File

@ -337,9 +337,6 @@ nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.mode
# kubernetes # # kubernetes #
nifi.cluster.leader.election.kubernetes.lease.prefix=${nifi.cluster.leader.election.kubernetes.lease.prefix} nifi.cluster.leader.election.kubernetes.lease.prefix=${nifi.cluster.leader.election.kubernetes.lease.prefix}
# flow analysis properties
nifi.flow.analysis.background.task.schedule=${nifi.flow.analysis.background.task.schedule}
# runtime monitoring properties # runtime monitoring properties
nifi.monitor.long.running.task.schedule= nifi.monitor.long.running.task.schedule=
nifi.monitor.long.running.task.threshold= nifi.monitor.long.running.task.threshold=

View File

@ -2789,14 +2789,6 @@ public interface NiFiServiceFacade {
*/ */
FlowAnalysisRuleEntity deleteFlowAnalysisRule(Revision revision, String flowAnalysisRuleId); FlowAnalysisRuleEntity deleteFlowAnalysisRule(Revision revision, String flowAnalysisRuleId);
/**
* Analyze the flow or a part of it
*
* @param processGroupId The id of the process group representing (a part of) the flow to be analyzed.
* Recursive - all child process groups will be analyzed as well.
*/
void analyzeProcessGroup(String processGroupId);
/** /**
* @return all current rule violations * @return all current rule violations
*/ */

View File

@ -84,7 +84,6 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.VerifiableControllerService; import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil;
import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.repository.FlowFileEvent; import org.apache.nifi.controller.repository.FlowFileEvent;
@ -6411,22 +6410,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return flowAnalysisRules; return flowAnalysisRules;
} }
@Override
public void analyzeProcessGroup(String processGroupId) {
ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
NiFiRegistryFlowMapper mapper = FlowAnalysisUtil.createMapper(controllerFacade.getExtensionManager());
InstantiatedVersionedProcessGroup nonVersionedProcessGroup = mapper.mapNonVersionedProcessGroup(
processGroup,
controllerFacade.getControllerServiceProvider()
);
controllerFacade.getFlowManager().getFlowAnalyzer().ifPresent(
flowAnalyzer -> flowAnalyzer.analyzeProcessGroup(nonVersionedProcessGroup)
);
}
@Override @Override
public FlowAnalysisResultEntity getFlowAnalysisResult() { public FlowAnalysisResultEntity getFlowAnalysisResult() {
Collection<RuleViolation> ruleViolations = ruleViolationsManager.getAllRuleViolations(); Collection<RuleViolation> ruleViolations = ruleViolationsManager.getAllRuleViolations();
@ -6463,6 +6446,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
public FlowAnalysisResultEntity createFlowAnalysisResultEntity(Collection<RuleViolation> ruleViolations) { public FlowAnalysisResultEntity createFlowAnalysisResultEntity(Collection<RuleViolation> ruleViolations) {
FlowAnalysisResultEntity entity = new FlowAnalysisResultEntity(); FlowAnalysisResultEntity entity = new FlowAnalysisResultEntity();
controllerFacade.getFlowManager().getFlowAnalyzer().ifPresent(
flowAnalyzer -> entity.setFlowAnalysisPending(flowAnalyzer.isFlowAnalysisRequired())
);
List<FlowAnalysisRuleDTO> flowAnalysisRuleDtos = flowAnalysisRuleDAO.getFlowAnalysisRules().stream() List<FlowAnalysisRuleDTO> flowAnalysisRuleDtos = flowAnalysisRuleDAO.getFlowAnalysisRules().stream()
.filter(FlowAnalysisRuleNode::isEnabled) .filter(FlowAnalysisRuleNode::isEnabled)
.sorted(Comparator.comparing(FlowAnalysisRuleNode::getName)) .sorted(Comparator.comparing(FlowAnalysisRuleNode::getName))

View File

@ -24,8 +24,6 @@ import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntr
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -43,7 +41,6 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses; import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.security.SecurityRequirement; import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.ws.rs.Consumes; import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE; import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.DefaultValue;
@ -55,7 +52,6 @@ import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam; import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces; import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.Response;
@ -93,13 +89,7 @@ import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.Revision; import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.concurrent.AsyncRequestManager; import org.apache.nifi.web.api.concurrent.AsyncRequestManager;
import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest;
import org.apache.nifi.web.api.concurrent.RequestManager; import org.apache.nifi.web.api.concurrent.RequestManager;
import org.apache.nifi.web.api.concurrent.StandardAsynchronousWebRequest;
import org.apache.nifi.web.api.concurrent.StandardUpdateStep;
import org.apache.nifi.web.api.concurrent.UpdateStep;
import org.apache.nifi.web.api.dto.AnalyzeFlowRequestDTO;
import org.apache.nifi.web.api.dto.AnalyzeFlowRequestUpdateStepDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.DropRequestDTO;
@ -114,7 +104,6 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.AnalyzeFlowRequestEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ConnectionsEntity; import org.apache.nifi.web.api.entity.ConnectionsEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity;
@ -3000,256 +2989,6 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
return deleteFlowUpdateRequest("replace-requests", replaceRequestId, disconnectedNodeAcknowledged.booleanValue()); return deleteFlowUpdateRequest("replace-requests", replaceRequestId, disconnectedNodeAcknowledged.booleanValue());
} }
// -------------
// flow-analysis
// -------------
/**
* Submits a request to run a flow analysis.
*
* @param processGroupId The id of the process group representing (a part of) the flow to be analyzed
* @return An AnalyzeFlowRequestEntity
*/
@POST
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("{id}/flow-analysis-requests")
@Operation(
summary = "Executes a flow analysis for components within a given process group",
responses = @ApiResponse(content = @Content(schema = @Schema(implementation = AnalyzeFlowRequestEntity.class))),
security = {
@SecurityRequirement(name = "Read - /process-groups/{uuid} - For this and all encapsulated process groups")
}
)
@ApiResponses(
value = {
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
}
)
public Response submitAnalyzeFlowRequest(
@Parameter(
description = "The id of the process group representing (a part of) the flow to be analyzed.",
required = true
)
@PathParam("id") final String processGroupId
) {
if (isReplicateRequest()) {
return replicate(HttpMethod.POST);
}
NiFiUser user = NiFiUserUtils.getNiFiUser();
ProcessGroupEntity requestProcessGroupEntity = new ProcessGroupEntity();
requestProcessGroupEntity.setId(processGroupId);
return withWriteLock(
serviceFacade,
requestProcessGroupEntity,
lookup -> {
final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId);
processGroup.getAuthorizable().authorize(authorizer, RequestAction.READ, user);
},
null,
(processGroupEntity) -> {
String analyzedGroupId = processGroupEntity.getId();
final String requestId = generateUuid();
final AsynchronousWebRequest<String, Void> analyzeFlowAsyncWebRequest = new StandardAsynchronousWebRequest<>(
requestId,
analyzedGroupId,
analyzedGroupId,
user,
Collections.singletonList(new StandardUpdateStep("Analyze Process Group"))
);
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<String, Void>> analyzeFlowTask = asyncRequest -> {
try {
serviceFacade.analyzeProcessGroup(analyzedGroupId);
asyncRequest.markStepComplete();
} catch (final Exception e) {
logger.error("Failed to run flow analysis on process group {}", processGroupId, e);
asyncRequest.fail("Failed to run flow analysis on process group " + processGroupId + " due to " + e);
}
};
flowAnalysisAsyncRequestManager.submitRequest(
FLOW_ANALYSIS_REQUEST_TYPE,
requestId,
analyzeFlowAsyncWebRequest,
analyzeFlowTask
);
return generateOkResponse(createAnalyzeFlowRequestEntity(analyzeFlowAsyncWebRequest, requestId)).build();
}
);
}
/**
* Checks the status of an outstanding request for a flow analysis.
*
* @param requestId The id of flow analysis request
* @return An analyzeFlowRequestEntity
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("{id}/flow-analysis-requests/{requestId}")
@Operation(
summary = "Gets the current status of a flow analysis request.",
responses = @ApiResponse(content = @Content(schema = @Schema(implementation = AnalyzeFlowRequestEntity.class))),
security = {
@SecurityRequirement(name = "Read - /process-groups/{uuid} - For this and all encapsulated process groups")
}
)
@ApiResponses(
value = {
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
}
)
public Response getAnalyzeFlowRequest(
@Parameter(
description = "The id of the process group representing (a part of) the flow being analyzed.",
required = true
)
@PathParam("id") final String processGroupId,
@Parameter(
description = "The id of the process group representing (a part of) the flow to be analyzed.",
required = true
)
@PathParam("requestId") final String requestId
) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// request manager will ensure that the current is the user that submitted this request
final AsynchronousWebRequest<String, Void> asyncRequest =
flowAnalysisAsyncRequestManager.getRequest(FLOW_ANALYSIS_REQUEST_TYPE, requestId, user);
return generateOkResponse(createAnalyzeFlowRequestEntity(asyncRequest, requestId)).build();
}
/**
* Cancels the specified flow analysis request.
*
* @param httpServletRequest request
* @param requestId The id of the flow analysis request
* @return An analyzeFlowRequestEntity
*/
@DELETE
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("{id}/flow-analysis-requests/{requestId}")
@Operation(
summary = "Cancels a flow analysis request.",
responses = @ApiResponse(content = @Content(schema = @Schema(implementation = AnalyzeFlowRequestEntity.class))),
security = {
@SecurityRequirement(name = "Read - /process-groups/{uuid} - For this and all encapsulated process groups")
}
)
@ApiResponses(
value = {
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
}
)
public Response removeAnalyzeFlowRequest(
@Parameter(
description = "The id of the process group representing (a part of) the flow being analyzed.",
required = true
)
@PathParam("id") final String processGroupId,
@Context final HttpServletRequest httpServletRequest,
@Parameter(
description = "The id of the flow analysis request",
required = true
)
@PathParam("requestId") final String requestId
) {
if (isReplicateRequest()) {
return replicate(HttpMethod.DELETE);
}
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final boolean twoPhaseRequest = isTwoPhaseRequest(httpServletRequest);
final boolean executionPhase = isExecutionPhase(httpServletRequest);
// If this is a standalone node, or if this is the execution phase of the request, perform the actual request.
if (!twoPhaseRequest || executionPhase) {
// request manager will ensure that the current is the user that submitted this request
final AsynchronousWebRequest<String, Void> asyncRequest =
flowAnalysisAsyncRequestManager.removeRequest(FLOW_ANALYSIS_REQUEST_TYPE, requestId, user);
if (asyncRequest == null) {
throw new ResourceNotFoundException("Could not find request of type " + FLOW_ANALYSIS_REQUEST_TYPE + " with ID " + requestId);
}
if (!asyncRequest.isComplete()) {
asyncRequest.cancel();
}
AnalyzeFlowRequestEntity analyzeFlowRequestEntity = createAnalyzeFlowRequestEntity(asyncRequest, requestId);
return generateOkResponse(analyzeFlowRequestEntity).build();
}
if (isValidationPhase(httpServletRequest)) {
// Perform authorization by attempting to get the request
flowAnalysisAsyncRequestManager.getRequest(FLOW_ANALYSIS_REQUEST_TYPE, requestId, user);
return generateContinueResponse().build();
} else if (isCancellationPhase(httpServletRequest)) {
return generateOkResponse().build();
} else {
throw new IllegalStateException("This request does not appear to be part of the two phase commit.");
}
}
private AnalyzeFlowRequestEntity createAnalyzeFlowRequestEntity(
final AsynchronousWebRequest<String, Void> asyncRequest,
final String requestId
) {
String analyzedGroupId = asyncRequest.getRequest();
AnalyzeFlowRequestDTO responseDto = new AnalyzeFlowRequestDTO();
responseDto.setProcessGroupId(analyzedGroupId);
responseDto.setRequestId(requestId);
responseDto.setComplete(asyncRequest.isComplete());
responseDto.setFailureReason(asyncRequest.getFailureReason());
responseDto.setLastUpdated(asyncRequest.getLastUpdated());
responseDto.setPercentCompleted(asyncRequest.getPercentComplete());
responseDto.setState(asyncRequest.getState());
responseDto.setUri(generateResourceUri("process-groups", "flow-analysis", analyzedGroupId));
final List<AnalyzeFlowRequestUpdateStepDTO> updateSteps = new ArrayList<>();
for (final UpdateStep updateStep : asyncRequest.getUpdateSteps()) {
final AnalyzeFlowRequestUpdateStepDTO updateStepDTO = new AnalyzeFlowRequestUpdateStepDTO();
updateStepDTO.setDescription(updateStep.getDescription());
updateStepDTO.setComplete(updateStep.isComplete());
updateStepDTO.setFailureReason(updateStep.getFailureReason());
updateSteps.add(updateStepDTO);
}
responseDto.setUpdateSteps(updateSteps);
AnalyzeFlowRequestEntity analyzeFlowRequestEntity = new AnalyzeFlowRequestEntity();
analyzeFlowRequestEntity.setAnalyzeFlowRequest(responseDto);
return analyzeFlowRequestEntity;
}
//--
/** /**
* Perform actual flow update of the specified flow. This is used for the initial flow update and replication updates. * Perform actual flow update of the specified flow. This is used for the initial flow update and replication updates.
*/ */

View File

@ -100,6 +100,10 @@ public class StandardFlowAnalysisRuleDAO extends ComponentDAO implements FlowAna
// perform the update // perform the update
configureFlowAnalysisRule(flowAnalysisRule, flowAnalysisRuleDTO); configureFlowAnalysisRule(flowAnalysisRule, flowAnalysisRuleDTO);
flowController.getFlowManager().getFlowAnalyzer().ifPresent(
flowAnalyzer -> flowAnalyzer.setFlowAnalysisRequired(true)
);
return flowAnalysisRule; return flowAnalysisRule;
} catch (FlowAnalysisRuleInstantiationException rtie) { } catch (FlowAnalysisRuleInstantiationException rtie) {
throw new NiFiCoreException(rtie.getMessage(), rtie); throw new NiFiCoreException(rtie.getMessage(), rtie);
@ -163,6 +167,10 @@ public class StandardFlowAnalysisRuleDAO extends ComponentDAO implements FlowAna
} }
} }
flowController.getFlowManager().getFlowAnalyzer().ifPresent(
flowAnalyzer -> flowAnalyzer.setFlowAnalysisRequired(true)
);
return flowAnalysisRule; return flowAnalysisRule;
} }
@ -326,6 +334,10 @@ public class StandardFlowAnalysisRuleDAO extends ComponentDAO implements FlowAna
public void deleteFlowAnalysisRule(String flowAnalysisRuleId) { public void deleteFlowAnalysisRule(String flowAnalysisRuleId) {
final FlowAnalysisRuleNode flowAnalysisRule = locateFlowAnalysisRule(flowAnalysisRuleId); final FlowAnalysisRuleNode flowAnalysisRule = locateFlowAnalysisRule(flowAnalysisRuleId);
flowAnalysisRuleProvider.removeFlowAnalysisRule(flowAnalysisRule); flowAnalysisRuleProvider.removeFlowAnalysisRule(flowAnalysisRule);
flowController.getFlowManager().getFlowAnalyzer().ifPresent(
flowAnalyzer -> flowAnalyzer.setFlowAnalysisRequired(true)
);
} }
@Override @Override

View File

@ -253,5 +253,3 @@ nifi.kerberos.service.keytab.location=
nifi.kerberos.spnego.principal= nifi.kerberos.spnego.principal=
nifi.kerberos.spnego.keytab.location= nifi.kerberos.spnego.keytab.location=
nifi.kerberos.spnego.authentication.expiration=12 hours nifi.kerberos.spnego.authentication.expiration=12 hours
nifi.flow.analysis.background.task.schedule=5 mins

View File

@ -253,5 +253,3 @@ nifi.kerberos.service.keytab.location=
nifi.kerberos.spnego.principal= nifi.kerberos.spnego.principal=
nifi.kerberos.spnego.keytab.location= nifi.kerberos.spnego.keytab.location=
nifi.kerberos.spnego.authentication.expiration=12 hours nifi.kerberos.spnego.authentication.expiration=12 hours
nifi.flow.analysis.background.task.schedule=5 mins

View File

@ -254,5 +254,3 @@ nifi.kerberos.service.keytab.location=
nifi.kerberos.spnego.principal= nifi.kerberos.spnego.principal=
nifi.kerberos.spnego.keytab.location= nifi.kerberos.spnego.keytab.location=
nifi.kerberos.spnego.authentication.expiration=12 hours nifi.kerberos.spnego.authentication.expiration=12 hours
nifi.flow.analysis.background.task.schedule=5 mins

View File

@ -262,5 +262,3 @@ nifi.kerberos.spnego.authentication.expiration=12 hours
# external properties files for variable registry # external properties files for variable registry
# supports a comma delimited list of file locations # supports a comma delimited list of file locations
nifi.variable.registry.properties= nifi.variable.registry.properties=
nifi.flow.analysis.background.task.schedule=5 mins