From 25e7f314b10ba6b49e350d9f1cc026b3bc0a3194 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 6 May 2016 21:24:50 -0400 Subject: [PATCH] NIFI-1800: Tie Controller Services to Process Groups. This closes #431 --- .../nifi/components/PropertyDescriptor.java | 2 +- .../nifi/components/ValidationContext.java | 7 + .../controller/ControllerServiceLookup.java | 7 +- .../NotificationValidationContext.java | 4 + .../util/MockControllerServiceLookup.java | 2 +- .../MockProcessorInitializationContext.java | 4 +- .../nifi/util/MockValidationContext.java | 9 +- .../mock/MockControllerServiceLookup.java | 2 +- .../StandardAsyncClusterResponse.java | 5 +- .../manager/impl/WebClusterManager.java | 36 +- .../reporting/ClusteredReportingTaskNode.java | 1 - .../AbstractConfiguredComponent.java | 9 +- .../controller/ValidationContextFactory.java | 4 +- .../service/ControllerServiceNode.java | 14 + .../apache/nifi/events/BulletinFactory.java | 5 +- .../org/apache/nifi/groups/ProcessGroup.java | 44 ++ .../nifi/controller/FlowController.java | 16 +- .../nifi/controller/StandardFlowService.java | 2 + .../controller/StandardFlowSynchronizer.java | 81 ++- .../controller/StandardProcessorNode.java | 11 +- .../reporting/AbstractReportingTaskNode.java | 5 + .../reporting/StandardReportingContext.java | 4 +- ...tandardReportingInitializationContext.java | 5 +- .../serialization/FlowEncodingVersion.java | 105 ++++ .../FlowFromDOMFactory.java | 7 +- .../FlowSerializationException.java | 2 +- .../{ => serialization}/FlowSerializer.java | 4 +- .../FlowSynchronizationException.java | 2 +- .../{ => serialization}/FlowSynchronizer.java | 4 +- .../StandardFlowSerializer.java | 16 +- .../service/ControllerServiceLoader.java | 26 +- ...ontrollerServiceInitializationContext.java | 4 +- .../StandardControllerServiceNode.java | 30 +- .../StandardControllerServiceProvider.java | 85 +-- .../manager/StandardStateManagerProvider.java | 2 +- .../nifi/fingerprint/FingerprintFactory.java | 14 +- .../nifi/groups/StandardProcessGroup.java | 123 ++++ .../persistence/FlowConfigurationDAO.java | 4 +- .../StandardSnippetDeserializer.java | 2 +- .../StandardSnippetSerializer.java | 2 +- .../StandardXMLFlowConfigurationDAO.java | 8 +- .../persistence/TemplateDeserializer.java | 2 +- .../nifi/persistence/TemplateSerializer.java | 2 +- .../processor/StandardProcessContext.java | 4 +- .../processor/StandardValidationContext.java | 16 +- .../StandardValidationContextFactory.java | 9 +- .../org/apache/nifi/services/FlowService.java | 4 +- .../src/main/resources/FlowConfiguration.xsd | 4 + .../controller/StandardFlowServiceTest.java | 3 + .../scheduling/TestProcessorLifecycle.java | 6 +- .../TestStandardProcessScheduler.java | 10 +- ...TestStandardControllerServiceProvider.java | 14 +- .../service/mock/MockProcessGroup.java | 530 ++++++++++++++++++ .../zookeeper/TestZooKeeperStateProvider.java | 25 +- .../processor/TestStandardPropertyValue.java | 2 +- .../apache/nifi/web/server/JettyServer.java | 4 +- .../apache/nifi/web/NiFiServiceFacade.java | 8 +- .../nifi/web/StandardNiFiServiceFacade.java | 38 +- .../nifi/web/api/ProcessGroupResource.java | 4 +- .../apache/nifi/web/api/dto/DtoFactory.java | 17 +- 60 files changed, 1197 insertions(+), 224 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowEncodingVersion.java rename nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/{ => serialization}/FlowFromDOMFactory.java (98%) rename nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/{ => serialization}/FlowSerializationException.java (96%) rename nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/{ => serialization}/FlowSerializer.java (93%) rename nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/{ => serialization}/FlowSynchronizationException.java (97%) rename nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/{ => serialization}/FlowSynchronizer.java (93%) rename nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/{ => serialization}/StandardFlowSerializer.java (97%) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java diff --git a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java index 4ff3fff89e..a8b9bdf67a 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java @@ -138,7 +138,7 @@ public final class PropertyDescriptor implements Comparable // if the property descriptor identifies a Controller Service, validate that the ControllerService exists, is of the correct type, and is valid if (controllerServiceDefinition != null) { - final Set validIdentifiers = context.getControllerServiceLookup().getControllerServiceIdentifiers(controllerServiceDefinition); + final Set validIdentifiers = context.getControllerServiceLookup().getControllerServiceIdentifiers(controllerServiceDefinition, context.getProcessGroupIdentifier()); if (validIdentifiers != null && validIdentifiers.contains(input)) { final ControllerService controllerService = context.getControllerServiceLookup().getControllerService(input); if (!context.isValidationRequired(controllerService)) { diff --git a/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java b/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java index a1dcf438dd..66a54fa956 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java @@ -92,4 +92,11 @@ public interface ValidationContext { * support the Expression Language or is not a valid property name */ boolean isExpressionLanguageSupported(String propertyName); + + /** + * Returns the identifier of the ProcessGroup that the component being validated lives in + * + * @return the identifier of the ProcessGroup that the component being validated lives in + */ + String getProcessGroupIdentifier(); } diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java index f5300b15c5..3345a9293c 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java @@ -54,12 +54,15 @@ public interface ControllerServiceLookup { boolean isControllerServiceEnabled(ControllerService service); /** + * * @param serviceType type of service to get identifiers for + * @param groupId the ID of the Process Group to look in for Controller Services + * * @return the set of all Controller Service Identifiers whose Controller - * Service is of the given type. + * Service is of the given type. * @throws IllegalArgumentException if the given class is not an interface */ - Set getControllerServiceIdentifiers(Class serviceType) throws IllegalArgumentException; + Set getControllerServiceIdentifiers(Class serviceType, String groupId) throws IllegalArgumentException; /** * @param serviceIdentifier identifier to look up diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java index 49afc1622f..f29c1c930a 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java @@ -102,4 +102,8 @@ public class NotificationValidationContext implements ValidationContext { return Boolean.TRUE.equals(supported); } + @Override + public String getProcessGroupIdentifier() { + return null; + } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java index 219ee24eb4..d6ff5c85d4 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java @@ -82,7 +82,7 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo } @Override - public Set getControllerServiceIdentifiers(final Class serviceType) { + public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { final Set ids = new HashSet<>(); for (final Map.Entry entry : controllerServiceMap.entrySet()) { if (serviceType.isAssignableFrom(entry.getValue().getService().getClass())) { diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java index 6e94943004..87e2b4589f 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java @@ -47,8 +47,8 @@ public class MockProcessorInitializationContext implements ProcessorInitializati } @Override - public Set getControllerServiceIdentifiers(final Class serviceType) { - return context.getControllerServiceIdentifiers(serviceType); + public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { + return context.getControllerServiceIdentifiers(serviceType, groupId); } @Override diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java index 6442778c0d..84d32778dc 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java @@ -86,8 +86,8 @@ public class MockValidationContext implements ValidationContext, ControllerServi } @Override - public Set getControllerServiceIdentifiers(Class serviceType) { - return context.getControllerServiceIdentifiers(serviceType); + public Set getControllerServiceIdentifiers(Class serviceType, String groupId) { + return context.getControllerServiceIdentifiers(serviceType, groupId); } @Override @@ -136,4 +136,9 @@ public class MockValidationContext implements ValidationContext, ControllerServi final Boolean supported = expressionLanguageSupported.get(propertyName); return Boolean.TRUE.equals(supported); } + + @Override + public String getProcessGroupIdentifier() { + return "unit test"; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java index 17d55272d9..26a6f39934 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java @@ -47,7 +47,7 @@ public class MockControllerServiceLookup implements ControllerServiceLookup { } @Override - public Set getControllerServiceIdentifiers(Class serviceType) + public Set getControllerServiceIdentifiers(Class serviceType, String groupId) throws IllegalArgumentException { return Collections.emptySet(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java index 52bf805de1..3bcc8e75e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java @@ -138,7 +138,10 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { return null; } - final Set nodeResponses = responseMap.values().stream().map(p -> p.getResponse()).collect(Collectors.toSet()); + final Set nodeResponses = responseMap.values().stream() + .map(p -> p.getResponse()) + .filter(response -> response != null) + .collect(Collectors.toSet()); mergedResponse = responseMerger.mergeResponses(uri, method, nodeResponses); logger.debug("Notifying all that merged response is complete for {}", id); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index dbcb8fdbdb..7c2eabfb74 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -115,7 +115,6 @@ import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; -import org.apache.nifi.controller.StandardFlowSerializer; import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ComponentLifeCycleException; @@ -126,6 +125,7 @@ import org.apache.nifi.controller.reporting.StandardReportingInitializationConte import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent; import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent; +import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.controller.service.ControllerServiceLoader; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; @@ -406,7 +406,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final byte[] serializedServices = clusterDataFlow.getControllerServices(); if (serializedServices != null && serializedServices.length > 0) { - ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, properties.getAutoResumeState()); + ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), null, encryptor, bulletinRepository, properties.getAutoResumeState()); } // start multicast broadcasting service, if configured @@ -1271,21 +1271,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return baos.toByteArray(); } - private byte[] serializeControllerServices() throws ParserConfigurationException, TransformerException { - final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); - docFactory.setNamespaceAware(true); - - final DocumentBuilder docBuilder = docFactory.newDocumentBuilder(); - final Document document = docBuilder.newDocument(); - final Element rootElement = document.createElement("controllerServices"); - document.appendChild(rootElement); - - for (final ControllerServiceNode serviceNode : getAllControllerServices()) { - StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor); - } - - return serialize(document); - } private byte[] serializeReportingTasks() throws ParserConfigurationException, TransformerException { final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); @@ -1303,19 +1288,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return serialize(document); } - public void saveControllerServices() { - try { - dataFlowManagementService.updateControllerServices(serializeControllerServices()); - } catch (final Exception e) { - logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - - getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(), - "Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details.")); - } - } public void saveReportingTasks() { try { @@ -2299,8 +2271,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } @Override - public Set getControllerServiceIdentifiers(final Class serviceType) { - return controllerServiceProvider.getControllerServiceIdentifiers(serviceType); + public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { + return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId); } public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String message) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java index a23cfdd788..0b2c77276a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java @@ -48,5 +48,4 @@ public class ClusteredReportingTaskNode extends AbstractReportingTaskNode { public ReportingContext getReportingContext() { return new ClusteredReportingContext(eventAccess, bulletinRepository, getProperties(), serviceProvider, stateManager); } - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java index 65e59ee8a9..be4a198617 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -256,7 +256,9 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone @Override public boolean isValid() { - final Collection validationResults = validate(validationContextFactory.newValidationContext(getProperties(), getAnnotationData())); + final Collection validationResults = validate(validationContextFactory.newValidationContext( + getProperties(), getAnnotationData(), getProcessGroupIdentifier())); + for (final ValidationResult result : validationResults) { if (!result.isValid()) { return false; @@ -275,7 +277,8 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone final List results = new ArrayList<>(); lock.lock(); try { - final ValidationContext validationContext = validationContextFactory.newValidationContext(serviceIdentifiersNotToValidate, getProperties(), getAnnotationData()); + final ValidationContext validationContext = validationContextFactory.newValidationContext( + serviceIdentifiersNotToValidate, getProperties(), getAnnotationData(), getProcessGroupIdentifier()); final Collection validationResults; try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { @@ -297,6 +300,8 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone public abstract void verifyModifiable() throws IllegalStateException; + protected abstract String getProcessGroupIdentifier(); + /** * */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java index a7118d4678..be6346f5cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java @@ -24,8 +24,8 @@ import org.apache.nifi.components.ValidationContext; public interface ValidationContextFactory { - ValidationContext newValidationContext(Map properties, String annotationData); + ValidationContext newValidationContext(Map properties, String annotationData, String groupId); - ValidationContext newValidationContext(Set serviceIdentifiersToNotValidate, Map properties, String annotationData); + ValidationContext newValidationContext(Set serviceIdentifiersToNotValidate, Map properties, String annotationData, String groupId); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index 0d7f3ff9f8..693d6a8877 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -21,9 +21,23 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.groups.ProcessGroup; public interface ControllerServiceNode extends ConfiguredComponent { + /** + * @return the Process Group that this Controller Service belongs to, or null if the Controller Service + * does not belong to any Process Group + */ + ProcessGroup getProcessGroup(); + + /** + * Sets the Process Group for this Controller Service + * + * @param group the group that the service belongs to + */ + void setProcessGroup(ProcessGroup group); + /** *

