diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java index 5c8fd6a9bf..20fcad42b3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java @@ -865,6 +865,7 @@ public class ParameterContextResource extends ApplicationResource { final ParameterContextEntity updatedEntity; try { updatedEntity = performParameterContextUpdate(asyncRequest, uri, replicateRequest, revision, updatedContextEntity); + asyncRequest.markStepComplete(); logger.info("Successfully updated Parameter Context with ID {}", updatedContextEntity.getId()); } finally { // TODO: can almost certainly be refactored so that the same code is shared between VersionsResource and ParameterContextResource. @@ -968,8 +969,6 @@ public class ParameterContextResource extends ApplicationResource { logger.info("Restarting {} Processors after having updated Parameter Context", processors.size()); } - asyncRequest.markStepComplete(); - // Step 14. Restart all components final Set componentsToStart = getUpdatedEntities(processors); @@ -978,6 +977,7 @@ public class ParameterContextResource extends ApplicationResource { try { componentLifecycle.scheduleComponents(uri, "root", componentsToStart, ScheduledState.RUNNING, startComponentsPause, InvalidComponentAction.SKIP); + asyncRequest.markStepComplete(); } catch (final IllegalStateException ise) { // Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide // a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated. @@ -1003,8 +1003,6 @@ public class ParameterContextResource extends ApplicationResource { logger.info("Re-Enabling {} Controller Services after having updated Parameter Context", controllerServices.size()); } - asyncRequest.markStepComplete(); - // Step 13. Re-enable all disabled controller services final CancellableTimedPause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); asyncRequest.setCancelCallback(enableServicesPause::cancel); @@ -1012,6 +1010,7 @@ public class ParameterContextResource extends ApplicationResource { try { componentLifecycle.activateControllerServices(uri, "root", servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP); + asyncRequest.markStepComplete(); } catch (final IllegalStateException ise) { // Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide // a more intelligent error message as to exactly what happened, rather than indicate that the Parameter Context could not be updated. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java index 934e9272da..2813daf7db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java @@ -101,6 +101,16 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle final NiFiUser user = NiFiUserUtils.getNiFiUser(); + // If attempting to run the processors, validation must complete first + if (desiredState == ScheduledState.RUNNING) { + try { + waitForProcessorValidation(user, exampleUri, groupId, componentMap, pause); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new LifecycleManagementException("Interrupted while waiting for processors to complete validation"); + } + } + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. try { final NodeResponse clusterResponse; @@ -156,6 +166,64 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle return processorRevisions.stream().collect(Collectors.toMap(Revision::getComponentId, Function.identity())); } + private boolean waitForProcessorValidation(final NiFiUser user, final URI originalUri, final String groupId, + final Map processors, final Pause pause) throws InterruptedException { + URI groupUri; + try { + groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/processors", "includeDescendantGroups=true", originalUri.getFragment()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + final Map headers = new HashMap<>(); + final MultivaluedMap requestEntity = new MultivaluedHashMap<>(); + + boolean continuePolling = true; + while (continuePolling) { + + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. + final NodeResponse clusterResponse; + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + } + + if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { + return false; + } + + final ProcessorsEntity processorsEntity = getResponseEntity(clusterResponse, ProcessorsEntity.class); + final Set processorEntities = processorsEntity.getProcessors(); + + if (isProcessorValidationComplete(processorEntities, processors)) { + logger.debug("All {} processors of interest now have been validated", processors.size()); + return true; + } + + // Not all of the processors are done validating. Pause for a bit and poll again. + continuePolling = pause.pause(); + } + + return false; + } + + private boolean isProcessorValidationComplete(Set processorEntities, Map affectedComponents) { + updateAffectedProcessors(processorEntities, affectedComponents); + for (final ProcessorEntity entity : processorEntities) { + if (!affectedComponents.containsKey(entity.getId())) { + continue; + } + + if (ProcessorDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) { + return false; + } + } + return true; + } + /** * Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State. * @@ -230,30 +298,32 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle return entity; } + private void updateAffectedProcessors(final Set processorEntities, final Map affectedComponents) { + // update the affected processors + processorEntities.stream() + .filter(entity -> affectedComponents.containsKey(entity.getId())) + .forEach(entity -> { + final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getId()); + affectedComponentEntity.setRevision(entity.getRevision()); + + // only consider update this component if the user had permissions to it + if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { + final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); + affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus()); + affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount()); + + if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { + affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors()); + } + } + }); + } private boolean isProcessorActionComplete(final Set processorEntities, final Map affectedComponents, final ScheduledState desiredState, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException { - final String desiredStateName = desiredState.name(); - // update the affected processors - processorEntities.stream() - .filter(entity -> affectedComponents.containsKey(entity.getId())) - .forEach(entity -> { - final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getId()); - affectedComponentEntity.setRevision(entity.getRevision()); - - // only consider update this component if the user had permissions to it - if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { - final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); - affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus()); - affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount()); - - if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { - affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors()); - } - } - }); + updateAffectedProcessors(processorEntities, affectedComponents); for (final ProcessorEntity entity : processorEntities) { if (!affectedComponents.containsKey(entity.getId())) { @@ -320,6 +390,16 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle final NiFiUser user = NiFiUserUtils.getNiFiUser(); + // If enabling services, validation must complete first + if (desiredState == ControllerServiceState.ENABLED) { + try { + waitForControllerServiceValidation(user, originalUri, groupId, affectedServiceIds, pause); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new LifecycleManagementException("Interrupted while waiting for Controller Services to complete validation"); + } + } + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. try { final NodeResponse clusterResponse; @@ -352,6 +432,67 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle .collect(Collectors.toSet()); } + private boolean waitForControllerServiceValidation(final NiFiUser user, final URI originalUri, final String groupId, + final Set serviceIds, final Pause pause) + throws InterruptedException { + + URI groupUri; + try { + groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false,includeDescendantGroups=true", originalUri.getFragment()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + final Map headers = new HashMap<>(); + final MultivaluedMap requestEntity = new MultivaluedHashMap<>(); + + boolean continuePolling = true; + while (continuePolling) { + + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. + final NodeResponse clusterResponse; + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + } + + if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { + return false; + } + + final ControllerServicesEntity controllerServicesEntity = getResponseEntity(clusterResponse, ControllerServicesEntity.class); + final Set serviceEntities = controllerServicesEntity.getControllerServices(); + + final Map affectedServices = serviceEntities.stream() + .collect(Collectors.toMap(ControllerServiceEntity::getId, dtoFactory::createAffectedComponentEntity)); + + if (isControllerServiceValidationComplete(serviceEntities, affectedServices)) { + logger.debug("All {} controller services of interest have completed validation", affectedServices.size()); + return true; + } + continuePolling = pause.pause(); + } + + return false; + } + + private boolean isControllerServiceValidationComplete(final Set controllerServiceEntities, final Map affectedComponents) { + updateAffectedControllerServices(controllerServiceEntities, affectedComponents); + for (final ControllerServiceEntity entity : controllerServiceEntities) { + if (!affectedComponents.containsKey(entity.getId())) { + continue; + } + + if (ControllerServiceDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) { + return false; + } + } + return true; + } + /** * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java index b597797e5d..1a5fc6bda3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java @@ -105,6 +105,9 @@ public class LocalComponentLifecycle implements ComponentLifecycle { logger.debug("Starting components with ID's {} from Process Group {}", componentRevisions.keySet(), processGroupId); + // Wait for all affected processors to be either VALID or INVALID + waitForProcessorValidation(processGroupId, affectedComponents, pause); + serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.RUNNING, componentRevisions.keySet()); serviceFacade.scheduleComponents(processGroupId, ScheduledState.RUNNING, componentRevisions); @@ -130,6 +133,42 @@ public class LocalComponentLifecycle implements ComponentLifecycle { waitForProcessorState(processGroupId, affectedComponents, ScheduledState.STOPPED, pause, invalidComponentAction); } + + /** + * Waits for all given Processors to complete validation + * + * @return true if all processors have completed validation, false if the given {@link Pause} + * indicated to give up before all of the processors have completed validation + */ + private boolean waitForProcessorValidation(final String groupId, final Map affectedComponents, final Pause pause) { + + logger.debug("Waiting for {} processors to complete validation", affectedComponents.size()); + boolean continuePolling = true; + while (continuePolling) { + final Set processorEntities = serviceFacade.getProcessors(groupId, true); + if (isProcessorValidationComplete(processorEntities, affectedComponents)) { + logger.debug("All {} processors of interest have completed validation", affectedComponents.size()); + return true; + } + continuePolling = pause.pause(); + } + return false; + } + + private boolean isProcessorValidationComplete(final Set processorEntities, final Map affectedComponents) { + updateAffectedProcessors(processorEntities, affectedComponents); + for (final ProcessorEntity entity : processorEntities) { + if (!affectedComponents.containsKey(entity.getId())) { + continue; + } + + if (ProcessorDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) { + return false; + } + } + return true; + } + /** * Waits for all of the given Processors to reach the given Scheduled State. * @@ -157,29 +196,33 @@ public class LocalComponentLifecycle implements ComponentLifecycle { return false; } + private void updateAffectedProcessors(final Set processorEntities, final Map affectedComponents) { + // update the affected processors + processorEntities.stream() + .filter(entity -> affectedComponents.containsKey(entity.getId())) + .forEach(entity -> { + final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getId()); + affectedComponentEntity.setRevision(entity.getRevision()); + + // only consider updating this component if the user has permissions to it + if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { + final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); + affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus()); + affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount()); + + if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { + affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors()); + } + } + }); + } + private boolean isProcessorActionComplete(final Set processorEntities, final Map affectedComponents, final ScheduledState desiredState, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException { final String desiredStateName = desiredState.name(); - // update the affected processors - processorEntities.stream() - .filter(entity -> affectedComponents.containsKey(entity.getId())) - .forEach(entity -> { - final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getId()); - affectedComponentEntity.setRevision(entity.getRevision()); - - // only consider updating this component if the user has permissions to it - if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { - final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); - affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus()); - affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount()); - - if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { - affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors()); - } - } - }); + updateAffectedProcessors(processorEntities, affectedComponents); for (final ProcessorEntity entity : processorEntities) { if (!affectedComponents.containsKey(entity.getId())) { @@ -224,6 +267,8 @@ public class LocalComponentLifecycle implements ComponentLifecycle { logger.debug("Enabling Controller Services with ID's {} from Process Group {}", serviceRevisions.keySet(), processGroupId); + waitForControllerServiceValidation(processGroupId, affectedServices, pause); + serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, affectedServices.keySet()); serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions); waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.ENABLED, pause, invalidComponentAction); @@ -282,6 +327,39 @@ public class LocalComponentLifecycle implements ComponentLifecycle { } } + /** + * Waits for all given Controller Services to complete validation + * + * @return true if all processors have completed validation, false if the given {@link Pause} + * indicated to give up before all of the controller services have completed validation + */ + private boolean waitForControllerServiceValidation(final String groupId, final Map affectedComponents, final Pause pause) { + logger.debug("Waiting for {} controller services to complete validation", affectedComponents.size()); + boolean continuePolling = true; + while (continuePolling) { + final Set serviceEntities = serviceFacade.getControllerServices(groupId, false, true); + if (isControllerServiceValidationComplete(serviceEntities, affectedComponents)) { + logger.debug("All {} controller services of interest have completed validation", affectedComponents.size()); + return true; + } + continuePolling = pause.pause(); + } + return false; + } + + private boolean isControllerServiceValidationComplete(final Set controllerServiceEntities, final Map affectedComponents) { + updateAffectedControllerServices(controllerServiceEntities, affectedComponents); + for (final ControllerServiceEntity entity : controllerServiceEntities) { + if (!affectedComponents.containsKey(entity.getId())) { + continue; + } + + if (ControllerServiceDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) { + return false; + } + } + return true; + } /** * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.