From b6117743d4c1c1a37a16ba746b9edbbdd276d69f Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 4 Jan 2018 16:09:02 -0500 Subject: [PATCH] NIFI-4436: Ensure that on save, we assign a Versioned Component Identifier to inner process groups that are tracking to remote flows, if they don't have one. This would occur, for instance, if a Process Group was imported into an existing group (or copied/moved into it) and then the existing group was saved. NIFI-4436: Fixed a bug that caused a flow not to successfully change version if a connection is added to an existing component and that component is running at time of version change NIFI-4436: Fixed bug with ordering of controller services being enabled and disabled NIFI-4436: Fixed bug that prevented local input and output ports from being stopped and started as needed NIFI-4436: Fixed bugs around referencing controller services that are at a higher level than the versioned flow NIFI-4436: Ensure that we clear components from FlowController's cache when removed and that they are added to cache when created. NIFI-4436: Fixed error message coming back if component is invalid when trying to be restarted/re-enabled NIFI-4436: Addressed issue with children of a removed process group not being considered 'affected components' and as a result not being stopped/disabled/restarted/re-enabled This closes #2219. Signed-off-by: Matt Gilman --- .../web/api/dto/AffectedComponentDTO.java | 3 + .../service/ControllerServiceProvider.java | 22 ++- .../nifi/controller/FlowController.java | 11 ++ .../StandardControllerServiceProvider.java | 127 +++++++++++++- .../nifi/groups/StandardProcessGroup.java | 42 ++--- .../TestStandardProcessScheduler.java | 17 +- ...TestStandardControllerServiceProvider.java | 9 +- .../apache/nifi/web/NiFiServiceFacade.java | 21 ++- .../apache/nifi/web/ResumeFlowException.java | 31 ++++ .../nifi/web/StandardNiFiServiceFacade.java | 160 +++++++++++++++--- .../apache/nifi/web/api/VersionsResource.java | 33 +++- .../apache/nifi/web/api/dto/DtoFactory.java | 27 +++ .../apache/nifi/web/dao/ProcessGroupDAO.java | 7 +- .../web/dao/impl/StandardProcessGroupDAO.java | 50 +++--- .../nifi/web/util/AffectedComponentUtils.java | 9 + .../web/util/LocalComponentLifecycle.java | 45 +++++ 16 files changed, 518 insertions(+), 96 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/ResumeFlowException.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java index 95024ca9e9..dd7c6c43db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java @@ -26,6 +26,8 @@ import java.util.Collection; public class AffectedComponentDTO { public static final String COMPONENT_TYPE_PROCESSOR = "PROCESSOR"; public static final String COMPONENT_TYPE_CONTROLLER_SERVICE = "CONTROLLER_SERVICE"; + public static final String COMPONENT_TYPE_INPUT_PORT = "INPUT_PORT"; + public static final String COMPONENT_TYPE_OUTPUT_PORT = "OUTPUT_PORT"; public static final String COMPONENT_TYPE_REMOTE_INPUT_PORT = "REMOTE_INPUT_PORT"; public static final String COMPONENT_TYPE_REMOTE_OUTPUT_PORT = "REMOTE_OUTPUT_PORT"; @@ -58,6 +60,7 @@ public class AffectedComponentDTO { @ApiModelProperty(value = "The type of this component", allowableValues = COMPONENT_TYPE_PROCESSOR + "," + COMPONENT_TYPE_CONTROLLER_SERVICE + ", " + + COMPONENT_TYPE_INPUT_PORT + ", " + COMPONENT_TYPE_OUTPUT_PORT + ", " + COMPONENT_TYPE_REMOTE_INPUT_PORT + ", " + COMPONENT_TYPE_REMOTE_OUTPUT_PORT) public String getReferenceType() { return referenceType; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java index 010ecdf712..ae5416cc7f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java @@ -20,6 +20,7 @@ import java.net.URL; import java.util.Collection; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.bundle.BundleCoordinate; @@ -83,6 +84,16 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { */ void enableControllerServices(Collection serviceNodes); + /** + * Enables the collection of services in the background. If a service in this collection + * depends on another service, the service being depended on must either already be enabled + * or must be in the collection as well. + * + * @param serviceNodes the nodes + * @return a Future that can be used to cancel the task or wait until it is completed + */ + Future enableControllerServicesAsync(Collection serviceNodes); + /** * Disables the given controller service so that it cannot be used by other * components. This allows configuration to be updated or allows service to @@ -92,9 +103,18 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { */ CompletableFuture disableControllerService(ControllerServiceNode serviceNode); + /** + * Disables the collection of services in the background. If any of the services given is referenced + * by another service, then that other service must either be disabled or be in the given collection. + * + * @param serviceNodes the nodes the disable + * @return a Future that can be used to cancel the task or wait until it is completed + */ + Future disableControllerServicesAsync(Collection serviceNodes); + /** * @return a Set of all Controller Services that exist for this service - * provider + * provider */ Set getAllControllerServices(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index eb4b8b9f18..88dc11c3f6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -245,6 +245,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -3609,12 +3610,22 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R controllerServiceProvider.enableControllerServices(serviceNodes); } + @Override + public Future enableControllerServicesAsync(final Collection serviceNodes) { + return controllerServiceProvider.enableControllerServicesAsync(serviceNodes); + } + @Override public CompletableFuture disableControllerService(final ControllerServiceNode serviceNode) { serviceNode.verifyCanDisable(); return controllerServiceProvider.disableControllerService(serviceNode); } + @Override + public Future disableControllerServicesAsync(final Collection serviceNodes) { + return controllerServiceProvider.disableControllerServicesAsync(serviceNodes); + } + @Override public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) { controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 48ad849ab6..b4d9e8bbf6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.commons.lang3.ClassUtils; @@ -50,13 +51,13 @@ import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.LoggableComponent; -import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ComponentLifeCycleException; import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; +import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; @@ -78,7 +79,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class); - private final ProcessScheduler processScheduler; + private final StandardProcessScheduler processScheduler; private final BulletinRepository bulletinRepo; private final StateManagerProvider stateManagerProvider; private final VariableRegistry variableRegistry; @@ -87,7 +88,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi private final ConcurrentMap serviceCache = new ConcurrentHashMap<>(); - public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo, + public StandardControllerServiceProvider(final FlowController flowController, final StandardProcessScheduler scheduler, final BulletinRepository bulletinRepo, final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry, final NiFiProperties nifiProperties) { this.flowController = flowController; @@ -384,6 +385,74 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } + @Override + public Future enableControllerServicesAsync(final Collection serviceNodes) { + final CompletableFuture future = new CompletableFuture<>(); + processScheduler.submitFrameworkTask(() -> { + enableControllerServices(serviceNodes, future); + future.complete(null); + }); + + return future; + } + + private void enableControllerServices(final Collection serviceNodes, final CompletableFuture completableFuture) { + // validate that we are able to start all of the services. + Iterator serviceIter = serviceNodes.iterator(); + while (serviceIter.hasNext()) { + ControllerServiceNode controllerServiceNode = serviceIter.next(); + List requiredServices = ((StandardControllerServiceNode) controllerServiceNode).getRequiredControllerServices(); + for (ControllerServiceNode requiredService : requiredServices) { + if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) { + logger.error("Cannot enable {} because it has a dependency on {}, which is not enabled", controllerServiceNode, requiredService); + completableFuture.completeExceptionally(new IllegalStateException("Cannot enable " + controllerServiceNode + + " because it has a dependency on " + requiredService + ", which is not enabled")); + return; + } + } + } + + for (final ControllerServiceNode controllerServiceNode : serviceNodes) { + if (completableFuture.isCancelled()) { + return; + } + + try { + if (!controllerServiceNode.isActive()) { + final Future future = enableControllerServiceDependenciesFirst(controllerServiceNode); + + while (true) { + try { + future.get(1, TimeUnit.SECONDS); + logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState()); + break; + } catch (final TimeoutException e) { + if (completableFuture.isCancelled()) { + return; + } + } catch (final Exception e) { + logger.warn("Failed to enable service {}", controllerServiceNode, e); + completableFuture.completeExceptionally(e); + + if (this.bulletinRepo != null) { + this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service", + Severity.ERROR.name(), "Could not enable " + controllerServiceNode + " due to " + e)); + } + + return; + } + } + } + } catch (Exception e) { + logger.error("Failed to enable " + controllerServiceNode, e); + if (this.bulletinRepo != null) { + this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service", + Severity.ERROR.name(), "Could not start " + controllerServiceNode + " due to " + e)); + } + } + } + } + private Future enableControllerServiceDependenciesFirst(ControllerServiceNode serviceNode) { final Map> futures = new HashMap<>(); @@ -460,6 +529,58 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi return processScheduler.disableControllerService(serviceNode); } + @Override + public Future disableControllerServicesAsync(final Collection serviceNodes) { + final CompletableFuture future = new CompletableFuture<>(); + processScheduler.submitFrameworkTask(() -> { + disableControllerServices(serviceNodes, future); + future.complete(null); + }); + + return future; + } + + private void disableControllerServices(final Collection serviceNodes, final CompletableFuture future) { + final Set serviceNodeSet = new HashSet<>(serviceNodes); + + // Verify that for each Controller Service given, any service that references it is either disabled or is also in the given collection + for (final ControllerServiceNode serviceNode : serviceNodes) { + final List references = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class); + for (final ControllerServiceNode reference : references) { + if (reference.isActive()) { + try { + reference.verifyCanDisable(serviceNodeSet); + } catch (final Exception e) { + future.completeExceptionally(e); + } + } + } + } + + for (final ControllerServiceNode serviceNode : serviceNodes) { + if (serviceNode.isActive()) { + disableReferencingServices(serviceNode); + final CompletableFuture serviceFuture = disableControllerService(serviceNode); + + while (true) { + try { + serviceFuture.get(1, TimeUnit.SECONDS); + break; + } catch (final TimeoutException e) { + if (future.isCancelled()) { + return; + } + + continue; + } catch (final Exception e) { + logger.error("Failed to disable {}", serviceNode, e); + future.completeExceptionally(e); + } + } + } + } + } + @Override public ControllerService getControllerService(final String serviceIdentifier) { final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 9418f40852..4cd6e2a435 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -738,7 +738,8 @@ public final class StandardProcessGroup implements ProcessGroup { } for (final ControllerServiceNode cs : group.getControllerServices(false)) { - group.removeControllerService(cs); + // Must go through Controller Service here because we need to ensure that it is removed from the cache + flowController.removeControllerService(cs); } for (final ProcessGroup childGroup : new ArrayList<>(group.getProcessGroups())) { @@ -3158,9 +3159,13 @@ public final class StandardProcessGroup implements ProcessGroup { .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier()))); }); - processGroup.getProcessGroups().stream() - .filter(childGroup -> childGroup.getVersionControlInformation() == null) - .forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup)); + for (final ProcessGroup childGroup : processGroup.getProcessGroups()) { + if (childGroup.getVersionControlInformation() == null) { + applyVersionedComponentIds(childGroup, lookup); + } else if (!childGroup.getVersionedComponentId().isPresent()) { + childGroup.setVersionedComponentId(lookup.apply(childGroup.getIdentifier())); + } + } } @@ -3242,7 +3247,7 @@ public final class StandardProcessGroup implements ProcessGroup { final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup); final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents()); - final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, getAncestorGroupServiceIds(), new StaticDifferenceDescriptor()); + final FlowComparator flowComparator = new StandardFlowComparator(remoteFlow, localFlow, getAncestorGroupServiceIds(), new StaticDifferenceDescriptor()); final FlowComparison flowComparison = flowComparator.compare(); final Set updatedVersionedComponentIds = new HashSet<>(); @@ -3387,7 +3392,6 @@ public final class StandardProcessGroup implements ProcessGroup { .map(VariableDescriptor::getName) .collect(Collectors.toSet()); - final Map updatedVariableMap = new HashMap<>(); // If any new variables exist in the proposed flow, add those to the variable registry. @@ -3477,6 +3481,7 @@ public final class StandardProcessGroup implements ProcessGroup { if (childGroup == null) { final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip); + flowController.onProcessGroupAdded(added); added.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize); LOG.info("Added {} to {}", added, this); } else if (childCoordinates == null || updateDescendantVersionedGroups) { @@ -3496,6 +3501,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier()); if (funnel == null) { final Funnel added = addFunnel(group, proposedFunnel, componentIdSeed); + flowController.onFunnelAdded(added); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) { updateFunnel(funnel, proposedFunnel); @@ -3517,6 +3523,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier()); if (port == null) { final Port added = addInputPort(group, proposedPort, componentIdSeed); + flowController.onInputPortAdded(added); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) { updatePort(port, proposedPort); @@ -3537,6 +3544,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier()); if (port == null) { final Port added = addOutputPort(group, proposedPort, componentIdSeed); + flowController.onOutputPortAdded(added); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) { updatePort(port, proposedPort); @@ -3580,6 +3588,7 @@ public final class StandardProcessGroup implements ProcessGroup { final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); if (processor == null) { final ProcessorNode added = addProcessor(group, proposedProcessor, componentIdSeed); + flowController.onProcessorAdded(added); final Set proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream() @@ -3638,6 +3647,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier()); if (connection == null) { final Connection added = addConnection(group, proposedConnection, componentIdSeed); + flowController.onConnectionAdded(added); LOG.info("Added {} to {}", added, this); } else if (isUpdateable(connection)) { // If the connection needs to be updated, then the source and destination will already have @@ -3658,6 +3668,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Connection connection = connectionsByVersionedId.get(removedVersionedId); LOG.info("Removing {} from {}", connection, group); group.removeConnection(connection); + flowController.onConnectionRemoved(connection); } // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships. @@ -3670,7 +3681,8 @@ public final class StandardProcessGroup implements ProcessGroup { for (final String removedVersionedId : controllerServicesRemoved) { final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId); LOG.info("Removing {} from {}", service, group); - group.removeControllerService(service); + // Must remove Controller Service through Flow Controller in order to remove from cache + flowController.removeControllerService(service); } for (final String removedVersionedId : funnelsRemoved) { @@ -4065,13 +4077,6 @@ public final class StandardProcessGroup implements ProcessGroup { // to the instance ID of the Controller Service. final String serviceVersionedComponentId = entry.getValue(); String instanceId = getServiceInstanceId(serviceVersionedComponentId, group); - if (instanceId == null) { - // We didn't find the instance ID based on the Versioned Component ID. So we want to just - // leave the value set to whatever it currently is, if it's currently set. - final PropertyDescriptor propertyDescriptor = new PropertyDescriptor.Builder().name(entry.getKey()).build(); - instanceId = currentProperties.get(propertyDescriptor); - } - value = instanceId == null ? serviceVersionedComponentId : instanceId; } else { value = entry.getValue(); @@ -4085,13 +4090,9 @@ public final class StandardProcessGroup implements ProcessGroup { } private String getServiceInstanceId(final String serviceVersionedComponentId, final ProcessGroup group) { - for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) { + for (final ControllerServiceNode serviceNode : group.getControllerServices(true)) { final Optional optionalVersionedId = serviceNode.getVersionedComponentId(); - if (!optionalVersionedId.isPresent()) { - continue; - } - - final String versionedId = optionalVersionedId.get(); + final String versionedId = optionalVersionedId.orElseGet(() -> UUID.nameUUIDFromBytes(serviceNode.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString()); if (versionedId.equals(serviceVersionedComponentId)) { return serviceNode.getIdentifier(); } @@ -4319,7 +4320,6 @@ public final class StandardProcessGroup implements ProcessGroup { } } - // Ensure that all Processors are instantiate-able. final Map proposedProcessors = new HashMap<>(); findAllProcessors(updatedFlow.getFlowContents(), proposedProcessors); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index 314738abbe..c0b36c91d1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -47,7 +47,6 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.LoggableComponent; -import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReloadComponent; import org.apache.nifi.controller.ReportingTaskNode; @@ -269,7 +268,7 @@ public class TestStandardProcessScheduler { */ @Test public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); @@ -308,7 +307,7 @@ public class TestStandardProcessScheduler { */ @Test public void validateDisabledServiceCantBeDisabled() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); @@ -346,7 +345,7 @@ public class TestStandardProcessScheduler { */ @Test public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); @@ -380,7 +379,7 @@ public class TestStandardProcessScheduler { @Test public void validateDisablingOfTheFailedService() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); @@ -412,7 +411,7 @@ public class TestStandardProcessScheduler { @Test @Ignore public void validateEnabledDisableMultiThread() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 200; i++) { @@ -455,7 +454,7 @@ public class TestStandardProcessScheduler { */ @Test public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); @@ -481,7 +480,7 @@ public class TestStandardProcessScheduler { */ @Test public void validateLongEnablingServiceCanStillBeDisabled() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); @@ -581,7 +580,7 @@ public class TestStandardProcessScheduler { } } - private ProcessScheduler createScheduler() { + private StandardProcessScheduler createScheduler() { return new StandardProcessScheduler(null, null, stateMgrProvider, nifiProperties); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index 0d15143830..ed335e978f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -38,7 +38,6 @@ import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.LoggableComponent; -import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReloadComponent; import org.apache.nifi.controller.ScheduledState; @@ -146,7 +145,7 @@ public class TestStandardControllerServiceProvider { final FlowController controller = Mockito.mock(FlowController.class); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties); @@ -162,7 +161,7 @@ public class TestStandardControllerServiceProvider { final FlowController controller = Mockito.mock(FlowController.class); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(group); - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties); @@ -214,13 +213,13 @@ public class TestStandardControllerServiceProvider { */ @Test(timeout = 120000) public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); for (int i = 0; i < 5000; i++) { testEnableReferencingServicesGraph(scheduler); } } - public void testEnableReferencingServicesGraph(final ProcessScheduler scheduler) { + public void testEnableReferencingServicesGraph(final StandardProcessScheduler scheduler) { final ProcessGroup procGroup = new MockProcessGroup(controller); final FlowController controller = Mockito.mock(FlowController.class); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index ab9674773e..165af45a47 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -116,6 +116,7 @@ import org.apache.nifi.web.api.entity.VersionControlInformationEntity; import org.apache.nifi.web.api.entity.VersionedFlowEntity; import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity; +import java.util.Collection; import java.util.Date; import java.util.List; import java.util.Map; @@ -777,6 +778,15 @@ public interface NiFiServiceFacade { */ PortEntity getInputPort(String inputPortId); + /** + * Gets an input port as it is available to the given user + * + * @param inputPortId The input port id + * @param user the user + * @return port + */ + PortEntity getInputPort(String inputPortId, NiFiUser user); + /** * Gets all input ports in a given group. * @@ -846,6 +856,15 @@ public interface NiFiServiceFacade { */ PortEntity getOutputPort(String outputPortId); + /** + * Gets an output port as it is available to the given user + * + * @param outputPortId The output port id + * @param user the user + * @return port + */ + PortEntity getOutputPort(String outputPortId, NiFiUser user); + /** * Gets all output ports in a given group. * @@ -1008,7 +1027,7 @@ public interface NiFiServiceFacade { * @param state the state * @param serviceIds the id's of the services */ - void verifyActivateControllerServices(String processGroupId, ControllerServiceState state, Set serviceIds); + void verifyActivateControllerServices(String processGroupId, ControllerServiceState state, Collection serviceIds); /** * Enables or disables the controller services with the given IDs & Revisions on behalf of the currently logged in user diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/ResumeFlowException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/ResumeFlowException.java new file mode 100644 index 0000000000..8e1e123316 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/ResumeFlowException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web; + +/** + * Thrown whenever a flow cannot be resumed due to validation error, etc. + */ +public class ResumeFlowException extends Exception { + public ResumeFlowException(final String message, final Throwable cause) { + super(message, cause); + } + + public ResumeFlowException(final String message) { + super(message); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 1ccced2f1e..6e0e9d2663 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,7 +16,9 @@ */ package org.apache.nifi.web; -import com.google.common.collect.Sets; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + import org.apache.commons.collections4.CollectionUtils; import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; @@ -271,8 +273,8 @@ import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; +import com.google.common.collect.Sets; + import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -484,8 +486,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Set serviceIds) { - processGroupDAO.verifyActivateControllerServices(groupId, state, serviceIds); + public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Collection serviceIds) { + processGroupDAO.verifyActivateControllerServices(state, serviceIds); } @Override @@ -1016,7 +1018,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public RevisionUpdate update() { // schedule the components - processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet()); + processGroupDAO.activateControllerServices(state, serviceRevisions.keySet()); // update the revisions final Map updatedRevisions = new HashMap<>(); @@ -3289,8 +3291,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } private PortEntity createInputPortEntity(final Port port) { + return createInputPortEntity(port, NiFiUserUtils.getNiFiUser()); + } + + private PortEntity createInputPortEntity(final Port port, final NiFiUser user) { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, user); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -3298,8 +3304,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } private PortEntity createOutputPortEntity(final Port port) { + return createOutputPortEntity(port, NiFiUserUtils.getNiFiUser()); + } + + private PortEntity createOutputPortEntity(final Port port, final NiFiUser user) { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, user); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -3408,6 +3418,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return createInputPortEntity(port); } + @Override + public PortEntity getInputPort(final String inputPortId, final NiFiUser user) { + final Port port = inputPortDAO.getPort(inputPortId); + return createInputPortEntity(port, user); + } + @Override public PortStatusEntity getInputPortStatus(final String inputPortId) { final Port inputPort = inputPortDAO.getPort(inputPortId); @@ -3422,6 +3438,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return createOutputPortEntity(port); } + @Override + public PortEntity getOutputPort(final String outputPortId, final NiFiUser user) { + final Port port = outputPortDAO.getPort(outputPortId); + return createOutputPortEntity(port, user); + } + @Override public PortStatusEntity getOutputPortStatus(final String outputPortId) { final Port outputPort = outputPortDAO.getPort(outputPortId); @@ -3974,28 +3996,95 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { }) .collect(Collectors.toCollection(HashSet::new)); - final Map> connectionsByVersionedId = group.findAllConnections().stream() - .filter(conn -> conn.getVersionedComponentId().isPresent()) - .collect(Collectors.groupingBy(conn -> conn.getVersionedComponentId().get())); + for (final FlowDifference difference : comparison.getDifferences()) { + final VersionedComponent localComponent = difference.getComponentA(); + if (localComponent == null) { + continue; + } + // If any Process Group is removed, consider all components below that Process Group as an affected component + if (difference.getDifferenceType() == DifferenceType.COMPONENT_REMOVED && localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP) { + final String localGroupId = ((InstantiatedVersionedProcessGroup) localComponent).getInstanceId(); + final ProcessGroup localGroup = processGroupDAO.getProcessGroup(localGroupId); + + localGroup.findAllProcessors().stream() + .map(comp -> createAffectedComponentEntity(comp, user)) + .forEach(affectedComponents::add); + localGroup.findAllFunnels().stream() + .map(comp -> createAffectedComponentEntity(comp, user)) + .forEach(affectedComponents::add); + localGroup.findAllInputPorts().stream() + .map(comp -> createAffectedComponentEntity(comp, user)) + .forEach(affectedComponents::add); + localGroup.findAllOutputPorts().stream() + .map(comp -> createAffectedComponentEntity(comp, user)) + .forEach(affectedComponents::add); + localGroup.findAllRemoteProcessGroups().stream() + .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream())) + .map(comp -> createAffectedComponentEntity(comp, user)) + .forEach(affectedComponents::add); + localGroup.findAllControllerServices().stream() + .map(comp -> createAffectedComponentEntity(comp, user)) + .forEach(affectedComponents::add); + } + + if (localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE) { + final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId(); + final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId); + + final List referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class); + for (final ControllerServiceNode referencingService : referencingServices) { + affectedComponents.add(createAffectedComponentEntity(referencingService, user)); + } + + final List referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class); + for (final ProcessorNode referencingProcessor : referencingProcessors) { + affectedComponents.add(createAffectedComponentEntity(referencingProcessor, user)); + } + } + } + + // Create a map of all connectable components by versioned component ID to the connectable component itself + final Map> connectablesByVersionId = new HashMap<>(); + mapToConnectableId(group.findAllFunnels(), connectablesByVersionId); + mapToConnectableId(group.findAllInputPorts(), connectablesByVersionId); + mapToConnectableId(group.findAllOutputPorts(), connectablesByVersionId); + mapToConnectableId(group.findAllProcessors(), connectablesByVersionId); + + final List remotePorts = new ArrayList<>(); + for (final RemoteProcessGroup rpg : group.findAllRemoteProcessGroups()) { + remotePorts.addAll(rpg.getInputPorts()); + remotePorts.addAll(rpg.getOutputPorts()); + } + mapToConnectableId(remotePorts, connectablesByVersionId); + + // If any connection is added or modified, we need to stop both the source (if it exists in the flow currently) + // and the destination (if it exists in the flow currently). for (final FlowDifference difference : comparison.getDifferences()) { VersionedComponent component = difference.getComponentA(); if (component == null) { component = difference.getComponentB(); } - if (component.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONNECTION) { - final VersionedConnection connection = (VersionedConnection) component; + if (component.getComponentType() != org.apache.nifi.registry.flow.ComponentType.CONNECTION) { + continue; + } - final String versionedConnectionId = connection.getIdentifier(); - final List instances = connectionsByVersionedId.get(versionedConnectionId); - if (instances == null) { - continue; + final VersionedConnection connection = (VersionedConnection) component; + + final String sourceVersionedId = connection.getSource().getId(); + final List sources = connectablesByVersionId.get(sourceVersionedId); + if (sources != null) { + for (final Connectable source : sources) { + affectedComponents.add(createAffectedComponentEntity(source, user)); } + } - for (final Connection instance : instances) { - affectedComponents.add(createAffectedComponentEntity(instance.getSource(), user)); - affectedComponents.add(createAffectedComponentEntity(instance.getDestination(), user)); + final String destinationVersionId = connection.getDestination().getId(); + final List destinations = connectablesByVersionId.get(destinationVersionId); + if (destinations != null) { + for (final Connectable destination : destinations) { + affectedComponents.add(createAffectedComponentEntity(destination, user)); } } } @@ -4003,6 +4092,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return affectedComponents; } + private void mapToConnectableId(final Collection connectables, final Map> destination) { + for (final Connectable connectable : connectables) { + final Optional versionedId = connectable.getVersionedComponentId(); + if (!versionedId.isPresent()) { + continue; + } + + final List byVersionedId = destination.computeIfAbsent(versionedId.get(), key -> new ArrayList<>()); + byVersionedId.add(connectable); + } + } + private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable, final NiFiUser user) { final AffectedComponentEntity entity = new AffectedComponentEntity(); @@ -4023,6 +4124,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return entity; } + private AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceNode serviceNode, final NiFiUser user) { + final AffectedComponentEntity entity = new AffectedComponentEntity(); + entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier()))); + entity.setId(serviceNode.getIdentifier()); + + final Authorizable authorizable = authorizableLookup.getControllerService(serviceNode.getIdentifier()).getAuthorizable(); + final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable, user); + entity.setPermissions(permissionsDto); + + final AffectedComponentDTO dto = new AffectedComponentDTO(); + dto.setId(serviceNode.getIdentifier()); + dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + dto.setProcessGroupId(serviceNode.getProcessGroupIdentifier()); + dto.setState(serviceNode.getState().name()); + + entity.setComponent(dto); + return entity; + } + private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState, final NiFiUser user) { final AffectedComponentEntity entity = new AffectedComponentEntity(); entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId()))); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index b1069489e6..d8c8ddf4d4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -45,6 +45,7 @@ import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.util.BundleUtils; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.ResumeFlowException; import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.concurrent.AsyncRequestManager; import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest; @@ -1128,6 +1129,11 @@ public class VersionsResource extends ApplicationResource { idGenerationSeed, true, true); vcur.markComplete(updatedVersionControlEntity); + } catch (final ResumeFlowException rfe) { + // Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow + // since in this case the flow was successfully updated - we just couldn't re-enable the components. + logger.error(rfe.getMessage(), rfe); + vcur.setFailureReason(rfe.getMessage()); } catch (final Exception e) { logger.error("Failed to update flow to new version", e); vcur.setFailureReason("Failed to update flow to new version due to " + e); @@ -1301,6 +1307,11 @@ public class VersionsResource extends ApplicationResource { idGenerationSeed, false, true); vcur.markComplete(updatedVersionControlEntity); + } catch (final ResumeFlowException rfe) { + // Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow + // since in this case the flow was successfully updated - we just couldn't re-enable the components. + logger.error(rfe.getMessage(), rfe); + vcur.setFailureReason(rfe.getMessage()); } catch (final Exception e) { logger.error("Failed to update flow to new version", e); vcur.setFailureReason("Failed to update flow to new version due to " + e.getMessage()); @@ -1333,13 +1344,15 @@ public class VersionsResource extends ApplicationResource { private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri, final Set affectedComponents, final NiFiUser user, final boolean replicateRequest, final Revision revision, final VersionControlInformationEntity requestEntity, final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest asyncRequest, final String idGenerationSeed, - final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException { + final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException, ResumeFlowException { // Steps 6-7: Determine which components must be stopped and stop them. final Set stoppableReferenceTypes = new HashSet<>(); stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT); stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT); + stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT); + stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT); final Set runningComponents = affectedComponents.stream() .filter(dto -> stoppableReferenceTypes.contains(dto.getComponent().getReferenceType())) @@ -1459,7 +1472,14 @@ public class VersionsResource extends ApplicationResource { asyncRequest.setCancelCallback(enableServicesPause::cancel); final Set servicesToEnable = getUpdatedEntities(enabledServices, user); logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size()); - componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause); + + try { + componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause); + } catch (final IllegalStateException ise) { + // Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide + // a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated. + throw new ResumeFlowException("Failed to re-enable Controller Services because " + ise.getMessage(), ise); + } } if (!asyncRequest.isCancelled()) { @@ -1474,7 +1494,14 @@ public class VersionsResource extends ApplicationResource { final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); asyncRequest.setCancelCallback(startComponentsPause::cancel); logger.info("Restarting {} Processors", componentsToStart.size()); - componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause); + + try { + componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause); + } catch (final IllegalStateException ise) { + // Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide + // a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated. + throw new ResumeFlowException("Failed to restart components because " + ise.getMessage(), ise); + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 41983034d3..3d9f521dab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -179,6 +179,7 @@ import org.apache.nifi.web.api.entity.ComponentReferenceEntity; import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.FlowBreadcrumbEntity; +import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.PortStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; @@ -1799,6 +1800,32 @@ public final class DtoFactory { return component; } + public AffectedComponentEntity createAffectedComponentEntity(final PortEntity portEntity, final String referenceType) { + if (portEntity == null) { + return null; + } + + final AffectedComponentEntity component = new AffectedComponentEntity(); + component.setBulletins(portEntity.getBulletins()); + component.setId(portEntity.getId()); + component.setPermissions(portEntity.getPermissions()); + component.setPosition(portEntity.getPosition()); + component.setRevision(portEntity.getRevision()); + component.setUri(portEntity.getUri()); + + final PortDTO portDto = portEntity.getComponent(); + final AffectedComponentDTO componentDto = new AffectedComponentDTO(); + componentDto.setId(portDto.getId()); + componentDto.setName(portDto.getName()); + componentDto.setProcessGroupId(portDto.getParentGroupId()); + componentDto.setReferenceType(referenceType); + componentDto.setState(portDto.getState()); + componentDto.setValidationErrors(portDto.getValidationErrors()); + component.setComponent(componentDto); + + return component; + } + public AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceEntity serviceEntity) { if (serviceEntity == null) { return null; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java index 459acfc67f..7582420228 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java @@ -25,6 +25,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.VariableRegistryDTO; import org.apache.nifi.web.api.dto.VersionControlInformationDTO; +import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; @@ -74,11 +75,10 @@ public interface ProcessGroupDAO { /** * Verifies the specified controller services can be modified * - * @param groupId the ID of the process group * @param state the desired state * @param serviceIds the ID's of the controller services */ - void verifyActivateControllerServices(String groupId, ControllerServiceState state, Set serviceIds); + void verifyActivateControllerServices(ControllerServiceState state, Collection serviceIds); /** * Schedules the components in the specified process group. @@ -93,11 +93,10 @@ public interface ProcessGroupDAO { /** * Enables or disables the controller services in the specified process group * - * @param groupId the id of the group * @param state the desired state * @param serviceIds the ID's of the services to enable or disable */ - Future activateControllerServices(String groupId, ControllerServiceState state, Set serviceIds); + Future activateControllerServices(ControllerServiceState state, Collection serviceIds); /** * Updates the specified process group. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index 5bbb56ffdd..e1d9e69db1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -41,8 +41,10 @@ import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.dao.ProcessGroupDAO; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -130,18 +132,18 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } @Override - public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Set serviceIds) { - final ProcessGroup group = locateProcessGroup(flowController, groupId); + public void verifyActivateControllerServices(final ControllerServiceState state, final Collection serviceIds) { + final Set serviceNodes = serviceIds.stream() + .map(flowController::getControllerServiceNode) + .collect(Collectors.toSet()); - group.findAllControllerServices().stream() - .filter(service -> serviceIds.contains(service.getIdentifier())) - .forEach(service -> { - if (state == ControllerServiceState.ENABLED) { - service.verifyCanEnable(); - } else { - service.verifyCanDisable(); - } - }); + for (final ControllerServiceNode serviceNode : serviceNodes) { + if (state == ControllerServiceState.ENABLED) { + serviceNode.verifyCanEnable(serviceNodes); + } else { + serviceNode.verifyCanDisable(serviceNodes); + } + } } @Override @@ -201,26 +203,16 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } @Override - public Future activateControllerServices(final String groupId, final ControllerServiceState state, final Set serviceIds) { - final ProcessGroup group = locateProcessGroup(flowController, groupId); + public Future activateControllerServices(final ControllerServiceState state, final Collection serviceIds) { + final List serviceNodes = serviceIds.stream() + .map(flowController::getControllerServiceNode) + .collect(Collectors.toList()); - CompletableFuture future = CompletableFuture.completedFuture(null); - for (final String serviceId : serviceIds) { - final ControllerServiceNode serviceNode = group.findControllerService(serviceId, true, true); - if (serviceNode == null) { - throw new ResourceNotFoundException("Could not find Controller Service with identifier " + serviceId); - } - - if (ControllerServiceState.ENABLED.equals(state)) { - final CompletableFuture serviceFuture = flowController.enableControllerService(serviceNode); - future = CompletableFuture.allOf(future, serviceFuture); - } else { - final CompletableFuture serviceFuture = flowController.disableControllerService(serviceNode); - future = CompletableFuture.allOf(future, serviceFuture); - } + if (state == ControllerServiceState.ENABLED) { + return flowController.enableControllerServicesAsync(serviceNodes); + } else { + return flowController.disableControllerServicesAsync(serviceNodes); } - - return future; } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java index a801dcb8b3..f257bb1d21 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java @@ -27,6 +27,7 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; @@ -39,6 +40,14 @@ public class AffectedComponentUtils { case AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR: final ProcessorEntity procEntity = serviceFacade.getProcessor(componentEntity.getId(), user); return dtoFactory.createAffectedComponentEntity(procEntity); + case AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT: { + final PortEntity portEntity = serviceFacade.getInputPort(componentEntity.getId(), user); + return dtoFactory.createAffectedComponentEntity(portEntity, AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT); + } + case AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT: { + final PortEntity portEntity = serviceFacade.getOutputPort(componentEntity.getId(), user); + return dtoFactory.createAffectedComponentEntity(portEntity, AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT); + } case AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE: final ControllerServiceEntity serviceEntity = serviceFacade.getControllerService(componentEntity.getId(), user); return dtoFactory.createAffectedComponentEntity(serviceEntity); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java index e005d283f4..1c7e82d5ff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java @@ -18,7 +18,9 @@ package org.apache.nifi.web.util; import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.Revision; @@ -33,6 +35,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -228,6 +233,46 @@ public class LocalComponentLifecycle implements ComponentLifecycle { waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.DISABLED, pause, user); } + static List> determineEnablingOrder(final Map serviceNodeMap) { + final List> orderedNodeLists = new ArrayList<>(); + + for (final ControllerServiceNode node : serviceNodeMap.values()) { + final List branch = new ArrayList<>(); + determineEnablingOrder(serviceNodeMap, node, branch, new HashSet()); + orderedNodeLists.add(branch); + } + + return orderedNodeLists; + } + + private static void determineEnablingOrder( + final Map serviceNodeMap, + final ControllerServiceNode contextNode, + final List orderedNodes, + final Set visited) { + if (visited.contains(contextNode)) { + return; + } + + for (final Map.Entry entry : contextNode.getProperties().entrySet()) { + if (entry.getKey().getControllerServiceDefinition() != null) { + final String referencedServiceId = entry.getValue(); + if (referencedServiceId != null) { + final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId); + if (!orderedNodes.contains(referencedNode)) { + visited.add(contextNode); + determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited); + } + } + } + } + + if (!orderedNodes.contains(contextNode)) { + orderedNodes.add(contextNode); + } + } + + /** * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State. *