* Returns a proxy implementation of the Controller Service that this ControllerServiceNode diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java index 4795827a5c..0bd250016d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java @@ -19,6 +19,7 @@ package org.apache.nifi.events; import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.ComponentType; @@ -45,7 +46,9 @@ public final class BulletinFactory { break; } - return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), type, connectable.getName(), category, severity, message); + final ProcessGroup group = connectable.getProcessGroup(); + final String groupId = group == null ? null : group.getIdentifier(); + return BulletinFactory.createBulletin(groupId, connectable.getIdentifier(), type, connectable.getName(), category, severity, message); } public static Bulletin createBulletin(final String groupId, final String sourceId, final ComponentType sourceType, final String sourceName, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index 5406954b7a..e1f66ee4c5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -29,6 +29,7 @@ import org.apache.nifi.connectable.Position; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.label.Label; +import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.Processor; @@ -433,6 +434,19 @@ public interface ProcessGroup extends Authorizable { */ Funnel findFunnel(String id); + /** + * @param id of the Controller Service + * @return the Controller Service with the given ID, if it exists as a child or + * descendant of this ProcessGroup. This performs a recursive search of all + * descendant ProcessGroups + */ + ControllerServiceNode findControllerService(String id); + + /** + * @return a List of all Controller Services contained within this ProcessGroup and any child Process Groups + */ + Set findAllControllerServices(); + /** * Adds the given RemoteProcessGroup to this ProcessGroup * @@ -645,6 +659,36 @@ public interface ProcessGroup extends Authorizable { */ void removeFunnel(Funnel funnel); + /** + * Adds the given Controller Service to this group + * + * @param service the service to add + */ + void addControllerService(ControllerServiceNode service); + + /** + * Returns the controller service with the given id + * + * @param id the id of the controller service + * @return the controller service with the given id, or null if no service exists with that id + */ + ControllerServiceNode getControllerService(String id); + + /** + * Returns a Set of all Controller Services that are available in this Process Group + * + * @param recursive if true, returns the Controller Services available to the parent Process Group, its parents, etc. + * @return a Set of all Controller Services that are available in this Process Group + */ + Set getControllerServices(boolean recursive); + + /** + * Removes the given Controller Service from this group + * + * @param service the service to remove + */ + void removeControllerService(ControllerServiceNode service); + /** * @return true if this ProcessGroup has no Processors, Labels, * Connections, ProcessGroups, RemoteProcessGroupReferences, or Ports. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index b888a1bdcf..fd770b8b27 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -126,6 +126,10 @@ import org.apache.nifi.controller.scheduling.ProcessContextFactory; import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent; import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent; +import org.apache.nifi.controller.serialization.FlowSerializationException; +import org.apache.nifi.controller.serialization.FlowSerializer; +import org.apache.nifi.controller.serialization.FlowSynchronizationException; +import org.apache.nifi.controller.serialization.FlowSynchronizer; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.StandardConfigurationContext; @@ -262,7 +266,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final RemoteSiteListener externalSiteListener; private final AtomicReference counterRepositoryRef; private final AtomicBoolean initialized = new AtomicBoolean(false); - private final ControllerServiceProvider controllerServiceProvider; + private final StandardControllerServiceProvider controllerServiceProvider; private final KeyService keyService; private final AuditService auditService; private final EventDrivenWorkerQueue eventDrivenWorkerQueue; @@ -436,7 +440,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider); eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); - controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider); final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository); processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent( @@ -487,6 +490,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); instanceId = UUID.randomUUID().toString(); + controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider); + controllerServiceProvider.setRootProcessGroup(rootGroup); + if (remoteInputSocketPort == null) { LOG.info("Not enabling Site-to-Site functionality because nifi.remote.input.socket.port is not set"); externalSiteListener = null; @@ -1426,6 +1432,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R externalSiteListener.setRootGroup(group); } + controllerServiceProvider.setRootProcessGroup(rootGroup); + // update the heartbeat bean this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus)); } finally { @@ -3836,8 +3844,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override - public Set getControllerServiceIdentifiers(final Class serviceType) { - return controllerServiceProvider.getControllerServiceIdentifiers(serviceType); + public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { + return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index fb72683e7d..f8b3262ff2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -68,6 +68,8 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.cluster.Heartbeater; +import org.apache.nifi.controller.serialization.FlowSerializationException; +import org.apache.nifi.controller.serialization.FlowSynchronizationException; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.events.BulletinFactory; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index f03b013c47..97ed3a9df5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -55,6 +55,12 @@ import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; +import org.apache.nifi.controller.serialization.FlowEncodingVersion; +import org.apache.nifi.controller.serialization.FlowFromDOMFactory; +import org.apache.nifi.controller.serialization.FlowSerializationException; +import org.apache.nifi.controller.serialization.FlowSynchronizationException; +import org.apache.nifi.controller.serialization.FlowSynchronizer; +import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.controller.service.ControllerServiceLoader; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; @@ -123,19 +129,18 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final Element rootElement = document.getDocumentElement(); final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); - final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor); + final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootGroupElement); + + final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion); return isEmpty(rootGroupDto); } @Override public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor) throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException { - // get the controller's root group - final ProcessGroup rootGroup = controller.getGroup(controller.getRootGroupId()); - // handle corner cases involving no proposed flow if (proposedFlow == null) { - if (rootGroup.isEmpty()) { + if (controller.getGroup(controller.getRootGroupId()).isEmpty()) { return; // no sync to perform } else { throw new UninheritableFlowException("Proposed configuration is empty, but the controller contains a data flow."); @@ -160,6 +165,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } else { final Document document = parseFlowBytes(existingFlow); final Element rootElement = document.getDocumentElement(); + final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootElement); logger.trace("Setting controller thread counts"); final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount"); @@ -180,17 +186,17 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices"); - final List controllerServiceElements; + final List unrootedControllerServiceElements; if (controllerServicesElement == null) { - controllerServiceElements = Collections.emptyList(); + unrootedControllerServiceElements = Collections.emptyList(); } else { - controllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); + unrootedControllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); } logger.trace("Parsing process group from DOM"); final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); - final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor); - existingFlowEmpty = taskElements.isEmpty() && controllerServiceElements.isEmpty() && isEmpty(rootGroupDto); + final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion); + existingFlowEmpty = taskElements.isEmpty() && unrootedControllerServiceElements.isEmpty() && isEmpty(rootGroupDto); logger.debug("Existing Flow Empty = {}", existingFlowEmpty); } } @@ -237,6 +243,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { synchronized (configuration) { // get the root element final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0); + final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootElement); // set controller config logger.trace("Updating flow config"); @@ -252,12 +259,27 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // get the root group XML element final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); + // if this controller isn't initialized or its empty, add the root group, otherwise update + final ProcessGroup rootGroup; + if (!initialized || existingFlowEmpty) { + logger.trace("Adding root process group"); + rootGroup = addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion); + } else { + logger.trace("Updating root process group"); + rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion); + } + final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices"); if (controllerServicesElement != null) { final List serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); if (!initialized || existingFlowEmpty) { - ControllerServiceLoader.loadControllerServices(serviceElements, controller, encryptor, controller.getBulletinRepository(), autoResumeState); + // If the encoding version is null, we are loading a flow from NiFi 0.x, where Controller + // Services could not be scoped by Process Group. As a result, we want to move the Process Groups + // to the root Group. Otherwise, we want to use a null group, which indicates a Controller-level + // Controller Service. + final ProcessGroup group = (encodingVersion == null) ? rootGroup : null; + ControllerServiceLoader.loadControllerServices(serviceElements, controller, group, encryptor, controller.getBulletinRepository(), autoResumeState); } else { for (final Element serviceElement : serviceElements) { updateControllerService(controller, serviceElement, encryptor); @@ -265,15 +287,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } - // if this controller isn't initialized or its emtpy, add the root group, otherwise update - if (!initialized || existingFlowEmpty) { - logger.trace("Adding root process group"); - addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor); - } else { - logger.trace("Updating root process group"); - updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor); - } - final Element reportingTasksElement = DomUtils.getChild(rootElement, "reportingTasks"); if (reportingTasksElement != null) { final List taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask"); @@ -488,14 +501,14 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } - private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) - throws ProcessorInstantiationException { + private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, + final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException { // get the parent group ID final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier(); // get the process group - final ProcessGroupDTO processGroupDto = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor); + final ProcessGroupDTO processGroupDto = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor, encodingVersion); // update the process group if (parentId == null) { @@ -636,7 +649,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // update nested process groups (recursively) final List nestedProcessGroupNodeList = getChildrenByTagName(processGroupElement, "processGroup"); for (final Element nestedProcessGroupElement : nestedProcessGroupNodeList) { - updateProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor); + updateProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor, encodingVersion); } // update connections @@ -692,6 +705,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } + // Update Controller Services + final List serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService"); + for (final Element serviceNodeElement : serviceNodeList) { + updateControllerService(controller, serviceNodeElement, encryptor); + } + return processGroup; } @@ -749,13 +768,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } - private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) - throws ProcessorInstantiationException { + private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, + final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException { // get the parent group ID final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier(); // add the process group - final ProcessGroupDTO processGroupDTO = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor); + final ProcessGroupDTO processGroupDTO = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor, encodingVersion); final ProcessGroup processGroup = controller.createProcessGroup(processGroupDTO.getId()); processGroup.setComments(processGroupDTO.getComments()); processGroup.setPosition(toPosition(processGroupDTO.getPosition())); @@ -892,7 +911,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // add nested process groups (recursively) final List nestedProcessGroupNodeList = getChildrenByTagName(processGroupElement, "processGroup"); for (final Element nestedProcessGroupElement : nestedProcessGroupNodeList) { - addProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor); + addProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor, encodingVersion); } // add remote process group @@ -1027,6 +1046,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { processGroup.addConnection(connection); } + // Add Controller Services + final List serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService"); + if (!serviceNodeList.isEmpty()) { + ControllerServiceLoader.loadControllerServices(serviceNodeList, controller, processGroup, encryptor, controller.getBulletinRepository(), autoResumeState); + } + return processGroup; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 0d168ec564..346f3dbc43 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -918,8 +918,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public boolean isValid() { try { - final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), - getAnnotationData()); + final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier()); final Collection validationResults; try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { @@ -966,7 +965,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final List results = new ArrayList<>(); try { final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), - getAnnotationData()); + getAnnotationData(), getProcessGroup().getIdentifier()); final Collection validationResults; try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { @@ -1421,4 +1420,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable callback.postMonitor(); } } + + @Override + protected String getProcessGroupIdentifier() { + final ProcessGroup group = getProcessGroup(); + return group == null ? null : group.getIdentifier(); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index c3eb0a08ed..6aead13739 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -254,4 +254,9 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon public String toString() { return "ReportingTask[id=" + getIdentifier() + ", name=" + getName() + "]"; } + + @Override + protected String getProcessGroupIdentifier() { + return null; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java index 11d1b51dec..ff767efdbd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java @@ -115,8 +115,8 @@ public class StandardReportingContext implements ReportingContext, ControllerSer } @Override - public Set getControllerServiceIdentifiers(final Class serviceType) { - return serviceProvider.getControllerServiceIdentifiers(serviceType); + public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { + return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java index 0131a95948..8b7b3bfa5c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java @@ -69,13 +69,14 @@ public class StandardReportingInitializationContext implements ReportingInitiali return schedulingPeriod; } + @Override public SchedulingStrategy getSchedulingStrategy() { return schedulingStrategy; } @Override - public Set getControllerServiceIdentifiers(final Class serviceType) { - return serviceProvider.getControllerServiceIdentifiers(serviceType); + public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { + return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowEncodingVersion.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowEncodingVersion.java new file mode 100644 index 0000000000..374611202b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowEncodingVersion.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.serialization; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.w3c.dom.Element; + +/** + * Provides a mechanism for interpreting the version of the encoding scheme that was used to serialize + * a NiFi Flow. The versioning scheme is made up of a major version and a minor version, both being + * positive integers. + */ +public class FlowEncodingVersion { + public static final String ENCODING_VERSION_ATTRIBUTE = "encoding-version"; + + private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)\\.(\\d)+"); + + private final int majorVersion; + private final int minorVersion; + + public FlowEncodingVersion(final int majorVersion, final int minorVersion) { + if (majorVersion < 0) { + throw new IllegalArgumentException("Invalid version: Major version cannot be less than 0 but was " + majorVersion); + } + if (minorVersion < 0) { + throw new IllegalArgumentException("Invalid version: Minor version cannot be less than 0 but was " + minorVersion); + } + + this.majorVersion = majorVersion; + this.minorVersion = minorVersion; + } + + /** + * Parses the 'encoding-version' attribute of the given XML Element as FlowEncodingVersion. + * The attribute value is expected to be in the format <major version>.<minor version< + * + * @param xmlElement the XML Element that contains an 'encoding-version' attribute + * @return a FlowEncodingVersion that has the major and minor versions specified in the String, or null if the input is null or the input + * does not have an 'encoding-version' attribute + * + * @throws IllegalArgumentException if the value is not in the format <major version>.<minor version>, if either major version or minor + * version is not an integer, or if either the major or minor version is less than 0. + */ + public static FlowEncodingVersion parse(final Element xmlElement) { + if (xmlElement == null) { + return null; + } + + final String version = xmlElement.getAttribute(ENCODING_VERSION_ATTRIBUTE); + if (version == null) { + return null; + } + + return parse(version); + } + + /** + * Parses the given String as FlowEncodingVersion. The String is expected to be in the format <major version>.<minor version< + * + * @param version the String representation of the encoding version + * @return a FlowEncodingVersion that has the major and minor versions specified in the String, or null if the input is null + * + * @throws IllegalArgumentException if the value is not in the format <major version>.<minor version>, if either major version or minor + * version is not an integer, or if either the major or minor version is less than 0. + */ + public static FlowEncodingVersion parse(final String version) { + if (version == null || version.trim().isEmpty()) { + return null; + } + + final Matcher matcher = VERSION_PATTERN.matcher(version.trim()); + if (!matcher.matches()) { + throw new IllegalArgumentException(version + " is not a valid version for Flow serialization. Should be in format ."); + } + + final int majorVersion = Integer.parseInt(matcher.group(1)); + final int minorVersion = Integer.parseInt(matcher.group(2)); + return new FlowEncodingVersion(majorVersion, minorVersion); + } + + public int getMajorVersion() { + return majorVersion; + } + + public int getMinorVersion() { + return minorVersion; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java similarity index 98% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index 144395c1b7..f6de870d22 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.controller; +package org.apache.nifi.controller.serialization; import java.util.ArrayList; import java.util.HashMap; @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.nifi.connectable.Size; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; @@ -114,7 +115,7 @@ public class FlowFromDOMFactory { return dto; } - public static ProcessGroupDTO getProcessGroup(final String parentId, final Element element, final StringEncryptor encryptor) { + public static ProcessGroupDTO getProcessGroup(final String parentId, final Element element, final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) { final ProcessGroupDTO dto = new ProcessGroupDTO(); final String groupId = getString(element, "id"); dto.setId(groupId); @@ -169,7 +170,7 @@ public class FlowFromDOMFactory { nodeList = DomUtils.getChildNodesByTagName(element, "processGroup"); for (int i = 0; i < nodeList.getLength(); i++) { - processGroups.add(getProcessGroup(groupId, (Element) nodeList.item(i), encryptor)); + processGroups.add(getProcessGroup(groupId, (Element) nodeList.item(i), encryptor, encodingVersion)); } nodeList = DomUtils.getChildNodesByTagName(element, "remoteProcessGroup"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializationException.java similarity index 96% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializationException.java index 6f8025ba16..c552c391e0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializationException.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.controller; +package org.apache.nifi.controller.serialization; /** * Represents the exceptional case when flow configuration is malformed and therefore, cannot be serialized or deserialized. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java similarity index 93% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSerializer.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java index 1a4aed3a35..dae7566ba9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java @@ -14,10 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.controller; +package org.apache.nifi.controller.serialization; import java.io.OutputStream; +import org.apache.nifi.controller.FlowController; + /** * Serializes the flow configuration of a controller instance to an output stream. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizationException.java similarity index 97% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizationException.java index c648664e6e..33b4d3c3ff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizationException.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.controller; +package org.apache.nifi.controller.serialization; import org.apache.nifi.cluster.ConnectionException; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java similarity index 93% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java index d505db622e..86614afbe8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java @@ -14,9 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.controller; +package org.apache.nifi.controller.serialization; import org.apache.nifi.cluster.protocol.DataFlow; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.UninheritableFlowException; import org.apache.nifi.encrypt.StringEncryptor; /** diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java similarity index 97% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index 5d04cc2258..e2ce75114e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.controller; +package org.apache.nifi.controller.serialization; import java.io.BufferedOutputStream; import java.io.OutputStream; @@ -39,6 +39,9 @@ import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Size; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; @@ -59,6 +62,7 @@ import org.w3c.dom.Element; * NOT THREAD-SAFE. */ public class StandardFlowSerializer implements FlowSerializer { + private static final String MAX_ENCODING_VERSION = "1.0"; private final StringEncryptor encryptor; @@ -78,6 +82,7 @@ public class StandardFlowSerializer implements FlowSerializer { // populate document with controller state final Element rootNode = doc.createElement("flowController"); + rootNode.setAttribute("encoding-version", MAX_ENCODING_VERSION); doc.appendChild(rootNode); addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount()); addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount()); @@ -85,9 +90,6 @@ public class StandardFlowSerializer implements FlowSerializer { final Element controllerServicesNode = doc.createElement("controllerServices"); rootNode.appendChild(controllerServicesNode); - for (final ControllerServiceNode serviceNode : controller.getAllControllerServices()) { - addControllerService(controllerServicesNode, serviceNode, encryptor); - } final Element reportingTasksNode = doc.createElement("reportingTasks"); rootNode.appendChild(reportingTasksNode); @@ -180,6 +182,10 @@ public class StandardFlowSerializer implements FlowSerializer { for (final Connection connection : group.getConnections()) { addConnection(element, connection); } + + for (final ControllerServiceNode service : group.getControllerServices(false)) { + addControllerService(element, service); + } } private void addStyle(final Element parentElement, final Map style) { @@ -408,7 +414,7 @@ public class StandardFlowSerializer implements FlowSerializer { parentElement.appendChild(element); } - public static void addControllerService(final Element element, final ControllerServiceNode serviceNode, final StringEncryptor encryptor) { + public void addControllerService(final Element element, final ControllerServiceNode serviceNode) { final Element serviceElement = element.getOwnerDocument().createElement("controllerService"); addTextElement(serviceElement, "id", serviceNode.getIdentifier()); addTextElement(serviceElement, "name", serviceNode.getName()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index b5c3855138..74533ffabf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@ -31,8 +31,9 @@ import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; -import org.apache.nifi.controller.FlowFromDOMFactory; +import org.apache.nifi.controller.serialization.FlowFromDOMFactory; import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.util.DomUtils; import org.apache.nifi.web.api.dto.ControllerServiceDTO; @@ -47,12 +48,8 @@ public class ControllerServiceLoader { private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class); - public static List loadControllerServices( - final ControllerServiceProvider provider, - final InputStream serializedStream, - final StringEncryptor encryptor, - final BulletinRepository bulletinRepo, - final boolean autoResumeState) throws IOException { + public static List loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream, final ProcessGroup parentGroup, + final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) throws IOException { final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance(); documentBuilderFactory.setNamespaceAware(true); @@ -93,21 +90,22 @@ public class ControllerServiceLoader { final Document document = builder.parse(in); final Element controllerServices = document.getDocumentElement(); final List serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService"); - return new ArrayList<>(loadControllerServices(serviceElements, provider, encryptor, bulletinRepo, autoResumeState)); + return new ArrayList<>(loadControllerServices(serviceElements, provider, parentGroup, encryptor, bulletinRepo, autoResumeState)); } catch (SAXException | ParserConfigurationException sxe) { throw new IOException(sxe); } } - public static Collection loadControllerServices( - final List serviceElements, - final ControllerServiceProvider provider, - final StringEncryptor encryptor, - final BulletinRepository bulletinRepo, - final boolean autoResumeState) { + public static Collection loadControllerServices(final List serviceElements, final ControllerServiceProvider provider, final ProcessGroup parentGroup, + final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) { + final Map nodeMap = new HashMap<>(); for (final Element serviceElement : serviceElements) { final ControllerServiceNode serviceNode = createControllerService(provider, serviceElement, encryptor); + if (parentGroup != null) { + parentGroup.addControllerService(serviceNode); + } + // We need to clone the node because it will be used in a separate thread below, and // Element is not thread-safe. nodeMap.put(serviceNode, (Element) serviceElement.cloneNode(true)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java index 482dabfd48..fc6d376482 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java @@ -49,8 +49,8 @@ public class StandardControllerServiceInitializationContext implements Controlle } @Override - public Set getControllerServiceIdentifiers(final Class serviceType) { - return serviceProvider.getControllerServiceIdentifiers(serviceType); + public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { + return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index bed6a35faf..089e31bdbf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -39,6 +39,7 @@ import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.annotation.OnConfigured; import org.apache.nifi.controller.exception.ComponentLifeCycleException; +import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.SimpleProcessLogger; @@ -62,11 +63,12 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i private final Set referencingComponents = new HashSet<>(); private String comment; + private ProcessGroup processGroup; private final AtomicBoolean active; public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id, - final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { + final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { super(implementation, id, validationContextFactory, serviceProvider); this.proxedControllerService = proxiedControllerService; this.implementation = implementation; @@ -84,6 +86,26 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i return implementation; } + @Override + public ProcessGroup getProcessGroup() { + readLock.lock(); + try { + return processGroup; + } finally { + readLock.unlock(); + } + } + + @Override + public void setProcessGroup(final ProcessGroup group) { + writeLock.lock(); + try { + this.processGroup = group; + } finally { + writeLock.unlock(); + } + } + @Override public ControllerServiceReference getReferences() { readLock.lock(); @@ -367,4 +389,10 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString()); } } + + @Override + protected String getProcessGroupIdentifier() { + final ProcessGroup procGroup = getProcessGroup(); + return procGroup == null ? null : procGroup.getIdentifier(); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 132d623c95..05f5036245 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -30,19 +30,16 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.nifi.annotation.lifecycle.OnAdded; -import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; -import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; @@ -51,6 +48,7 @@ import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ComponentLifeCycleException; import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; import org.apache.nifi.events.BulletinFactory; +import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; @@ -68,10 +66,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class); private final ProcessScheduler processScheduler; - private final ConcurrentMap controllerServices; private static final Set validDisabledMethods; private final BulletinRepository bulletinRepo; private final StateManagerProvider stateManagerProvider; + private volatile ProcessGroup rootGroup; static { // methods that are okay to be called when the service is disabled. @@ -88,12 +86,15 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi public StandardControllerServiceProvider(final ProcessScheduler scheduler, final BulletinRepository bulletinRepo, final StateManagerProvider stateManagerProvider) { // the following 2 maps must be updated atomically, but we do not lock around them because they are modified // only in the createControllerService method, and both are modified before the method returns - this.controllerServices = new ConcurrentHashMap<>(); this.processScheduler = scheduler; this.bulletinRepo = bulletinRepo; this.stateManagerProvider = stateManagerProvider; } + public void setRootProcessGroup(ProcessGroup rootGroup) { + this.rootGroup = rootGroup; + } + private Class[] getInterfaces(final Class cls) { final List> allIfcs = new ArrayList<>(); populateInterfaces(cls, allIfcs); @@ -195,7 +196,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } - this.controllerServices.put(id, serviceNode); return serviceNode; } catch (final Throwable t) { throw new ControllerServiceInstantiationException(t); @@ -444,7 +444,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public ControllerService getControllerService(final String serviceIdentifier) { - final ControllerServiceNode node = controllerServices.get(serviceIdentifier); + final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier); return node == null ? null : node.getProxiedControllerService(); } @@ -455,27 +455,43 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public boolean isControllerServiceEnabled(final String serviceIdentifier) { - final ControllerServiceNode node = controllerServices.get(serviceIdentifier); + final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier); return node == null ? false : ControllerServiceState.ENABLED == node.getState(); } @Override public boolean isControllerServiceEnabling(final String serviceIdentifier) { - final ControllerServiceNode node = controllerServices.get(serviceIdentifier); + final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier); return node == null ? false : ControllerServiceState.ENABLING == node.getState(); } @Override public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) { - return controllerServices.get(serviceIdentifier); + final ProcessGroup group = rootGroup; + return group == null ? null : group.findControllerService(serviceIdentifier); } @Override - public Set getControllerServiceIdentifiers(final Class serviceType) { + public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { + ProcessGroup group = rootGroup; + if (group == null) { + return Collections.emptySet(); + } + + if (!FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) { + group = group.findProcessGroup(groupId); + } + + if (group == null) { + return Collections.emptySet(); + } + + final Set serviceNodes = group.getControllerServices(true); + final Set identifiers = new HashSet<>(); - for (final Map.Entry entry : controllerServices.entrySet()) { - if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getProxiedControllerService().getClass())) { - identifiers.add(entry.getKey()); + for (final ControllerServiceNode serviceNode : serviceNodes) { + if (requireNonNull(serviceType).isAssignableFrom(serviceNode.getProxiedControllerService().getClass())) { + identifiers.add(serviceNode.getIdentifier()); } } @@ -490,39 +506,22 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public void removeControllerService(final ControllerServiceNode serviceNode) { - final ControllerServiceNode existing = controllerServices.get(serviceNode.getIdentifier()); - if (existing == null || existing != serviceNode) { - throw new IllegalStateException("Controller Service " + serviceNode + " does not exist in this Flow"); + final ProcessGroup group = requireNonNull(serviceNode).getProcessGroup(); + if (group == null) { + throw new IllegalArgumentException("Cannot remote Controller Service " + serviceNode + " because it does not belong to any Process Group"); } - serviceNode.verifyCanDelete(); - - try (final NarCloseable x = NarCloseable.withNarLoader()) { - final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this, null); - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext); - } - - for (final Map.Entry entry : serviceNode.getProperties().entrySet()) { - final PropertyDescriptor descriptor = entry.getKey(); - if (descriptor.getControllerServiceDefinition() != null) { - final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue(); - if (value != null) { - final ControllerServiceNode referencedNode = getControllerServiceNode(value); - if (referencedNode != null) { - referencedNode.removeReference(serviceNode); - } - } - } - } - - controllerServices.remove(serviceNode.getIdentifier()); - - stateManagerProvider.onComponentRemoved(serviceNode.getIdentifier()); + group.removeControllerService(serviceNode); } @Override public Set getAllControllerServices() { - return new HashSet<>(controllerServices.values()); + final ProcessGroup group = rootGroup; + if (group == null) { + return Collections.emptySet(); + } + + return group.findAllControllerServices(); } /** @@ -560,6 +559,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public Set enableReferencingServices(final ControllerServiceNode serviceNode) { final List recursiveReferences = findRecursiveReferences(serviceNode, ControllerServiceNode.class); + logger.debug("Enabling the following Referencing Services for {}: {}", serviceNode, recursiveReferences); return enableReferencingServices(serviceNode, recursiveReferences); } @@ -580,6 +580,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi for (final ControllerServiceNode nodeToEnable : recursiveReferences) { if (!nodeToEnable.isActive()) { + logger.debug("Enabling {} because it references {}", nodeToEnable, serviceNode); enableControllerService(nodeToEnable); updated.add(nodeToEnable); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java index ef46f55dfc..a2a9104726 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java @@ -186,7 +186,7 @@ public class StandardStateManagerProvider implements StateManagerProvider { provider.initialize(initContext); } - final ValidationContext validationContext = new StandardValidationContext(null, propertyStringMap, null); + final ValidationContext validationContext = new StandardValidationContext(null, propertyStringMap, null, null); final Collection results = provider.validate(validationContext); final StringBuilder validationFailures = new StringBuilder(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 5cb39e1a1a..5ede1758fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -41,9 +41,9 @@ import javax.xml.validation.SchemaFactory; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.FlowController; -import org.apache.nifi.controller.FlowFromDOMFactory; import org.apache.nifi.controller.Template; import org.apache.nifi.controller.exception.ProcessorInstantiationException; +import org.apache.nifi.controller.serialization.FlowFromDOMFactory; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.processor.Processor; import org.apache.nifi.util.DomUtils; @@ -434,6 +434,18 @@ public final class FingerprintFactory { } } + final Set services = snippet.getControllerServices(); + if (services == null || services.isEmpty()) { + builder.append("NO_CONTROLLER_SERVICES"); + } else { + final List sortedServices = new ArrayList<>(services); + Collections.sort(sortedServices, componentComparator); + + for (final ControllerServiceDTO service : sortedServices) { + addControllerServiceFingerprint(builder, service); + } + } + return builder; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index cd13ba94e0..8567c85c50 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -36,6 +36,7 @@ import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.LocalPort; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; @@ -45,6 +46,7 @@ import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.logging.LogRepositoryFactory; import org.apache.nifi.nar.NarCloseable; @@ -92,6 +94,7 @@ public final class StandardProcessGroup implements ProcessGroup { private final Map remoteGroups = new HashMap<>(); private final Map processors = new HashMap<>(); private final Map funnels = new HashMap<>(); + private final Map controllerServices = new HashMap<>(); private final StringEncryptor encryptor; private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); @@ -1720,6 +1723,41 @@ public final class StandardProcessGroup implements ProcessGroup { return null; } + @Override + public ControllerServiceNode findControllerService(final String id) { + return findControllerService(id, this); + } + + private ControllerServiceNode findControllerService(final String id, final ProcessGroup start) { + ControllerServiceNode service = start.getControllerService(id); + if (service != null) { + return service; + } + + for (final ProcessGroup group : start.getProcessGroups()) { + service = findControllerService(id, group); + if (service != null) { + return service; + } + } + + return null; + } + + @Override + public Set findAllControllerServices() { + return findAllControllerServices(this); + } + + public Set findAllControllerServices(ProcessGroup start) { + final Set services = start.getControllerServices(false); + for (final ProcessGroup group : start.getProcessGroups()) { + services.addAll(findAllControllerServices(group)); + } + + return services; + } + @Override public void removeFunnel(final Funnel funnel) { writeLock.lock(); @@ -1759,6 +1797,91 @@ public final class StandardProcessGroup implements ProcessGroup { } } + + @Override + public void addControllerService(final ControllerServiceNode service) { + writeLock.lock(); + try { + final String id = requireNonNull(service).getIdentifier(); + final ControllerServiceNode existingService = controllerServices.get(id); + if (existingService != null) { + throw new IllegalStateException("A Controller Service is already registered to this ProcessGroup with ID " + id); + } + + service.setProcessGroup(this); + this.controllerServices.put(service.getIdentifier(), service); + LOG.info("{} added to {}", service, this); + } finally { + writeLock.unlock(); + } + } + + @Override + public ControllerServiceNode getControllerService(final String id) { + readLock.lock(); + try { + return controllerServices.get(requireNonNull(id)); + } finally { + readLock.unlock(); + } + } + + @Override + public Set getControllerServices(final boolean recursive) { + readLock.lock(); + try { + final Set services = new HashSet<>(); + services.addAll(controllerServices.values()); + + if (recursive && parent.get() != null) { + services.addAll(parent.get().getControllerServices(true)); + } + + return services; + } finally { + readLock.unlock(); + } + } + + @Override + public void removeControllerService(final ControllerServiceNode service) { + writeLock.lock(); + try { + final ControllerServiceNode existing = controllerServices.get(requireNonNull(service).getIdentifier()); + if (existing == null) { + throw new IllegalStateException(service + " is not a member of this Process Group"); + } + + service.verifyCanDelete(); + + try (final NarCloseable x = NarCloseable.withNarLoader()) { + final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext); + } + + for (final Map.Entry entry : service.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.getControllerServiceDefinition() != null) { + final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue(); + if (value != null) { + final ControllerServiceNode referencedNode = getControllerService(value); + if (referencedNode != null) { + referencedNode.removeReference(service); + } + } + } + } + + controllerServices.remove(service.getIdentifier()); + flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier()); + + LOG.info("{} removed from {}", service, this); + } finally { + writeLock.unlock(); + } + } + + @Override public void remove(final Snippet snippet) { writeLock.lock(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java index 54d5959120..c5c1b3b8ea 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java @@ -23,9 +23,9 @@ import java.io.OutputStream; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.controller.FlowController; -import org.apache.nifi.controller.FlowSerializationException; -import org.apache.nifi.controller.FlowSynchronizationException; import org.apache.nifi.controller.UninheritableFlowException; +import org.apache.nifi.controller.serialization.FlowSerializationException; +import org.apache.nifi.controller.serialization.FlowSynchronizationException; /** * Interface to define service methods for FlowController configuration. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardSnippetDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardSnippetDeserializer.java index 7f2dfa19c8..e5b4796d2d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardSnippetDeserializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardSnippetDeserializer.java @@ -24,8 +24,8 @@ import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; import javax.xml.transform.stream.StreamSource; -import org.apache.nifi.controller.FlowSerializationException; import org.apache.nifi.controller.StandardSnippet; +import org.apache.nifi.controller.serialization.FlowSerializationException; public class StandardSnippetDeserializer { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardSnippetSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardSnippetSerializer.java index afcc4e5843..22cd61bccb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardSnippetSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardSnippetSerializer.java @@ -23,8 +23,8 @@ import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; import javax.xml.bind.Marshaller; -import org.apache.nifi.controller.FlowSerializationException; import org.apache.nifi.controller.StandardSnippet; +import org.apache.nifi.controller.serialization.FlowSerializationException; public final class StandardSnippetSerializer { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java index ac304ecf96..325f02a13f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java @@ -29,12 +29,12 @@ import java.util.zip.GZIPOutputStream; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.controller.FlowController; -import org.apache.nifi.controller.FlowSerializationException; -import org.apache.nifi.controller.FlowSynchronizationException; -import org.apache.nifi.controller.FlowSynchronizer; -import org.apache.nifi.controller.StandardFlowSerializer; import org.apache.nifi.controller.StandardFlowSynchronizer; import org.apache.nifi.controller.UninheritableFlowException; +import org.apache.nifi.controller.serialization.FlowSerializationException; +import org.apache.nifi.controller.serialization.FlowSynchronizationException; +import org.apache.nifi.controller.serialization.FlowSynchronizer; +import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.file.FileUtils; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateDeserializer.java index ad2aec24e5..fef0709442 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateDeserializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateDeserializer.java @@ -24,7 +24,7 @@ import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; import javax.xml.transform.stream.StreamSource; -import org.apache.nifi.controller.FlowSerializationException; +import org.apache.nifi.controller.serialization.FlowSerializationException; import org.apache.nifi.web.api.dto.TemplateDTO; public class TemplateDeserializer { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateSerializer.java index db60480165..72d7f56834 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateSerializer.java @@ -23,7 +23,7 @@ import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; import javax.xml.bind.Marshaller; -import org.apache.nifi.controller.FlowSerializationException; +import org.apache.nifi.controller.serialization.FlowSerializationException; import org.apache.nifi.web.api.dto.TemplateDTO; public final class TemplateSerializer { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index dae7d0ca02..d83e08cfed 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -130,11 +130,11 @@ public class StandardProcessContext implements ProcessContext, ControllerService } @Override - public Set getControllerServiceIdentifiers(final Class serviceType) { + public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { if (!serviceType.isInterface()) { throw new IllegalArgumentException("ControllerServices may be referenced only via their interfaces; " + serviceType + " is not an interface"); } - return controllerServiceProvider.getControllerServiceIdentifiers(serviceType); + return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java index c24e14665a..1aec203e57 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java @@ -44,20 +44,23 @@ public class StandardValidationContext implements ValidationContext { private final Map expressionLanguageSupported; private final String annotationData; private final Set serviceIdentifiersToNotValidate; + private final String groupId; - public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map properties, final String annotationData) { - this(controllerServiceProvider, Collections.emptySet(), properties, annotationData); + public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map properties, final String annotationData, final String groupId) { + this(controllerServiceProvider, Collections. emptySet(), properties, annotationData, groupId); } public StandardValidationContext( final ControllerServiceProvider controllerServiceProvider, final Set serviceIdentifiersToNotValidate, final Map properties, - final String annotationData) { + final String annotationData, + final String groupId) { this.controllerServiceProvider = controllerServiceProvider; this.properties = new HashMap<>(properties); this.annotationData = annotationData; this.serviceIdentifiersToNotValidate = serviceIdentifiersToNotValidate; + this.groupId = groupId; preparedQueries = new HashMap<>(properties.size()); for (final Map.Entry entry : properties.entrySet()) { @@ -90,7 +93,7 @@ public class StandardValidationContext implements ValidationContext { @Override public ValidationContext getControllerServiceValidationContext(final ControllerService controllerService) { final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(controllerService.getIdentifier()); - return new StandardValidationContext(controllerServiceProvider, serviceNode.getProperties(), serviceNode.getAnnotationData()); + return new StandardValidationContext(controllerServiceProvider, serviceNode.getProperties(), serviceNode.getAnnotationData(), serviceNode.getProcessGroup().getIdentifier()); } @Override @@ -134,4 +137,9 @@ public class StandardValidationContext implements ValidationContext { final Boolean supported = expressionLanguageSupported.get(propertyName); return Boolean.TRUE.equals(supported); } + + @Override + public String getProcessGroupIdentifier() { + return groupId; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java index c3df987032..9c8475e7d5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java @@ -33,12 +33,13 @@ public class StandardValidationContextFactory implements ValidationContextFactor } @Override - public ValidationContext newValidationContext(final Map properties, final String annotationData) { - return new StandardValidationContext(serviceProvider, properties, annotationData); + public ValidationContext newValidationContext(final Map properties, final String annotationData, final String groupId) { + return new StandardValidationContext(serviceProvider, properties, annotationData, groupId); } @Override - public ValidationContext newValidationContext(final Set serviceIdentifiersToNotValidate, final Map properties, final String annotationData) { - return new StandardValidationContext(serviceProvider, serviceIdentifiersToNotValidate, properties, annotationData); + public ValidationContext newValidationContext(final Set serviceIdentifiersToNotValidate, + final Map properties, final String annotationData, final String groupId) { + return new StandardValidationContext(serviceProvider, serviceIdentifiersToNotValidate, properties, annotationData, groupId); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/services/FlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/services/FlowService.java index 50cb789faa..08ea7da260 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/services/FlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/services/FlowService.java @@ -23,9 +23,9 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.controller.FlowController; -import org.apache.nifi.controller.FlowSerializationException; -import org.apache.nifi.controller.FlowSynchronizationException; import org.apache.nifi.controller.UninheritableFlowException; +import org.apache.nifi.controller.serialization.FlowSerializationException; +import org.apache.nifi.controller.serialization.FlowSynchronizationException; import org.apache.nifi.lifecycle.LifeCycle; /** diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 56f08a692e..0f34553c48 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -29,6 +29,8 @@ + @@ -139,6 +141,7 @@ + @@ -162,6 +165,7 @@ + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java index 6e1b6c881c..15efd867b3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java @@ -28,6 +28,9 @@ import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.admin.service.KeyService; import org.apache.nifi.cluster.protocol.StandardDataFlow; import org.apache.nifi.controller.repository.FlowFileEventRepository; +import org.apache.nifi.controller.serialization.FlowSerializationException; +import org.apache.nifi.controller.serialization.FlowSerializer; +import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index 2cde97bb33..0d9d20ea3a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -290,7 +290,7 @@ public class TestProcessorLifecycle { } }); } - assertTrue(countDownCounter.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(countDownCounter.await(1000000, TimeUnit.MILLISECONDS)); String previousOperation = null; for (String operationName : testProcessor.operationNames) { if (previousOperation == null || previousOperation.equals("@OnStopped")) { @@ -559,8 +559,10 @@ public class TestProcessorLifecycle { this.setControllerRootGroup(fc, testGroup); ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "foo", true); - ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + testGroup.addControllerService(testServiceNode); + ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + testGroup.addProcessor(testProcNode); testProcNode.setProperty("P", "hello"); testProcNode.setProperty("S", testServiceNode.getIdentifier()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index 8beafdb729..e59f512d4d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -50,6 +50,8 @@ import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.StandardControllerServiceNode; import org.apache.nifi.controller.service.StandardControllerServiceProvider; +import org.apache.nifi.controller.service.mock.MockProcessGroup; +import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -116,11 +118,17 @@ public class TestStandardProcessScheduler { @Test(timeout = 6000) public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException { final Processor proc = new ServiceReferencingProcessor(); + final ProcessGroup group = new MockProcessGroup(); + + final StandardControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(scheduler, null, Mockito.mock(StateManagerProvider.class)); + serviceProvider.setRootProcessGroup(group); - final ControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(scheduler, null, Mockito.mock(StateManagerProvider.class)); final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", true); + group.addControllerService(service); + final ProcessorNode procNode = new StandardProcessorNode(proc, UUID.randomUUID().toString(), new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider); + group.addProcessor(procNode); procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index f7a338662b..5c83b97ee3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -35,6 +35,7 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.controller.service.mock.DummyProcessor; +import org.apache.nifi.controller.service.mock.MockProcessGroup; import org.apache.nifi.controller.service.mock.ServiceA; import org.apache.nifi.controller.service.mock.ServiceB; import org.apache.nifi.groups.ProcessGroup; @@ -88,13 +89,17 @@ public class TestStandardControllerServiceProvider { provider.disableControllerService(serviceNode); } - @Test(timeout=10000) + @Test(timeout = 1000000) public void testEnableDisableWithReference() { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider); + final ProcessGroup group = new MockProcessGroup(); + provider.setRootProcessGroup(group); final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B", false); final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A", false); + group.addControllerService(serviceNodeA); + group.addControllerService(serviceNodeB); serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B"); @@ -145,6 +150,8 @@ public class TestStandardControllerServiceProvider { public void testEnableReferencingServicesGraph(ProcessScheduler scheduler) { final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider); + final ProcessGroup procGroup = new MockProcessGroup(); + provider.setRootProcessGroup(procGroup); // build a graph of controller services with dependencies as such: // @@ -163,6 +170,11 @@ public class TestStandardControllerServiceProvider { final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); + procGroup.addControllerService(serviceNode1); + procGroup.addControllerService(serviceNode2); + procGroup.addControllerService(serviceNode3); + procGroup.addControllerService(serviceNode4); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java new file mode 100644 index 0000000000..98e2571819 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -0,0 +1,530 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.service.mock; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.authorization.Resource; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Funnel; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.connectable.Position; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.Snippet; +import org.apache.nifi.controller.label.Label; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.ProcessGroupCounts; +import org.apache.nifi.groups.RemoteProcessGroup; + +public class MockProcessGroup implements ProcessGroup { + private Map serviceMap = new HashMap<>(); + + @Override + public Authorizable getParentAuthorizable() { + return null; + } + + @Override + public Resource getResource() { + return null; + } + + @Override + public ProcessGroup getParent() { + return null; + } + + @Override + public void setParent(ProcessGroup group) { + + } + + @Override + public String getIdentifier() { + return "unit test group id"; + } + + @Override + public String getName() { + return "unit test group"; + } + + @Override + public void setName(String name) { + + } + + @Override + public void setPosition(Position position) { + + } + + @Override + public Position getPosition() { + return null; + } + + @Override + public String getComments() { + return null; + } + + @Override + public void setComments(String comments) { + + } + + @Override + public ProcessGroupCounts getCounts() { + return null; + } + + @Override + public void startProcessing() { + + } + + @Override + public void stopProcessing() { + + } + + @Override + public void enableProcessor(ProcessorNode processor) { + + } + + @Override + public void enableInputPort(Port port) { + + } + + @Override + public void enableOutputPort(Port port) { + + } + + @Override + public void startProcessor(ProcessorNode processor) { + + } + + @Override + public void startInputPort(Port port) { + + } + + @Override + public void startOutputPort(Port port) { + + } + + @Override + public void startFunnel(Funnel funnel) { + + } + + @Override + public void stopProcessor(ProcessorNode processor) { + + } + + @Override + public void stopInputPort(Port port) { + + } + + @Override + public void stopOutputPort(Port port) { + + } + + @Override + public void disableProcessor(ProcessorNode processor) { + + } + + @Override + public void disableInputPort(Port port) { + + } + + @Override + public void disableOutputPort(Port port) { + + } + + @Override + public void shutdown() { + + } + + @Override + public boolean isRootGroup() { + return false; + } + + @Override + public void addInputPort(Port port) { + + } + + @Override + public void removeInputPort(Port port) { + + } + + @Override + public Set getInputPorts() { + return null; + } + + @Override + public Port getInputPort(String id) { + return null; + } + + @Override + public void addOutputPort(Port port) { + + } + + @Override + public void removeOutputPort(Port port) { + + } + + @Override + public Port getOutputPort(String id) { + return null; + } + + @Override + public Set getOutputPorts() { + return null; + } + + @Override + public void addProcessGroup(ProcessGroup group) { + + } + + @Override + public ProcessGroup getProcessGroup(String id) { + return null; + } + + @Override + public Set getProcessGroups() { + return null; + } + + @Override + public void removeProcessGroup(ProcessGroup group) { + + } + + @Override + public void addProcessor(ProcessorNode processor) { + processor.setProcessGroup(this); + } + + @Override + public void removeProcessor(ProcessorNode processor) { + + } + + @Override + public Set getProcessors() { + return null; + } + + @Override + public ProcessorNode getProcessor(String id) { + return null; + } + + @Override + public Connectable getConnectable(String id) { + return null; + } + + @Override + public void addConnection(Connection connection) { + + } + + @Override + public void removeConnection(Connection connection) { + + } + + @Override + public void inheritConnection(Connection connection) { + + } + + @Override + public Connection getConnection(String id) { + return null; + } + + @Override + public Set getConnections() { + return null; + } + + @Override + public Connection findConnection(String id) { + return null; + } + + @Override + public List findAllConnections() { + return null; + } + + @Override + public Funnel findFunnel(String id) { + return null; + } + + @Override + public ControllerServiceNode findControllerService(String id) { + return serviceMap.get(id); + } + + @Override + public Set findAllControllerServices() { + return new HashSet<>(serviceMap.values()); + } + + @Override + public void addRemoteProcessGroup(RemoteProcessGroup remoteGroup) { + + } + + @Override + public void removeRemoteProcessGroup(RemoteProcessGroup remoteGroup) { + + } + + @Override + public RemoteProcessGroup getRemoteProcessGroup(String id) { + return null; + } + + @Override + public Set getRemoteProcessGroups() { + return null; + } + + @Override + public void addLabel(Label label) { + + } + + @Override + public void removeLabel(Label label) { + + } + + @Override + public Set