From c955ec1689da0bfec9531a6698acc67780a55788 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 17 Jun 2016 09:08:44 -0400 Subject: [PATCH] NIFI-2033: Allow Controller Services to be scoped at Controller level instead of just group level. This closes #540 --- .../nifi/components/PropertyDescriptor.java | 6 +- .../controller/ControllerServiceLookup.java | 3 +- .../nifi/web/NiFiWebConfigurationContext.java | 3 +- .../util/MockControllerServiceLookup.java | 6 +- .../MockProcessorInitializationContext.java | 12 +- .../nifi/util/MockValidationContext.java | 6 +- .../mock/MockControllerServiceLookup.java | 14 +- .../AbstractConfiguredComponent.java | 4 +- .../controller/ValidationContextFactory.java | 5 +- .../service/ControllerServiceProvider.java | 22 + .../nifi/controller/FlowController.java | 73 +++- .../controller/StandardFlowSynchronizer.java | 4 +- .../controller/StandardProcessorNode.java | 28 +- .../reporting/StandardReportingContext.java | 4 +- ...tandardReportingInitializationContext.java | 4 +- .../serialization/StandardFlowSerializer.java | 6 +- .../service/ControllerServiceLoader.java | 15 +- ...ontrollerServiceInitializationContext.java | 6 +- .../StandardControllerServiceProvider.java | 110 +++-- .../manager/StandardStateManagerProvider.java | 2 +- ...ponentSpecificControllerServiceLookup.java | 66 +++ .../processor/StandardProcessContext.java | 10 +- .../processor/StandardValidationContext.java | 17 +- .../StandardValidationContextFactory.java | 8 +- .../TestStandardProcessScheduler.java | 81 ++-- ...StandardControllerServiceProviderTest.java | 2 +- ...TestStandardControllerServiceProvider.java | 41 +- .../service/mock/MockProcessGroup.java | 169 ++++---- .../processor/TestStandardPropertyValue.java | 10 +- .../nifi/web/StandardNiFiServiceFacade.java | 404 +++++++++--------- .../StandardNiFiWebConfigurationContext.java | 114 +++-- .../apache/nifi/web/api/dto/DtoFactory.java | 51 +-- .../nifi/web/dao/ControllerServiceDAO.java | 5 +- .../impl/StandardControllerServiceDAO.java | 66 ++- .../main/resources/nifi-web-api-context.xml | 3 +- 35 files changed, 826 insertions(+), 554 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/ComponentSpecificControllerServiceLookup.java diff --git a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java index a8b9bdf67a..7b382f6eb4 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java @@ -138,7 +138,7 @@ public final class PropertyDescriptor implements Comparable // if the property descriptor identifies a Controller Service, validate that the ControllerService exists, is of the correct type, and is valid if (controllerServiceDefinition != null) { - final Set validIdentifiers = context.getControllerServiceLookup().getControllerServiceIdentifiers(controllerServiceDefinition, context.getProcessGroupIdentifier()); + final Set validIdentifiers = context.getControllerServiceLookup().getControllerServiceIdentifiers(controllerServiceDefinition); if (validIdentifiers != null && validIdentifiers.contains(input)) { final ControllerService controllerService = context.getControllerServiceLookup().getControllerService(input); if (!context.isValidationRequired(controllerService)) { @@ -213,7 +213,7 @@ public final class PropertyDescriptor implements Comparable * for ENABLED state even though by the time this method returns the * dependent service's state could be fully ENABLED. */ - private boolean isDependentServiceEnableable(ValidationContext context, String serviceId) { + private boolean isDependentServiceEnableable(final ValidationContext context, final String serviceId) { boolean enableable = context.getControllerServiceLookup().isControllerServiceEnabling(serviceId); if (!enableable) { enableable = context.getControllerServiceLookup().isControllerServiceEnabled(serviceId); @@ -516,7 +516,7 @@ public final class PropertyDescriptor implements Comparable return true; } - PropertyDescriptor desc = (PropertyDescriptor) other; + final PropertyDescriptor desc = (PropertyDescriptor) other; return this.name.equals(desc.name); } diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java index 3345a9293c..03f6e9b29e 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java @@ -56,13 +56,12 @@ public interface ControllerServiceLookup { /** * * @param serviceType type of service to get identifiers for - * @param groupId the ID of the Process Group to look in for Controller Services * * @return the set of all Controller Service Identifiers whose Controller * Service is of the given type. * @throws IllegalArgumentException if the given class is not an interface */ - Set getControllerServiceIdentifiers(Class serviceType, String groupId) throws IllegalArgumentException; + Set getControllerServiceIdentifiers(Class serviceType) throws IllegalArgumentException; /** * @param serviceIdentifier identifier to look up diff --git a/nifi-api/src/main/java/org/apache/nifi/web/NiFiWebConfigurationContext.java b/nifi-api/src/main/java/org/apache/nifi/web/NiFiWebConfigurationContext.java index 39bea4ffdc..e2689afc53 100644 --- a/nifi-api/src/main/java/org/apache/nifi/web/NiFiWebConfigurationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/web/NiFiWebConfigurationContext.java @@ -28,12 +28,13 @@ public interface NiFiWebConfigurationContext { /** * @param serviceIdentifier of the controller service + * @param componentId the id of the component that is referencing the controller service * @return the ControllerService for the specified identifier. If a * corresponding service cannot be found, null is returned. If this NiFi is * clustered, the only services available will be those those availability * is NCM only */ - ControllerService getControllerService(String serviceIdentifier); + ControllerService getControllerService(String serviceIdentifier, String componentId); /** * Provides a mechanism for custom UIs to save actions to appear in NiFi diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java index d6ff5c85d4..e07540b70a 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java @@ -38,7 +38,7 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo return addControllerService(service, service.getIdentifier()); } - public void removeControllerService(ControllerService service) { + public void removeControllerService(final ControllerService service) { final ControllerService canonical = getControllerService(service.getIdentifier()); if (canonical == null || canonical != service) { throw new IllegalArgumentException("Controller Service " + service + " is not known"); @@ -82,7 +82,7 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo } @Override - public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { + public Set getControllerServiceIdentifiers(final Class serviceType) { final Set ids = new HashSet<>(); for (final Map.Entry entry : controllerServiceMap.entrySet()) { if (serviceType.isAssignableFrom(entry.getValue().getService().getClass())) { @@ -93,7 +93,7 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo } @Override - public String getControllerServiceName(String serviceIdentifier) { + public String getControllerServiceName(final String serviceIdentifier) { final ControllerServiceConfiguration status = controllerServiceMap.get(serviceIdentifier); return status == null ? null : serviceIdentifier; } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java index f0961efc87..82295f9023 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java @@ -47,8 +47,8 @@ public class MockProcessorInitializationContext implements ProcessorInitializati } @Override - public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { - return context.getControllerServiceIdentifiers(serviceType, groupId); + public Set getControllerServiceIdentifiers(final Class serviceType) { + return context.getControllerServiceIdentifiers(serviceType); } @Override @@ -62,22 +62,22 @@ public class MockProcessorInitializationContext implements ProcessorInitializati } @Override - public String getControllerServiceName(String serviceIdentifier) { + public String getControllerServiceName(final String serviceIdentifier) { return context.getControllerServiceName(serviceIdentifier); } @Override - public boolean isControllerServiceEnabled(String serviceIdentifier) { + public boolean isControllerServiceEnabled(final String serviceIdentifier) { return context.isControllerServiceEnabled(serviceIdentifier); } @Override - public boolean isControllerServiceEnabled(ControllerService service) { + public boolean isControllerServiceEnabled(final ControllerService service) { return context.isControllerServiceEnabled(service); } @Override - public boolean isControllerServiceEnabling(String serviceIdentifier) { + public boolean isControllerServiceEnabling(final String serviceIdentifier) { return context.isControllerServiceEnabling(serviceIdentifier); } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java index 84d32778dc..f0ff58b435 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java @@ -86,8 +86,8 @@ public class MockValidationContext implements ValidationContext, ControllerServi } @Override - public Set getControllerServiceIdentifiers(Class serviceType, String groupId) { - return context.getControllerServiceIdentifiers(serviceType, groupId); + public Set getControllerServiceIdentifiers(final Class serviceType) { + return context.getControllerServiceIdentifiers(serviceType); } @Override @@ -117,7 +117,7 @@ public class MockValidationContext implements ValidationContext, ControllerServi } @Override - public boolean isControllerServiceEnabling(String serviceIdentifier) { + public boolean isControllerServiceEnabling(final String serviceIdentifier) { return context.isControllerServiceEnabling(serviceIdentifier); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java index 26a6f39934..e92e8012d5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java @@ -32,34 +32,32 @@ import org.apache.nifi.controller.ControllerServiceLookup; public class MockControllerServiceLookup implements ControllerServiceLookup { @Override - public ControllerService getControllerService(String serviceIdentifier) { + public ControllerService getControllerService(final String serviceIdentifier) { return null; } @Override - public boolean isControllerServiceEnabled(String serviceIdentifier) { + public boolean isControllerServiceEnabled(final String serviceIdentifier) { return false; } @Override - public boolean isControllerServiceEnabled(ControllerService service) { + public boolean isControllerServiceEnabled(final ControllerService service) { return false; } @Override - public Set getControllerServiceIdentifiers(Class serviceType, String groupId) - throws IllegalArgumentException { + public Set getControllerServiceIdentifiers(final Class serviceType) throws IllegalArgumentException { return Collections.emptySet(); } @Override - public boolean isControllerServiceEnabling(String serviceIdentifier) { + public boolean isControllerServiceEnabling(final String serviceIdentifier) { return false; } @Override - public String getControllerServiceName(String serviceIdentifier) { + public String getControllerServiceName(final String serviceIdentifier) { return serviceIdentifier; } - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java index b3de995a85..0454a2f1b1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -262,7 +262,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone @Override public boolean isValid() { final Collection validationResults = validate(validationContextFactory.newValidationContext( - getProperties(), getAnnotationData(), getProcessGroupIdentifier())); + getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier())); for (final ValidationResult result : validationResults) { if (!result.isValid()) { @@ -283,7 +283,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone lock.lock(); try { final ValidationContext validationContext = validationContextFactory.newValidationContext( - serviceIdentifiersNotToValidate, getProperties(), getAnnotationData(), getProcessGroupIdentifier()); + serviceIdentifiersNotToValidate, getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); final Collection validationResults; try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java index be6346f5cf..1f17d39d3e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java @@ -24,8 +24,9 @@ import org.apache.nifi.components.ValidationContext; public interface ValidationContextFactory { - ValidationContext newValidationContext(Map properties, String annotationData, String groupId); + ValidationContext newValidationContext(Map properties, String annotationData, String groupId, String componentId); - ValidationContext newValidationContext(Set serviceIdentifiersToNotValidate, Map properties, String annotationData, String groupId); + ValidationContext newValidationContext(Set serviceIdentifiersToNotValidate, Map properties, + String annotationData, String groupId, String componentId); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java index 4db6bc9921..51d54a099e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java @@ -21,6 +21,7 @@ import java.util.Set; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.controller.ConfiguredComponent; +import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; /** @@ -176,4 +177,25 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { * @param serviceNode the node */ Set scheduleReferencingComponents(ControllerServiceNode serviceNode); + + /** + * + * @param serviceType type of service to get identifiers for + * @param groupId the ID of the Process Group to look in for Controller Services + * + * @return the set of all Controller Service Identifiers whose Controller + * Service is of the given type. + * @throws IllegalArgumentException if the given class is not an interface + */ + Set getControllerServiceIdentifiers(Class serviceType, String groupId) throws IllegalArgumentException; + + /** + * @param serviceIdentifier the identifier of the controller service + * @param componentId the identifier of the component that is referencing the service. + * @return the Controller Service that is registered with the given identifier or null if that + * identifier does not exist for any controller service or if the controller service with that identifier is + * not accessible from the component with the given componentId, or if no component exists with the given + * identifier + */ + ControllerService getControllerServiceForComponent(String serviceIdentifier, String componentId); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index f4e70af44c..cf2cba68f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -278,6 +278,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final StateManagerProvider stateManagerProvider; private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started private final ConcurrentMap reportingTasks = new ConcurrentHashMap<>(); + private final ConcurrentMap rootControllerServices = new ConcurrentHashMap<>(); private volatile ZooKeeperStateServer zooKeeperStateServer; @@ -501,8 +502,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); instanceId = UUID.randomUUID().toString(); - controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider); - controllerServiceProvider.setRootProcessGroup(rootGroup); + controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider); if (remoteInputSocketPort == null) { LOG.info("Not enabling RAW Socket Site-to-Site functionality because nifi.remote.input.socket.port is not set"); @@ -521,7 +521,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R externalSiteListeners.add(HttpRemoteSiteListener.getInstance()); } - for(RemoteSiteListener listener : externalSiteListeners) { + for(final RemoteSiteListener listener : externalSiteListeners) { listener.setRootGroup(rootGroup); } @@ -659,7 +659,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // ContentRepository to purge superfluous files contentRepository.cleanup(); - for(RemoteSiteListener listener : externalSiteListeners) { + for(final RemoteSiteListener listener : externalSiteListeners) { listener.start(); } @@ -1297,7 +1297,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R + "will take an indeterminate amount of time to stop. Might need to kill the program manually."); } - for(RemoteSiteListener listener : externalSiteListeners) { + for(final RemoteSiteListener listener : externalSiteListeners) { listener.stop(); } @@ -1442,12 +1442,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R try { rootGroup = group; - for(RemoteSiteListener listener : externalSiteListeners) { + for(final RemoteSiteListener listener : externalSiteListeners) { listener.setRootGroup(rootGroup); } - controllerServiceProvider.setRootProcessGroup(rootGroup); - // update the heartbeat bean this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus)); } finally { @@ -2874,11 +2872,68 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return controllerServiceProvider.getControllerService(serviceIdentifier); } + @Override + public ControllerService getControllerServiceForComponent(final String serviceIdentifier, final String componentId) { + return controllerServiceProvider.getControllerServiceForComponent(serviceIdentifier, componentId); + } + + @Override + public Set getControllerServiceIdentifiers(final Class serviceType) throws IllegalArgumentException { + return controllerServiceProvider.getControllerServiceIdentifiers(serviceType); + } + @Override public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) { return controllerServiceProvider.getControllerServiceNode(serviceIdentifier); } + public Set getRootControllerServices() { + return new HashSet<>(rootControllerServices.values()); + } + + public void addRootControllerService(final ControllerServiceNode serviceNode) { + final ControllerServiceNode existing = rootControllerServices.putIfAbsent(serviceNode.getIdentifier(), serviceNode); + if (existing != null) { + throw new IllegalStateException("Controller Service with ID " + serviceNode.getIdentifier() + " already exists at the Controller level"); + } + } + + public ControllerServiceNode getRootControllerService(final String serviceIdentifier) { + return rootControllerServices.get(serviceIdentifier); + } + + public void removeRootControllerService(final ControllerServiceNode service) { + final ControllerServiceNode existing = rootControllerServices.get(requireNonNull(service).getIdentifier()); + if (existing == null) { + throw new IllegalStateException(service + " is not a member of this Process Group"); + } + + service.verifyCanDelete(); + + try (final NarCloseable x = NarCloseable.withNarLoader()) { + final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext); + } + + for (final Map.Entry entry : service.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.getControllerServiceDefinition() != null) { + final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue(); + if (value != null) { + final ControllerServiceNode referencedNode = getRootControllerService(value); + if (referencedNode != null) { + referencedNode.removeReference(service); + } + } + } + } + + rootControllerServices.remove(service.getIdentifier()); + getStateManagerProvider().onComponentRemoved(service.getIdentifier()); + + LOG.info("{} removed from Flow Controller", service, this); + } + @Override public boolean isControllerServiceEnabled(final ControllerService service) { return controllerServiceProvider.isControllerServiceEnabled(service); @@ -3844,7 +3899,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override - public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { + public Set getControllerServiceIdentifiers(final Class serviceType, final String groupId) { return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 6b938d6a8d..572fd79706 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -314,7 +314,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } - void scaleRootGroup(ProcessGroup rootGroup, FlowEncodingVersion encodingVersion) { + void scaleRootGroup(final ProcessGroup rootGroup, final FlowEncodingVersion encodingVersion) { if (encodingVersion == null || encodingVersion.getMajorVersion() < 1) { // Calculate new Positions if the encoding version of the flow is older than 1.0. PositionScaler.scale(rootGroup, 1.5, 1.34); @@ -939,7 +939,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { remoteGroup.setYieldDuration(remoteGroupDto.getYieldDuration()); } - String transportProtocol = remoteGroupDto.getTransportProtocol(); + final String transportProtocol = remoteGroupDto.getTransportProtocol(); if (transportProtocol != null && !transportProtocol.trim().isEmpty()) { remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(transportProtocol.toUpperCase())); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 1f10e6a3ab..3ea2b6bce8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -233,7 +233,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public void setPosition(Position position) { + public void setPosition(final Position position) { this.position.set(position); } @@ -373,7 +373,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable */ @SuppressWarnings("deprecation") public String getProcessorDescription() { - CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class); + final CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class); String description = null; if (capDesc != null) { description = capDesc.value(); @@ -644,7 +644,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public Set getConnections(final Relationship relationship) { - Set applicableConnections = connections.get(relationship); + final Set applicableConnections = connections.get(relationship); return (applicableConnections == null) ? Collections. emptySet() : Collections.unmodifiableSet(applicableConnections); } @@ -923,7 +923,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable public boolean isValid() { try { final ValidationContext validationContext = this.getValidationContextFactory() - .newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier()); + .newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); final Collection validationResults; try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { @@ -970,7 +970,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final List results = new ArrayList<>(); try { final ValidationContext validationContext = this.getValidationContextFactory() - .newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier()); + .newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier()); final Collection validationResults; try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { @@ -1286,7 +1286,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable }; taskScheduler.execute(startProcRunnable); } else { - String procName = this.processor.getClass().getSimpleName(); + final String procName = this.processor.getClass().getSimpleName(); LOG.warn("Can not start '" + procName + "' since it's already in the process of being started or it is DISABLED - " + scheduledState.get()); @@ -1345,7 +1345,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } else { scheduler.schedule(this, 100, TimeUnit.MILLISECONDS); } - } catch (Exception e) { + } catch (final Exception e) { LOG.warn("Failed while shutting down processor " + processor, e); } } @@ -1389,19 +1389,19 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * be logged (WARN) informing a user so further actions could be taken. *

*/ - private void invokeTaskAsCancelableFuture(SchedulingAgentCallback callback, Callable task) { - String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); - long onScheduleTimeout = timeoutString == null ? 60000 + private void invokeTaskAsCancelableFuture(final SchedulingAgentCallback callback, final Callable task) { + final String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); + final long onScheduleTimeout = timeoutString == null ? 60000 : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); - Future taskFuture = callback.invokeMonitoringTask(task); + final Future taskFuture = callback.invokeMonitoringTask(task); try { taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.warn("Thread was interrupted while waiting for processor '" + this.processor.getClass().getSimpleName() + "' lifecycle OnScheduled operation to finish."); Thread.currentThread().interrupt(); throw new RuntimeException("Interrupted while executing one of processor's OnScheduled tasks.", e); - } catch (TimeoutException e) { + } catch (final TimeoutException e) { taskFuture.cancel(true); LOG.warn("Timed out while waiting for OnScheduled of '" + this.processor.getClass().getSimpleName() @@ -1411,7 +1411,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable + "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '" + this.processor + "' that needs to be documented, reported and eventually fixed."); throw new RuntimeException("Timed out while executing one of processor's OnScheduled task.", e); - } catch (ExecutionException e){ + } catch (final ExecutionException e){ throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", e); } finally { callback.postMonitor(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java index ff767efdbd..b174c4c57f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java @@ -115,8 +115,8 @@ public class StandardReportingContext implements ReportingContext, ControllerSer } @Override - public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { - return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId); + public Set getControllerServiceIdentifiers(final Class serviceType) { + return serviceProvider.getControllerServiceIdentifiers(serviceType, null); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java index 8b7b3bfa5c..caf38a8a7d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java @@ -75,8 +75,8 @@ public class StandardReportingInitializationContext implements ReportingInitiali } @Override - public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { - return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId); + public Set getControllerServiceIdentifiers(final Class serviceType) { + return serviceProvider.getControllerServiceIdentifiers(serviceType, null); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index e77463767c..0892f41436 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -94,8 +94,12 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount()); addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup"); + // Add root-level controller services final Element controllerServicesNode = doc.createElement("controllerServices"); rootNode.appendChild(controllerServicesNode); + for (final ControllerServiceNode serviceNode : controller.getRootControllerServices()) { + addControllerService(controllerServicesNode, serviceNode); + } final Element reportingTasksNode = doc.createElement("reportingTasks"); rootNode.appendChild(reportingTasksNode); @@ -252,7 +256,7 @@ public class StandardFlowSerializer implements FlowSerializer { } addTextElement(element, "proxyUser", remoteRef.getProxyUser()); if (!StringUtils.isEmpty(remoteRef.getProxyPassword())) { - String value = ENC_PREFIX + encryptor.encrypt(remoteRef.getProxyPassword()) + ENC_SUFFIX; + final String value = ENC_PREFIX + encryptor.encrypt(remoteRef.getProxyPassword()) + ENC_SUFFIX; addTextElement(element, "proxyPassword", value); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index 74533ffabf..d302cff7fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@ -31,6 +31,7 @@ import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; +import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.serialization.FlowFromDOMFactory; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.groups.ProcessGroup; @@ -48,7 +49,7 @@ public class ControllerServiceLoader { private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class); - public static List loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream, final ProcessGroup parentGroup, + public static List loadControllerServices(final FlowController controller, final InputStream serializedStream, final ProcessGroup parentGroup, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) throws IOException { final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance(); @@ -90,19 +91,21 @@ public class ControllerServiceLoader { final Document document = builder.parse(in); final Element controllerServices = document.getDocumentElement(); final List serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService"); - return new ArrayList<>(loadControllerServices(serviceElements, provider, parentGroup, encryptor, bulletinRepo, autoResumeState)); + return new ArrayList<>(loadControllerServices(serviceElements, controller, parentGroup, encryptor, bulletinRepo, autoResumeState)); } catch (SAXException | ParserConfigurationException sxe) { throw new IOException(sxe); } } - public static Collection loadControllerServices(final List serviceElements, final ControllerServiceProvider provider, final ProcessGroup parentGroup, + public static Collection loadControllerServices(final List serviceElements, final FlowController controller, final ProcessGroup parentGroup, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) { final Map nodeMap = new HashMap<>(); for (final Element serviceElement : serviceElements) { - final ControllerServiceNode serviceNode = createControllerService(provider, serviceElement, encryptor); - if (parentGroup != null) { + final ControllerServiceNode serviceNode = createControllerService(controller, serviceElement, encryptor); + if (parentGroup == null) { + controller.addRootControllerService(serviceNode); + } else { parentGroup.addControllerService(serviceNode); } @@ -132,7 +135,7 @@ public class ControllerServiceLoader { } } - provider.enableControllerServices(nodesToEnable); + controller.enableControllerServices(nodesToEnable); } return nodeMap.keySet(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java index fc6d376482..71cd793fcc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java @@ -49,8 +49,8 @@ public class StandardControllerServiceInitializationContext implements Controlle } @Override - public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { - return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId); + public Set getControllerServiceIdentifiers(final Class serviceType) { + return serviceProvider.getControllerServiceIdentifiers(serviceType); } @Override @@ -69,7 +69,7 @@ public class StandardControllerServiceInitializationContext implements Controlle } @Override - public boolean isControllerServiceEnabling(String serviceIdentifier) { + public boolean isControllerServiceEnabling(final String serviceIdentifier) { return serviceProvider.isControllerServiceEnabling(serviceIdentifier); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 425ee4012f..4e3249fd03 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -72,7 +72,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi private static final Set validDisabledMethods; private final BulletinRepository bulletinRepo; private final StateManagerProvider stateManagerProvider; - private volatile ProcessGroup rootGroup; + private final FlowController flowController; static { // methods that are okay to be called when the service is disabled. @@ -86,17 +86,15 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi validDisabledMethods = Collections.unmodifiableSet(validMethods); } - public StandardControllerServiceProvider(final ProcessScheduler scheduler, final BulletinRepository bulletinRepo, final StateManagerProvider stateManagerProvider) { - // the following 2 maps must be updated atomically, but we do not lock around them because they are modified - // only in the createControllerService method, and both are modified before the method returns + public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo, + final StateManagerProvider stateManagerProvider) { + + this.flowController = flowController; this.processScheduler = scheduler; this.bulletinRepo = bulletinRepo; this.stateManagerProvider = stateManagerProvider; } - public void setRootProcessGroup(ProcessGroup rootGroup) { - this.rootGroup = rootGroup; - } private Class[] getInterfaces(final Class cls) { final List> allIfcs = new ArrayList<>(); @@ -519,6 +517,52 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi return node == null ? null : node.getProxiedControllerService(); } + private ProcessGroup getRootGroup() { + return flowController.getGroup(flowController.getRootGroupId()); + } + + @Override + public ControllerService getControllerServiceForComponent(final String serviceIdentifier, final String componentId) { + final ProcessGroup rootGroup = getRootGroup(); + + // Find the Process Group that owns the component. + ProcessGroup groupOfInterest = null; + + final ProcessorNode procNode = rootGroup.findProcessor(componentId); + if (procNode == null) { + final ControllerServiceNode serviceNode = getControllerServiceNode(componentId); + if (serviceNode == null) { + final ReportingTaskNode taskNode = flowController.getReportingTaskNode(componentId); + if (taskNode == null) { + throw new IllegalStateException("Could not find any Processor, Reporting Task, or Controller Service with identifier " + componentId); + } + + // we have confirmed that the component is a reporting task. We can only reference Controller Services + // that are scoped at the FlowController level in this case. + final ControllerServiceNode rootServiceNode = flowController.getRootControllerService(serviceIdentifier); + return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService(); + } else { + groupOfInterest = serviceNode.getProcessGroup(); + } + } else { + groupOfInterest = procNode.getProcessGroup(); + } + + if (groupOfInterest == null) { + final ControllerServiceNode rootServiceNode = flowController.getRootControllerService(serviceIdentifier); + return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService(); + } + + final Set servicesForGroup = groupOfInterest.getControllerServices(true); + for (final ControllerServiceNode serviceNode : servicesForGroup) { + if (serviceIdentifier.equals(serviceNode.getIdentifier())) { + return serviceNode.getProxiedControllerService(); + } + } + + return null; + } + @Override public boolean isControllerServiceEnabled(final ControllerService service) { return isControllerServiceEnabled(service.getIdentifier()); @@ -538,27 +582,33 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) { - final ProcessGroup group = rootGroup; - return group == null ? null : group.findControllerService(serviceIdentifier); + final ControllerServiceNode rootServiceNode = flowController.getRootControllerService(serviceIdentifier); + if (rootServiceNode != null) { + return rootServiceNode; + } + + return getRootGroup().findControllerService(serviceIdentifier); } + @Override - public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { - ProcessGroup group = rootGroup; - if (group == null) { - return Collections.emptySet(); - } + public Set getControllerServiceIdentifiers(final Class serviceType, final String groupId) { + final Set serviceNodes; + if (groupId == null) { + serviceNodes = flowController.getRootControllerServices(); + } else { + ProcessGroup group = getRootGroup(); + if (!FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) { + group = group.findProcessGroup(groupId); + } - if (!FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) { - group = group.findProcessGroup(groupId); - } + if (group == null) { + return Collections.emptySet(); + } - if (group == null) { - return Collections.emptySet(); + serviceNodes = group.getControllerServices(true); } - final Set serviceNodes = group.getControllerServices(true); - final Set identifiers = new HashSet<>(); for (final ControllerServiceNode serviceNode : serviceNodes) { if (requireNonNull(serviceType).isAssignableFrom(serviceNode.getProxiedControllerService().getClass())) { @@ -579,7 +629,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi public void removeControllerService(final ControllerServiceNode serviceNode) { final ProcessGroup group = requireNonNull(serviceNode).getProcessGroup(); if (group == null) { - throw new IllegalArgumentException("Cannot remote Controller Service " + serviceNode + " because it does not belong to any Process Group"); + flowController.removeRootControllerService(serviceNode); + return; } group.removeControllerService(serviceNode); @@ -587,12 +638,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public Set getAllControllerServices() { - final ProcessGroup group = rootGroup; - if (group == null) { - return Collections.emptySet(); - } + final Set allServices = new HashSet<>(); + allServices.addAll(flowController.getRootControllerServices()); + allServices.addAll(getRootGroup().findAllControllerServices()); - return group.findAllControllerServices(); + return allServices; } /** @@ -709,4 +759,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) { // we can always stop referencing components } + + + @Override + public Set getControllerServiceIdentifiers(final Class serviceType) throws IllegalArgumentException { + throw new UnsupportedOperationException("Cannot obtain Controller Service Identifiers for service type " + serviceType + " without providing a Process Group Identifier"); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java index a2a9104726..8f20aab272 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java @@ -186,7 +186,7 @@ public class StandardStateManagerProvider implements StateManagerProvider { provider.initialize(initContext); } - final ValidationContext validationContext = new StandardValidationContext(null, propertyStringMap, null, null); + final ValidationContext validationContext = new StandardValidationContext(null, propertyStringMap, null, null, null); final Collection results = provider.validate(validationContext); final StringBuilder validationFailures = new StringBuilder(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/ComponentSpecificControllerServiceLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/ComponentSpecificControllerServiceLookup.java new file mode 100644 index 0000000000..a08d118571 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/ComponentSpecificControllerServiceLookup.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processor; + +import java.util.Set; + +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.service.ControllerServiceProvider; + +public class ComponentSpecificControllerServiceLookup implements ControllerServiceLookup { + private final ControllerServiceProvider serviceProvider; + private final String componentId; + private final String groupId; + + public ComponentSpecificControllerServiceLookup(final ControllerServiceProvider serviceProvider, final String componentId, final String groupId) { + this.serviceProvider = serviceProvider; + this.componentId = componentId; + this.groupId = groupId; + } + + @Override + public ControllerService getControllerService(final String serviceIdentifier) { + return serviceProvider.getControllerServiceForComponent(serviceIdentifier, componentId); + } + + @Override + public boolean isControllerServiceEnabled(final String serviceIdentifier) { + return serviceProvider.isControllerServiceEnabled(serviceIdentifier); + } + + @Override + public boolean isControllerServiceEnabling(final String serviceIdentifier) { + return serviceProvider.isControllerServiceEnabling(serviceIdentifier); + } + + @Override + public boolean isControllerServiceEnabled(final ControllerService service) { + return serviceProvider.isControllerServiceEnabled(service); + } + + @Override + public Set getControllerServiceIdentifiers(final Class serviceType) throws IllegalArgumentException { + return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId); + } + + @Override + public String getControllerServiceName(final String serviceIdentifier) { + return serviceProvider.getControllerServiceName(serviceIdentifier); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index d83e08cfed..3c5acbbdbd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -101,7 +101,7 @@ public class StandardProcessContext implements ProcessContext, ControllerService @Override public ControllerService getControllerService(final String serviceIdentifier) { - return controllerServiceProvider.getControllerService(serviceIdentifier); + return controllerServiceProvider.getControllerServiceForComponent(serviceIdentifier, procNode.getIdentifier()); } @Override @@ -130,11 +130,11 @@ public class StandardProcessContext implements ProcessContext, ControllerService } @Override - public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { + public Set getControllerServiceIdentifiers(final Class serviceType) { if (!serviceType.isInterface()) { throw new IllegalArgumentException("ControllerServices may be referenced only via their interfaces; " + serviceType + " is not an interface"); } - return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId); + return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, procNode.getProcessGroup().getIdentifier()); } @Override @@ -197,8 +197,8 @@ public class StandardProcessContext implements ProcessContext, ControllerService } @Override - public boolean hasConnection(Relationship relationship) { - Set connections = procNode.getConnections(relationship); + public boolean hasConnection(final Relationship relationship) { + final Set connections = procNode.getConnections(relationship); return connections != null && !connections.isEmpty(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java index 1aec203e57..3fb8a21f6d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java @@ -45,22 +45,26 @@ public class StandardValidationContext implements ValidationContext { private final String annotationData; private final Set serviceIdentifiersToNotValidate; private final String groupId; + private final String componentId; - public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map properties, final String annotationData, final String groupId) { - this(controllerServiceProvider, Collections. emptySet(), properties, annotationData, groupId); + public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map properties, + final String annotationData, final String groupId, final String componentId) { + this(controllerServiceProvider, Collections. emptySet(), properties, annotationData, groupId, componentId); } public StandardValidationContext( final ControllerServiceProvider controllerServiceProvider, final Set serviceIdentifiersToNotValidate, final Map properties, - final String annotationData, - final String groupId) { + final String annotationData, + final String groupId, + final String componentId) { this.controllerServiceProvider = controllerServiceProvider; this.properties = new HashMap<>(properties); this.annotationData = annotationData; this.serviceIdentifiersToNotValidate = serviceIdentifiersToNotValidate; this.groupId = groupId; + this.componentId = componentId; preparedQueries = new HashMap<>(properties.size()); for (final Map.Entry entry : properties.entrySet()) { @@ -93,7 +97,8 @@ public class StandardValidationContext implements ValidationContext { @Override public ValidationContext getControllerServiceValidationContext(final ControllerService controllerService) { final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(controllerService.getIdentifier()); - return new StandardValidationContext(controllerServiceProvider, serviceNode.getProperties(), serviceNode.getAnnotationData(), serviceNode.getProcessGroup().getIdentifier()); + return new StandardValidationContext(controllerServiceProvider, serviceNode.getProperties(), serviceNode.getAnnotationData(), + serviceNode.getProcessGroup().getIdentifier(), serviceNode.getIdentifier()); } @Override @@ -114,7 +119,7 @@ public class StandardValidationContext implements ValidationContext { @Override public ControllerServiceLookup getControllerServiceLookup() { - return controllerServiceProvider; + return new ComponentSpecificControllerServiceLookup(controllerServiceProvider, componentId, groupId); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java index 9c8475e7d5..1c52e173c3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java @@ -33,13 +33,13 @@ public class StandardValidationContextFactory implements ValidationContextFactor } @Override - public ValidationContext newValidationContext(final Map properties, final String annotationData, final String groupId) { - return new StandardValidationContext(serviceProvider, properties, annotationData, groupId); + public ValidationContext newValidationContext(final Map properties, final String annotationData, final String groupId, final String componentId) { + return new StandardValidationContext(serviceProvider, properties, annotationData, groupId, componentId); } @Override public ValidationContext newValidationContext(final Set serviceIdentifiersToNotValidate, - final Map properties, final String annotationData, final String groupId) { - return new StandardValidationContext(serviceProvider, serviceIdentifiersToNotValidate, properties, annotationData, groupId); + final Map properties, final String annotationData, final String groupId, String componentId) { + return new StandardValidationContext(serviceProvider, serviceIdentifiersToNotValidate, properties, annotationData, groupId, componentId); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index e59f512d4d..7141ee3bb3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -38,11 +38,13 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.controller.cluster.Heartbeater; import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; import org.apache.nifi.controller.reporting.StandardReportingTaskNode; import org.apache.nifi.controller.service.ControllerServiceNode; @@ -73,7 +75,9 @@ public class TestStandardProcessScheduler { private StandardProcessScheduler scheduler = null; private ReportingTaskNode taskNode = null; private TestReportingTask reportingTask = null; - private StateManagerProvider stateMgrProvider = Mockito.mock(StateManagerProvider.class); + private final StateManagerProvider stateMgrProvider = Mockito.mock(StateManagerProvider.class); + private FlowController controller; + private ProcessGroup rootGroup; @Before public void setup() throws InitializationException { @@ -89,6 +93,10 @@ public class TestStandardProcessScheduler { final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null); taskNode = new StandardReportingTaskNode(reportingTask, UUID.randomUUID().toString(), null, scheduler, validationContextFactory); + + controller = Mockito.mock(FlowController.class); + rootGroup = new MockProcessGroup(); + Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(rootGroup); } /** @@ -115,20 +123,17 @@ public class TestStandardProcessScheduler { assertTrue("After unscheduling Reporting Task, task ran an additional " + attemptsAfterStop + " times", attemptsAfterStop <= 1); } - @Test(timeout = 6000) + @Test(timeout = 60000) public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException { final Processor proc = new ServiceReferencingProcessor(); - final ProcessGroup group = new MockProcessGroup(); - - final StandardControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(scheduler, null, Mockito.mock(StateManagerProvider.class)); - serviceProvider.setRootProcessGroup(group); + final StandardControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(controller, scheduler, null, Mockito.mock(StateManagerProvider.class)); final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", true); - group.addControllerService(service); + rootGroup.addControllerService(service); final ProcessorNode procNode = new StandardProcessorNode(proc, UUID.randomUUID().toString(), new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider); - group.addProcessor(procNode); + rootGroup.addProcessor(procNode); procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier()); @@ -184,16 +189,16 @@ public class TestStandardProcessScheduler { } @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { } } private void refreshNiFiProperties() { try { - Field instanceField = NiFiProperties.class.getDeclaredField("instance"); + final Field instanceField = NiFiProperties.class.getDeclaredField("instance"); instanceField.setAccessible(true); instanceField.set(null, null); - } catch (Exception e) { + } catch (final Exception e) { throw new IllegalStateException(e); } } @@ -206,12 +211,12 @@ public class TestStandardProcessScheduler { @Test public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", false); assertFalse(serviceNode.isActive()); - SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); - ExecutorService executor = Executors.newCachedThreadPool(); + final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); + final ExecutorService executor = Executors.newCachedThreadPool(); final AtomicBoolean asyncFailed = new AtomicBoolean(); for (int i = 0; i < 1000; i++) { @@ -221,7 +226,7 @@ public class TestStandardProcessScheduler { try { scheduler.enableControllerService(serviceNode); assertTrue(serviceNode.isActive()); - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); asyncFailed.set(true); } @@ -245,11 +250,11 @@ public class TestStandardProcessScheduler { @Test public void validateDisabledServiceCantBeDisabled() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", false); - SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); - ExecutorService executor = Executors.newCachedThreadPool(); + final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); + final ExecutorService executor = Executors.newCachedThreadPool(); final AtomicBoolean asyncFailed = new AtomicBoolean(); for (int i = 0; i < 1000; i++) { @@ -259,7 +264,7 @@ public class TestStandardProcessScheduler { try { scheduler.disableControllerService(serviceNode); assertFalse(serviceNode.isActive()); - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); asyncFailed.set(true); } @@ -283,13 +288,13 @@ public class TestStandardProcessScheduler { @Test public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", false); - SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); + final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); scheduler.enableControllerService(serviceNode); assertTrue(serviceNode.isActive()); - ExecutorService executor = Executors.newCachedThreadPool(); + final ExecutorService executor = Executors.newCachedThreadPool(); final AtomicBoolean asyncFailed = new AtomicBoolean(); for (int i = 0; i < 1000; i++) { @@ -299,7 +304,7 @@ public class TestStandardProcessScheduler { try { scheduler.disableControllerService(serviceNode); assertFalse(serviceNode.isActive()); - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); asyncFailed.set(true); } @@ -317,7 +322,7 @@ public class TestStandardProcessScheduler { @Test public void validateDisablingOfTheFailedService() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider); final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(), "1", false); scheduler.enableControllerService(serviceNode); @@ -348,8 +353,8 @@ public class TestStandardProcessScheduler { @Test public void validateEnabledDisableMultiThread() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); - ExecutorService executor = Executors.newCachedThreadPool(); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider); + final ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 200; i++) { final ControllerServiceNode serviceNode = provider .createControllerService(RandomShortDelayEnablingService.class.getName(), "1", false); @@ -391,10 +396,10 @@ public class TestStandardProcessScheduler { @Test public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider); final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), "1", false); - LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); + final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); ts.setLimit(Long.MAX_VALUE); scheduler.enableControllerService(serviceNode); Thread.sleep(100); @@ -416,10 +421,10 @@ public class TestStandardProcessScheduler { @Test public void validateLongEnablingServiceCanStillBeDisabled() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider); final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), "1", false); - LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); + final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); ts.setLimit(3000); scheduler.enableControllerService(serviceNode); Thread.sleep(2000); @@ -440,7 +445,7 @@ public class TestStandardProcessScheduler { public static class FailingService extends AbstractControllerService { @OnEnabled - public void enable(ConfigurationContext context) { + public void enable(final ConfigurationContext context) { throw new RuntimeException("intentional"); } } @@ -449,10 +454,10 @@ public class TestStandardProcessScheduler { private final Random random = new Random(); @OnEnabled - public void enable(ConfigurationContext context) { + public void enable(final ConfigurationContext context) { try { Thread.sleep(random.nextInt(20)); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { Thread.currentThread().interrupt(); } } @@ -464,12 +469,12 @@ public class TestStandardProcessScheduler { private final AtomicInteger disableCounter = new AtomicInteger(); @OnEnabled - public void enable(ConfigurationContext context) { + public void enable(final ConfigurationContext context) { this.enableCounter.incrementAndGet(); } @OnDisabled - public void disable(ConfigurationContext context) { + public void disable(final ConfigurationContext context) { this.disableCounter.incrementAndGet(); } @@ -489,13 +494,13 @@ public class TestStandardProcessScheduler { private volatile long limit; @OnEnabled - public void enable(ConfigurationContext context) throws Exception { + public void enable(final ConfigurationContext context) throws Exception { this.enableCounter.incrementAndGet(); Thread.sleep(limit); } @OnDisabled - public void disable(ConfigurationContext context) { + public void disable(final ConfigurationContext context) { this.disableCounter.incrementAndGet(); } @@ -507,7 +512,7 @@ public class TestStandardProcessScheduler { return this.disableCounter.get(); } - public void setLimit(long limit) { + public void setLimit(final long limit) { this.limit = limit; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java index b9c0f7f07e..1493f3863e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java @@ -46,7 +46,7 @@ public class StandardControllerServiceProviderTest { public void setup() throws Exception { String id = "id"; String clazz = "org.apache.nifi.controller.service.util.TestControllerService"; - ControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, new StateManagerProvider() { + ControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, null, new StateManagerProvider() { @Override public StateManager getStateManager(final String componentId) { return Mockito.mock(StateManager.class); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index 5c83b97ee3..91a2e7acfe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -29,6 +29,7 @@ import java.util.UUID; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; +import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; @@ -49,7 +50,7 @@ import org.mockito.Mockito; public class TestStandardControllerServiceProvider { private static StateManagerProvider stateManagerProvider = new StateManagerProvider() { @Override - public StateManager getStateManager(String componentId) { + public StateManager getStateManager(final String componentId) { return Mockito.mock(StateManager.class); } @@ -66,7 +67,7 @@ public class TestStandardControllerServiceProvider { } @Override - public void onComponentRemoved(String componentId) { + public void onComponentRemoved(final String componentId) { } }; @@ -81,20 +82,26 @@ public class TestStandardControllerServiceProvider { @Test public void testDisableControllerService() { + final ProcessGroup procGroup = new MockProcessGroup(); + final FlowController controller = Mockito.mock(FlowController.class); + Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); + final ProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider); final ControllerServiceNode serviceNode = provider.createControllerService(ServiceB.class.getName(), "B", false); provider.enableControllerService(serviceNode); provider.disableControllerService(serviceNode); } - @Test(timeout = 1000000) + @Test(timeout = 10000) public void testEnableDisableWithReference() { - final ProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider); final ProcessGroup group = new MockProcessGroup(); - provider.setRootProcessGroup(group); + final FlowController controller = Mockito.mock(FlowController.class); + Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(group); + + final ProcessScheduler scheduler = createScheduler(); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider); final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B", false); final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A", false); @@ -148,10 +155,12 @@ public class TestStandardControllerServiceProvider { } } - public void testEnableReferencingServicesGraph(ProcessScheduler scheduler) { - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider); + public void testEnableReferencingServicesGraph(final ProcessScheduler scheduler) { final ProcessGroup procGroup = new MockProcessGroup(); - provider.setRootProcessGroup(procGroup); + final FlowController controller = Mockito.mock(FlowController.class); + Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); + + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider); // build a graph of controller services with dependencies as such: // @@ -199,7 +208,11 @@ public class TestStandardControllerServiceProvider { @Test public void testOrderingOfServices() { - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, stateManagerProvider); + final ProcessGroup procGroup = new MockProcessGroup(); + final FlowController controller = Mockito.mock(FlowController.class); + Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); + + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, null, null, stateManagerProvider); final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceB.class.getName(), "2", false); @@ -354,8 +367,12 @@ public class TestStandardControllerServiceProvider { @Test public void testEnableReferencingComponents() { + final ProcessGroup procGroup = new MockProcessGroup(); + final FlowController controller = Mockito.mock(FlowController.class); + Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); + final StandardProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, stateManagerProvider); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, null, null, stateManagerProvider); final ControllerServiceNode serviceNode = provider.createControllerService(ServiceA.class.getName(), "1", false); final ProcessorNode procNode = createProcessor(scheduler, provider); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index c10679fd5e..2e60c314b4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -17,6 +17,13 @@ package org.apache.nifi.controller.service.mock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.connectable.Connectable; @@ -34,14 +41,9 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - public class MockProcessGroup implements ProcessGroup { - private Map serviceMap = new HashMap<>(); + private final Map serviceMap = new HashMap<>(); + private final Map processorMap = new HashMap<>(); @Override public Authorizable getParentAuthorizable() { @@ -59,7 +61,7 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void setParent(ProcessGroup group) { + public void setParent(final ProcessGroup group) { } @@ -74,12 +76,12 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void setName(String name) { + public void setName(final String name) { } @Override - public void setPosition(Position position) { + public void setPosition(final Position position) { } @@ -94,7 +96,7 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void setComments(String comments) { + public void setComments(final String comments) { } @@ -114,67 +116,67 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void enableProcessor(ProcessorNode processor) { + public void enableProcessor(final ProcessorNode processor) { } @Override - public void enableInputPort(Port port) { + public void enableInputPort(final Port port) { } @Override - public void enableOutputPort(Port port) { + public void enableOutputPort(final Port port) { } @Override - public void startProcessor(ProcessorNode processor) { + public void startProcessor(final ProcessorNode processor) { } @Override - public void startInputPort(Port port) { + public void startInputPort(final Port port) { } @Override - public void startOutputPort(Port port) { + public void startOutputPort(final Port port) { } @Override - public void startFunnel(Funnel funnel) { + public void startFunnel(final Funnel funnel) { } @Override - public void stopProcessor(ProcessorNode processor) { + public void stopProcessor(final ProcessorNode processor) { } @Override - public void stopInputPort(Port port) { + public void stopInputPort(final Port port) { } @Override - public void stopOutputPort(Port port) { + public void stopOutputPort(final Port port) { } @Override - public void disableProcessor(ProcessorNode processor) { + public void disableProcessor(final ProcessorNode processor) { } @Override - public void disableInputPort(Port port) { + public void disableInputPort(final Port port) { } @Override - public void disableOutputPort(Port port) { + public void disableOutputPort(final Port port) { } @@ -189,12 +191,12 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void addInputPort(Port port) { + public void addInputPort(final Port port) { } @Override - public void removeInputPort(Port port) { + public void removeInputPort(final Port port) { } @@ -204,22 +206,22 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public Port getInputPort(String id) { + public Port getInputPort(final String id) { return null; } @Override - public void addOutputPort(Port port) { + public void addOutputPort(final Port port) { } @Override - public void removeOutputPort(Port port) { + public void removeOutputPort(final Port port) { } @Override - public Port getOutputPort(String id) { + public Port getOutputPort(final String id) { return null; } @@ -229,12 +231,12 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void addProcessGroup(ProcessGroup group) { + public void addProcessGroup(final ProcessGroup group) { } @Override - public ProcessGroup getProcessGroup(String id) { + public ProcessGroup getProcessGroup(final String id) { return null; } @@ -244,28 +246,29 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void removeProcessGroup(ProcessGroup group) { + public void removeProcessGroup(final ProcessGroup group) { } @Override - public void addProcessor(ProcessorNode processor) { + public void addProcessor(final ProcessorNode processor) { processor.setProcessGroup(this); + processorMap.put(processor.getIdentifier(), processor); } @Override - public void removeProcessor(ProcessorNode processor) { - + public void removeProcessor(final ProcessorNode processor) { + processorMap.remove(processor.getIdentifier()); } @Override public Set getProcessors() { - return null; + return new HashSet<>(processorMap.values()); } @Override - public ProcessorNode getProcessor(String id) { - return null; + public ProcessorNode getProcessor(final String id) { + return processorMap.get(id); } @Override @@ -274,27 +277,27 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public Connectable getConnectable(String id) { + public Connectable getConnectable(final String id) { return null; } @Override - public void addConnection(Connection connection) { + public void addConnection(final Connection connection) { } @Override - public void removeConnection(Connection connection) { + public void removeConnection(final Connection connection) { } @Override - public void inheritConnection(Connection connection) { + public void inheritConnection(final Connection connection) { } @Override - public Connection getConnection(String id) { + public Connection getConnection(final String id) { return null; } @@ -304,7 +307,7 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public Connection findConnection(String id) { + public Connection findConnection(final String id) { return null; } @@ -314,12 +317,12 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public Funnel findFunnel(String id) { + public Funnel findFunnel(final String id) { return null; } @Override - public ControllerServiceNode findControllerService(String id) { + public ControllerServiceNode findControllerService(final String id) { return serviceMap.get(id); } @@ -329,17 +332,17 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void addRemoteProcessGroup(RemoteProcessGroup remoteGroup) { + public void addRemoteProcessGroup(final RemoteProcessGroup remoteGroup) { } @Override - public void removeRemoteProcessGroup(RemoteProcessGroup remoteGroup) { + public void removeRemoteProcessGroup(final RemoteProcessGroup remoteGroup) { } @Override - public RemoteProcessGroup getRemoteProcessGroup(String id) { + public RemoteProcessGroup getRemoteProcessGroup(final String id) { return null; } @@ -349,12 +352,12 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void addLabel(Label label) { + public void addLabel(final Label label) { } @Override - public void removeLabel(Label label) { + public void removeLabel(final Label label) { } @@ -364,12 +367,12 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public Label getLabel(String id) { + public Label getLabel(final String id) { return null; } @Override - public ProcessGroup findProcessGroup(String id) { + public ProcessGroup findProcessGroup(final String id) { return null; } @@ -379,7 +382,7 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public RemoteProcessGroup findRemoteProcessGroup(String id) { + public RemoteProcessGroup findRemoteProcessGroup(final String id) { return null; } @@ -389,17 +392,17 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public ProcessorNode findProcessor(String id) { - return null; + public ProcessorNode findProcessor(final String id) { + return processorMap.get(id); } @Override public List findAllProcessors() { - return null; + return new ArrayList<>(processorMap.values()); } @Override - public Label findLabel(String id) { + public Label findLabel(final String id) { return null; } @@ -409,7 +412,7 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public Port findInputPort(String id) { + public Port findInputPort(final String id) { return null; } @@ -419,12 +422,12 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public Port getInputPortByName(String name) { + public Port getInputPortByName(final String name) { return null; } @Override - public Port findOutputPort(String id) { + public Port findOutputPort(final String id) { return null; } @@ -434,17 +437,17 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public Port getOutputPortByName(String name) { + public Port getOutputPortByName(final String name) { return null; } @Override - public void addFunnel(Funnel funnel) { + public void addFunnel(final Funnel funnel) { } @Override - public void addFunnel(Funnel funnel, boolean autoStart) { + public void addFunnel(final Funnel funnel, final boolean autoStart) { } @@ -454,33 +457,33 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public Funnel getFunnel(String id) { + public Funnel getFunnel(final String id) { return null; } @Override - public void removeFunnel(Funnel funnel) { + public void removeFunnel(final Funnel funnel) { } @Override - public void addControllerService(ControllerServiceNode service) { + public void addControllerService(final ControllerServiceNode service) { serviceMap.put(service.getIdentifier(), service); service.setProcessGroup(this); } @Override - public ControllerServiceNode getControllerService(String id) { + public ControllerServiceNode getControllerService(final String id) { return serviceMap.get(id); } @Override - public Set getControllerServices(boolean recursive) { + public Set getControllerServices(final boolean recursive) { return new HashSet<>(serviceMap.values()); } @Override - public void removeControllerService(ControllerServiceNode service) { + public void removeControllerService(final ControllerServiceNode service) { serviceMap.remove(service.getIdentifier()); } @@ -490,17 +493,17 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void remove(Snippet snippet) { + public void remove(final Snippet snippet) { } @Override - public Connectable findConnectable(String identifier) { + public Connectable findConnectable(final String identifier) { return null; } @Override - public void move(Snippet snippet, ProcessGroup destination) { + public void move(final Snippet snippet, final ProcessGroup destination) { } @@ -510,7 +513,7 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void verifyCanDelete(boolean ignorePortConnections) { + public void verifyCanDelete(final boolean ignorePortConnections) { } @@ -525,31 +528,31 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void verifyCanDelete(Snippet snippet) { + public void verifyCanDelete(final Snippet snippet) { } @Override - public void verifyCanMove(Snippet snippet, ProcessGroup newProcessGroup) { + public void verifyCanMove(final Snippet snippet, final ProcessGroup newProcessGroup) { } @Override - public void addTemplate(Template template) { + public void addTemplate(final Template template) { throw new UnsupportedOperationException(); } @Override - public void removeTemplate(Template template) { + public void removeTemplate(final Template template) { } @Override - public Template getTemplate(String id) { + public Template getTemplate(final String id) { return null; } @Override - public Template findTemplate(String id) { + public Template findTemplate(final String id) { return null; } @@ -564,10 +567,10 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void verifyCanStart(Connectable connectable) { + public void verifyCanStart(final Connectable connectable) { } @Override - public void verifyCanStop(Connectable connectable) { + public void verifyCanStop(final Connectable connectable) { } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java index 75776cc661..4488f688b7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java @@ -148,27 +148,27 @@ public class TestStandardPropertyValue { } @Override - public Set getControllerServiceIdentifiers(final Class serviceType, String groupId) { + public Set getControllerServiceIdentifiers(final Class serviceType) { return null; } @Override - public boolean isControllerServiceEnabled(String serviceIdentifier) { + public boolean isControllerServiceEnabled(final String serviceIdentifier) { return true; } @Override - public boolean isControllerServiceEnabled(ControllerService service) { + public boolean isControllerServiceEnabled(final ControllerService service) { return true; } @Override - public String getControllerServiceName(String serviceIdentifier) { + public String getControllerServiceName(final String serviceIdentifier) { return null; } @Override - public boolean isControllerServiceEnabling(String serviceIdentifier) { + public boolean isControllerServiceEnabling(final String serviceIdentifier) { return false; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 25ffed1f60..3d5f650b87 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,7 +16,28 @@ */ package org.apache.nifi.web; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; @@ -173,26 +194,7 @@ import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; +import com.google.common.collect.Sets; /** * Implementation of NiFiServiceFacade that performs revision checking. @@ -294,7 +296,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public Authorizable getControllerServiceReferencingComponent(String controllerSeriveId, String id) { + public Authorizable getControllerServiceReferencingComponent(final String controllerSeriveId, final String id) { final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerSeriveId); final ControllerServiceReference referencingComponents = controllerService.getReferences(); @@ -329,7 +331,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public Authorizable getConnectable(String id) { + public Authorizable getConnectable(final String id) { final ProcessGroup group = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId()); return group.findConnectable(id); } @@ -340,49 +342,49 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // ----------------------------------------- @Override - public void authorizeAccess(AuthorizeAccess authorizeAccess) { + public void authorizeAccess(final AuthorizeAccess authorizeAccess) { authorizeAccess.authorize(authorizableLookup); } @Override - public void claimRevision(Revision revision, NiFiUser user) { + public void claimRevision(final Revision revision, final NiFiUser user) { revisionManager.requestClaim(revision, user); } @Override - public void claimRevisions(Set revisions, NiFiUser user) { + public void claimRevisions(final Set revisions, final NiFiUser user) { revisionManager.requestClaim(revisions, user); } @Override - public void cancelRevision(Revision revision) { + public void cancelRevision(final Revision revision) { revisionManager.cancelClaim(revision); } @Override - public void cancelRevisions(Set revisions) { + public void cancelRevisions(final Set revisions) { revisionManager.cancelClaims(revisions); } @Override - public void releaseRevisionClaim(Revision revision, NiFiUser user) throws InvalidRevisionException { + public void releaseRevisionClaim(final Revision revision, final NiFiUser user) throws InvalidRevisionException { revisionManager.releaseClaim(new StandardRevisionClaim(revision), user); } @Override - public void releaseRevisionClaims(Set revisions, NiFiUser user) throws InvalidRevisionException { + public void releaseRevisionClaims(final Set revisions, final NiFiUser user) throws InvalidRevisionException { revisionManager.releaseClaim(new StandardRevisionClaim(revisions), user); } @Override - public Set getRevisionsFromGroup(String groupId, Function> getComponents) { + public Set getRevisionsFromGroup(final String groupId, final Function> getComponents) { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); final Set componentIds = revisionManager.get(group.getIdentifier(), rev -> getComponents.apply(group)); return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet()); } @Override - public Set getRevisionsFromSnippet(String snippetId) { + public Set getRevisionsFromSnippet(final String snippetId) { final Snippet snippet = snippetDAO.getSnippet(snippetId); final Set componentIds = new HashSet<>(); componentIds.addAll(snippet.getProcessors().keySet()); @@ -401,17 +403,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // ----------------------------------------- @Override - public void verifyListQueue(String connectionId) { + public void verifyListQueue(final String connectionId) { connectionDAO.verifyList(connectionId); } @Override - public void verifyCreateConnection(String groupId, ConnectionDTO connectionDTO) { + public void verifyCreateConnection(final String groupId, final ConnectionDTO connectionDTO) { connectionDAO.verifyCreate(groupId, connectionDTO); } @Override - public void verifyUpdateConnection(ConnectionDTO connectionDTO) { + public void verifyUpdateConnection(final ConnectionDTO connectionDTO) { try { // if connection does not exist, then the update request is likely creating it // so we don't verify since it will fail @@ -427,17 +429,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyDeleteConnection(String connectionId) { + public void verifyDeleteConnection(final String connectionId) { connectionDAO.verifyDelete(connectionId); } @Override - public void verifyDeleteFunnel(String funnelId) { + public void verifyDeleteFunnel(final String funnelId) { funnelDAO.verifyDelete(funnelId); } @Override - public void verifyUpdateInputPort(PortDTO inputPortDTO) { + public void verifyUpdateInputPort(final PortDTO inputPortDTO) { try { // if connection does not exist, then the update request is likely creating it // so we don't verify since it will fail @@ -451,12 +453,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyDeleteInputPort(String inputPortId) { + public void verifyDeleteInputPort(final String inputPortId) { inputPortDAO.verifyDelete(inputPortId); } @Override - public void verifyUpdateOutputPort(PortDTO outputPortDTO) { + public void verifyUpdateOutputPort(final PortDTO outputPortDTO) { try { // if connection does not exist, then the update request is likely creating it // so we don't verify since it will fail @@ -470,12 +472,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyDeleteOutputPort(String outputPortId) { + public void verifyDeleteOutputPort(final String outputPortId) { outputPortDAO.verifyDelete(outputPortId); } @Override - public void verifyUpdateProcessor(ProcessorDTO processorDTO) { + public void verifyUpdateProcessor(final ProcessorDTO processorDTO) { try { // if group does not exist, then the update request is likely creating it // so we don't verify since it will fail @@ -489,12 +491,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyDeleteProcessor(String processorId) { + public void verifyDeleteProcessor(final String processorId) { processorDAO.verifyDelete(processorId); } @Override - public void verifyScheduleComponents(String groupId, ScheduledState state, Set componentIds) { + public void verifyScheduleComponents(final String groupId, final ScheduledState state, final Set componentIds) { try { processGroupDAO.verifyScheduleComponents(groupId, state, componentIds); } catch (final Exception e) { @@ -504,12 +506,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyDeleteProcessGroup(String groupId) { + public void verifyDeleteProcessGroup(final String groupId) { processGroupDAO.verifyDelete(groupId); } @Override - public void verifyUpdateRemoteProcessGroup(RemoteProcessGroupDTO remoteProcessGroupDTO) { + public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroupDTO) { try { // if remote group does not exist, then the update request is likely creating it // so we don't verify since it will fail @@ -523,7 +525,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyUpdateRemoteProcessGroupInputPort(String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { + public void verifyUpdateRemoteProcessGroupInputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { try { remoteProcessGroupDAO.verifyUpdateInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); } catch (final Exception e) { @@ -533,7 +535,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyUpdateRemoteProcessGroupOutputPort(String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { + public void verifyUpdateRemoteProcessGroupOutputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { try { remoteProcessGroupDAO.verifyUpdateOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); } catch (final Exception e) { @@ -543,12 +545,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyDeleteRemoteProcessGroup(String remoteProcessGroupId) { + public void verifyDeleteRemoteProcessGroup(final String remoteProcessGroupId) { remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId); } @Override - public void verifyUpdateControllerService(ControllerServiceDTO controllerServiceDTO) { + public void verifyUpdateControllerService(final ControllerServiceDTO controllerServiceDTO) { try { // if service does not exist, then the update request is likely creating it // so we don't verify since it will fail @@ -562,7 +564,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyUpdateControllerServiceReferencingComponents(String controllerServiceId, ScheduledState scheduledState, ControllerServiceState controllerServiceState) { + public void verifyUpdateControllerServiceReferencingComponents(final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { try { controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); } catch (final Exception e) { @@ -572,12 +574,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyDeleteControllerService(String controllerServiceId) { + public void verifyDeleteControllerService(final String controllerServiceId) { controllerServiceDAO.verifyDelete(controllerServiceId); } @Override - public void verifyUpdateReportingTask(ReportingTaskDTO reportingTaskDTO) { + public void verifyUpdateReportingTask(final ReportingTaskDTO reportingTaskDTO) { try { // if tasks does not exist, then the update request is likely creating it // so we don't verify since it will fail @@ -591,7 +593,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyDeleteReportingTask(String reportingTaskId) { + public void verifyDeleteReportingTask(final String reportingTaskId) { reportingTaskDAO.verifyDelete(reportingTaskId); } @@ -713,7 +715,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override - public void verifyUpdateSnippet(SnippetDTO snippetDto, final Set affectedComponentIds) { + public void verifyUpdateSnippet(final SnippetDTO snippetDto, final Set affectedComponentIds) { try { // if snippet does not exist, then the update request is likely creating it // so we don't verify since it will fail @@ -752,7 +754,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return new StandardRevisionUpdate<>(dto, null, updatedRevisions); } }); - } catch (ExpiredRevisionClaimException e) { + } catch (final ExpiredRevisionClaimException e) { throw new InvalidRevisionException("Failed to update Snippet", e); } @@ -926,7 +928,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public NodeDTO updateNode(NodeDTO nodeDTO) { + public NodeDTO updateNode(final NodeDTO nodeDTO) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); if (user == null) { throw new WebApplicationException(new Throwable("Unable to access details for current user.")); @@ -950,7 +952,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public CounterDTO updateCounter(String counterId) { + public CounterDTO updateCounter(final String counterId) { return dtoFactory.createCounterDto(controllerFacade.resetCounter(counterId)); } @@ -1019,7 +1021,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public DropRequestDTO deleteFlowFileDropRequest(String connectionId, String dropRequestId) { + public DropRequestDTO deleteFlowFileDropRequest(final String connectionId, final String dropRequestId) { final Connection connection = connectionDAO.getConnection(connectionId); connection.authorize(authorizer, RequestAction.WRITE); @@ -1027,7 +1029,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ListingRequestDTO deleteFlowFileListingRequest(String connectionId, String listingRequestId) { + public ListingRequestDTO deleteFlowFileListingRequest(final String connectionId, final String listingRequestId) { final Connection connection = connectionDAO.getConnection(connectionId); connection.authorize(authorizer, RequestAction.WRITE); @@ -1108,7 +1110,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyDeleteSnippet(String snippetId, Set affectedComponentIds) { + public void verifyDeleteSnippet(final String snippetId, final Set affectedComponentIds) { try { snippetDAO.verifyDeleteSnippetComponents(snippetId); } catch (final Exception e) { @@ -1192,7 +1194,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void deleteTemplate(String id) { + public void deleteTemplate(final String id) { // create the template templateDAO.deleteTemplate(id); } @@ -1211,14 +1213,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public DropRequestDTO createFlowFileDropRequest(String connectionId, String dropRequestId) { + public DropRequestDTO createFlowFileDropRequest(final String connectionId, final String dropRequestId) { final Connection connection = connectionDAO.getConnection(connectionId); connection.authorize(authorizer, RequestAction.WRITE); return dtoFactory.createDropRequestDTO(connectionDAO.createFlowFileDropRequest(connectionId, dropRequestId)); } @Override - public ListingRequestDTO createFlowFileListingRequest(String connectionId, String listingRequestId) { + public ListingRequestDTO createFlowFileListingRequest(final String connectionId, final String listingRequestId) { final Connection connection = connectionDAO.getConnection(connectionId); connection.authorize(authorizer, RequestAction.WRITE); @@ -1460,12 +1462,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public TemplateDTO createTemplate(String name, String description, String snippetId, String groupId, Optional idGenerationSeed) { + public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional idGenerationSeed) { // get the specified snippet - Snippet snippet = snippetDAO.getSnippet(snippetId); + final Snippet snippet = snippetDAO.getSnippet(snippetId); // create the template - TemplateDTO templateDTO = new TemplateDTO(); + final TemplateDTO templateDTO = new TemplateDTO(); templateDTO.setName(name); templateDTO.setDescription(description); templateDTO.setTimestamp(new Date()); @@ -1477,7 +1479,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { templateDTO.setId(uuid); // create the template - Template template = templateDAO.createTemplate(templateDTO, groupId); + final Template template = templateDAO.createTemplate(templateDTO, groupId); // drop the snippet snippetDAO.dropSnippet(snippetId); @@ -1486,7 +1488,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId, Optional idGenerationSeed) { + public TemplateDTO importTemplate(final TemplateDTO templateDTO, final String groupId, final Optional idGenerationSeed) { // ensure id is set final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString(); templateDTO.setId(uuid); @@ -1577,7 +1579,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { public ProcessGroupEntity createArchive() { try { controllerFacade.createArchive(); - } catch (IOException e) { + } catch (final IOException e) { logger.error("Failed to create an archive", e); } @@ -1624,27 +1626,39 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ControllerServiceEntity createControllerService(final String groupId, final ControllerServiceDTO controllerServiceDTO) { - // TODO - update once Controller Services can be scoped by Controller - final String normalizedGroupId = groupId == null ? controllerFacade.getRootGroupId() : groupId; - controllerServiceDTO.setParentGroupId(normalizedGroupId); + controllerServiceDTO.setParentGroupId(groupId); - final RevisionUpdate snapshot = createComponent( - controllerServiceDTO, - () -> { - // create the controller service + final String modifier = NiFiUserUtils.getNiFiUserName(); + + // ensure id is set + if (StringUtils.isBlank(controllerServiceDTO.getId())) { + controllerServiceDTO.setId(UUID.randomUUID().toString()); + } + + final ControllerServiceDTO updatedService; + if (groupId == null) { + // Unfortunately, we can not use the createComponent() method here because createComponent() wants to obtain the read lock + // on the group. The Controller Service may or may not have a Process Group (it won't if it's controller-scoped). + final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO); + updatedService = dtoFactory.createControllerServiceDto(controllerService); + controllerFacade.save(); + } else { + updatedService = revisionManager.get(groupId, new ReadOnlyRevisionCallback() { + @Override + public ControllerServiceDTO withRevision(final Revision groupRevision) { final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO); + controllerFacade.save(); + return dtoFactory.createControllerServiceDto(controllerService); + } + }); + } - // TODO - this logic should be part of the controllerServiceDAO - final ProcessGroup group = processGroupDAO.getProcessGroup(normalizedGroupId); - group.addControllerService(controllerService); - return controllerService; - }, - controllerService -> dtoFactory.createControllerServiceDto(controllerService)); + final FlowModification lastMod = new FlowModification(new Revision(0L, null, controllerServiceDTO.getId()), modifier); final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(controllerService); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId())); - return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, bulletins); + return entityFactory.createControllerServiceEntity(updatedService, dtoFactory.createRevisionDTO(lastMod), accessPolicy, bulletins); } @Override @@ -1888,19 +1902,19 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void deleteActions(Date endDate) { + public void deleteActions(final Date endDate) { // get the user from the request - NiFiUser user = NiFiUserUtils.getNiFiUser(); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); if (user == null) { throw new WebApplicationException(new Throwable("Unable to access details for current user.")); } // create the purge details - FlowChangePurgeDetails details = new FlowChangePurgeDetails(); + final FlowChangePurgeDetails details = new FlowChangePurgeDetails(); details.setEndDate(endDate); // create a purge action to record that records are being removed - FlowChangeAction purgeAction = new FlowChangeAction(); + final FlowChangeAction purgeAction = new FlowChangeAction(); purgeAction.setUserIdentity(user.getIdentity()); purgeAction.setUserName(user.getUserName()); purgeAction.setOperation(Operation.Purge); @@ -1915,27 +1929,27 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ProvenanceDTO submitProvenance(ProvenanceDTO query) { + public ProvenanceDTO submitProvenance(final ProvenanceDTO query) { return controllerFacade.submitProvenance(query); } @Override - public void deleteProvenance(String queryId) { + public void deleteProvenance(final String queryId) { controllerFacade.deleteProvenanceQuery(queryId); } @Override - public LineageDTO submitLineage(LineageDTO lineage) { + public LineageDTO submitLineage(final LineageDTO lineage) { return controllerFacade.submitLineage(lineage); } @Override - public void deleteLineage(String lineageId) { + public void deleteLineage(final String lineageId) { controllerFacade.deleteLineage(lineageId); } @Override - public ProvenanceEventDTO submitReplay(Long eventId) { + public ProvenanceEventDTO submitReplay(final Long eventId) { return controllerFacade.submitReplay(eventId); } @@ -1944,27 +1958,27 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // ----------------------------------------- @Override - public SearchResultsDTO searchController(String query) { + public SearchResultsDTO searchController(final String query) { return controllerFacade.search(query); } @Override - public DownloadableContent getContent(String connectionId, String flowFileUuid, String uri) { + public DownloadableContent getContent(final String connectionId, final String flowFileUuid, final String uri) { return connectionDAO.getContent(connectionId, flowFileUuid, uri); } @Override - public DownloadableContent getContent(Long eventId, String uri, ContentDirection contentDirection) { + public DownloadableContent getContent(final Long eventId, final String uri, final ContentDirection contentDirection) { return controllerFacade.getContent(eventId, uri, contentDirection); } @Override - public ProvenanceDTO getProvenance(String queryId) { + public ProvenanceDTO getProvenance(final String queryId) { return controllerFacade.getProvenanceQuery(queryId); } @Override - public LineageDTO getLineage(String lineageId) { + public LineageDTO getLineage(final String lineageId) { return controllerFacade.getLineage(lineageId); } @@ -1979,7 +1993,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ProcessGroupStatusDTO getProcessGroupStatus(String groupId) { + public ProcessGroupStatusDTO getProcessGroupStatus(final String groupId) { return dtoFactory.createProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(groupId)); } @@ -1989,10 +2003,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ComponentStateDTO getProcessorState(String processorId) { + public ComponentStateDTO getProcessorState(final String processorId) { return revisionManager.get(processorId, new ReadOnlyRevisionCallback() { @Override - public ComponentStateDTO withRevision(Revision revision) { + public ComponentStateDTO withRevision(final Revision revision) { final StateMap clusterState = isClustered() ? processorDAO.getState(processorId, Scope.CLUSTER) : null; final StateMap localState = processorDAO.getState(processorId, Scope.LOCAL); @@ -2004,10 +2018,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ComponentStateDTO getControllerServiceState(String controllerServiceId) { + public ComponentStateDTO getControllerServiceState(final String controllerServiceId) { return revisionManager.get(controllerServiceId, new ReadOnlyRevisionCallback() { @Override - public ComponentStateDTO withRevision(Revision revision) { + public ComponentStateDTO withRevision(final Revision revision) { final StateMap clusterState = isClustered() ? controllerServiceDAO.getState(controllerServiceId, Scope.CLUSTER) : null; final StateMap localState = controllerServiceDAO.getState(controllerServiceId, Scope.LOCAL); @@ -2019,10 +2033,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ComponentStateDTO getReportingTaskState(String reportingTaskId) { + public ComponentStateDTO getReportingTaskState(final String reportingTaskId) { return revisionManager.get(reportingTaskId, new ReadOnlyRevisionCallback() { @Override - public ComponentStateDTO withRevision(Revision revision) { + public ComponentStateDTO withRevision(final Revision revision) { final StateMap clusterState = isClustered() ? reportingTaskDAO.getState(reportingTaskId, Scope.CLUSTER) : null; final StateMap localState = reportingTaskDAO.getState(reportingTaskId, Scope.LOCAL); @@ -2035,9 +2049,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public CountersDTO getCounters() { - List counters = controllerFacade.getCounters(); - Set counterDTOs = new LinkedHashSet<>(counters.size()); - for (Counter counter : counters) { + final List counters = controllerFacade.getCounters(); + final Set counterDTOs = new LinkedHashSet<>(counters.size()); + for (final Counter counter : counters) { counterDTOs.add(dtoFactory.createCounterDto(counter)); } @@ -2049,7 +2063,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public Set getConnections(String groupId) { + public Set getConnections(final String groupId) { return revisionManager.get(groupId, rev -> { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); group.authorize(authorizer, RequestAction.READ); @@ -2070,7 +2084,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ConnectionEntity getConnection(String connectionId) { + public ConnectionEntity getConnection(final String connectionId) { return revisionManager.get(connectionId, rev -> { final Connection connection = connectionDAO.getConnection(connectionId); connection.authorize(authorizer, RequestAction.READ); @@ -2083,14 +2097,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public DropRequestDTO getFlowFileDropRequest(String connectionId, String dropRequestId) { + public DropRequestDTO getFlowFileDropRequest(final String connectionId, final String dropRequestId) { final Connection connection = connectionDAO.getConnection(connectionId); connection.authorize(authorizer, RequestAction.WRITE); return dtoFactory.createDropRequestDTO(connectionDAO.getFlowFileDropRequest(connectionId, dropRequestId)); } @Override - public ListingRequestDTO getFlowFileListingRequest(String connectionId, String listingRequestId) { + public ListingRequestDTO getFlowFileListingRequest(final String connectionId, final String listingRequestId) { final Connection connection = connectionDAO.getConnection(connectionId); connection.authorize(authorizer, RequestAction.WRITE); @@ -2108,22 +2122,22 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public FlowFileDTO getFlowFile(String connectionId, String flowFileUuid) { + public FlowFileDTO getFlowFile(final String connectionId, final String flowFileUuid) { return dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(connectionId, flowFileUuid)); } @Override - public ConnectionStatusDTO getConnectionStatus(String connectionId) { + public ConnectionStatusDTO getConnectionStatus(final String connectionId) { return revisionManager.get(connectionId, rev -> dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId))); } @Override - public StatusHistoryDTO getConnectionStatusHistory(String connectionId) { + public StatusHistoryDTO getConnectionStatusHistory(final String connectionId) { return revisionManager.get(connectionId, rev -> controllerFacade.getConnectionStatusHistory(connectionId)); } @Override - public Set getProcessors(String groupId) { + public Set getProcessors(final String groupId) { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); group.authorize(authorizer, RequestAction.READ); @@ -2144,24 +2158,24 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public TemplateDTO exportTemplate(String id) { - Template template = templateDAO.getTemplate(id); - TemplateDTO templateDetails = template.getDetails(); + public TemplateDTO exportTemplate(final String id) { + final Template template = templateDAO.getTemplate(id); + final TemplateDTO templateDetails = template.getDetails(); - TemplateDTO templateDTO = dtoFactory.createTemplateDTO(template); + final TemplateDTO templateDTO = dtoFactory.createTemplateDTO(template); templateDTO.setSnippet(dtoFactory.copySnippetContents(templateDetails.getSnippet())); return templateDTO; } @Override - public TemplateDTO getTemplate(String id) { + public TemplateDTO getTemplate(final String id) { return dtoFactory.createTemplateDTO(templateDAO.getTemplate(id)); } @Override public Set getTemplates() { - Set templateDtos = new LinkedHashSet<>(); - for (Template template : templateDAO.getTemplates()) { + final Set templateDtos = new LinkedHashSet<>(); + for (final Template template : templateDAO.getTemplates()) { templateDtos.add(dtoFactory.createTemplateDTO(template)); } return templateDtos; @@ -2188,7 +2202,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ProcessorEntity getProcessor(String id) { + public ProcessorEntity getProcessor(final String id) { return revisionManager.get(id, rev -> { final ProcessorNode processor = processorDAO.getProcessor(id); processor.authorize(authorizer, RequestAction.READ); @@ -2201,7 +2215,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public PropertyDescriptorDTO getProcessorPropertyDescriptor(String id, String property) { + public PropertyDescriptorDTO getProcessorPropertyDescriptor(final String id, final String property) { final ProcessorNode processor = processorDAO.getProcessor(id); PropertyDescriptor descriptor = processor.getPropertyDescriptor(property); @@ -2214,17 +2228,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ProcessorStatusDTO getProcessorStatus(String id) { + public ProcessorStatusDTO getProcessorStatus(final String id) { return revisionManager.get(id, rev -> dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id))); } @Override - public StatusHistoryDTO getProcessorStatusHistory(String id) { + public StatusHistoryDTO getProcessorStatusHistory(final String id) { return controllerFacade.getProcessorStatusHistory(id); } @Override - public BulletinBoardDTO getBulletinBoard(BulletinQueryDTO query) { + public BulletinBoardDTO getBulletinBoard(final BulletinQueryDTO query) { // build the query final BulletinQuery.Builder queryBuilder = new BulletinQuery.Builder() .groupIdMatches(query.getGroupId()) @@ -2246,7 +2260,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } // create the bulletin board - BulletinBoardDTO bulletinBoard = new BulletinBoardDTO(); + final BulletinBoardDTO bulletinBoard = new BulletinBoardDTO(); bulletinBoard.setBulletins(bulletins); bulletinBoard.setGenerated(new Date()); return bulletinBoard; @@ -2299,7 +2313,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Set inputPorts = controllerFacade.getInputPorts(); final Set inputPortIds = inputPorts.stream().map(port -> port.getIdentifier()).collect(Collectors.toSet()); revisionManager.get(inputPortIds, () -> { - for (RootGroupPort inputPort : inputPorts) { + for (final RootGroupPort inputPort : inputPorts) { if (isUserAuthorized(user, inputPort)) { final PortDTO dto = new PortDTO(); dto.setId(inputPort.getIdentifier()); @@ -2317,7 +2331,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Set outputPorts = controllerFacade.getOutputPorts(); final Set outputPortIds = outputPorts.stream().map(port -> port.getIdentifier()).collect(Collectors.toSet()); revisionManager.get(outputPortIds, () -> { - for (RootGroupPort outputPort : controllerFacade.getOutputPorts()) { + for (final RootGroupPort outputPort : controllerFacade.getOutputPorts()) { if (isUserAuthorized(user, outputPort)) { final PortDTO dto = new PortDTO(); dto.setId(outputPort.getIdentifier()); @@ -2376,7 +2390,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public Set getLabels(String groupId) { + public Set getLabels(final String groupId) { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); group.authorize(authorizer, RequestAction.READ); @@ -2394,7 +2408,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public LabelEntity getLabel(String labelId) { + public LabelEntity getLabel(final String labelId) { return revisionManager.get(labelId, rev -> { final Label label = labelDAO.getLabel(labelId); label.authorize(authorizer, RequestAction.READ); @@ -2406,7 +2420,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public Set getFunnels(String groupId) { + public Set getFunnels(final String groupId) { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); group.authorize(authorizer, RequestAction.READ); @@ -2424,7 +2438,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public FunnelEntity getFunnel(String funnelId) { + public FunnelEntity getFunnel(final String funnelId) { return revisionManager.get(funnelId, rev -> { final Funnel funnel = funnelDAO.getFunnel(funnelId); funnel.authorize(authorizer, RequestAction.READ); @@ -2436,7 +2450,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public Set getInputPorts(String groupId) { + public Set getInputPorts(final String groupId) { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); group.authorize(authorizer, RequestAction.READ); @@ -2456,7 +2470,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public Set getOutputPorts(String groupId) { + public Set getOutputPorts(final String groupId) { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); group.authorize(authorizer, RequestAction.READ); @@ -2477,7 +2491,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public Set getProcessGroups(String parentGroupId) { + public Set getProcessGroups(final String parentGroupId) { final ProcessGroup parentGroup = processGroupDAO.getProcessGroup(parentGroupId); parentGroup.authorize(authorizer, RequestAction.READ); @@ -2497,7 +2511,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public Set getRemoteProcessGroups(String groupId) { + public Set getRemoteProcessGroups(final String groupId) { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); group.authorize(authorizer, RequestAction.READ); @@ -2518,7 +2532,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public PortEntity getInputPort(String inputPortId) { + public PortEntity getInputPort(final String inputPortId) { return revisionManager.get(inputPortId, rev -> { final Port port = inputPortDAO.getPort(inputPortId); port.authorize(authorizer, RequestAction.READ); @@ -2532,12 +2546,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public PortStatusDTO getInputPortStatus(String inputPortId) { + public PortStatusDTO getInputPortStatus(final String inputPortId) { return revisionManager.get(inputPortId, rev -> dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortId))); } @Override - public PortEntity getOutputPort(String outputPortId) { + public PortEntity getOutputPort(final String outputPortId) { return revisionManager.get(outputPortId, rev -> { final Port port = outputPortDAO.getPort(outputPortId); port.authorize(authorizer, RequestAction.READ); @@ -2551,12 +2565,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public PortStatusDTO getOutputPortStatus(String outputPortId) { + public PortStatusDTO getOutputPortStatus(final String outputPortId) { return revisionManager.get(outputPortId, rev -> dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(outputPortId))); } @Override - public RemoteProcessGroupEntity getRemoteProcessGroup(String remoteProcessGroupId) { + public RemoteProcessGroupEntity getRemoteProcessGroup(final String remoteProcessGroupId) { return revisionManager.get(remoteProcessGroupId, rev -> { final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); rpg.authorize(authorizer, RequestAction.READ); @@ -2570,17 +2584,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public RemoteProcessGroupStatusDTO getRemoteProcessGroupStatus(String id) { + public RemoteProcessGroupStatusDTO getRemoteProcessGroupStatus(final String id) { return revisionManager.get(id, rev -> dtoFactory.createRemoteProcessGroupStatusDto(controllerFacade.getRemoteProcessGroupStatus(id))); } @Override - public StatusHistoryDTO getRemoteProcessGroupStatusHistory(String id) { + public StatusHistoryDTO getRemoteProcessGroupStatusHistory(final String id) { return controllerFacade.getRemoteProcessGroupStatusHistory(id); } @Override - public ProcessGroupFlowEntity getProcessGroupFlow(String groupId, boolean recurse) { + public ProcessGroupFlowEntity getProcessGroupFlow(final String groupId, final boolean recurse) { return revisionManager.get(groupId, rev -> { // get all identifiers for every child component @@ -2624,7 +2638,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ProcessGroupEntity getProcessGroup(String groupId) { + public ProcessGroupEntity getProcessGroup(final String groupId) { return revisionManager.get(groupId, rev -> { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); processGroup.authorize(authorizer, RequestAction.READ); @@ -2638,21 +2652,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public Set getControllerServices(String groupId) { - // TODO - move this logic into the ControllerServiceDAO - - final Set serviceNodes; - final Set serviceIds; - if (groupId == null) { - // TODO - update when controller services are scoped by the controller - final ProcessGroup group = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId()); - serviceNodes = group.getControllerServices(true); - serviceIds = serviceNodes.stream().map(service -> service.getIdentifier()).collect(Collectors.toSet()); - } else { - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - serviceNodes = group.getControllerServices(true); - serviceIds = serviceNodes.stream().map(service -> service.getIdentifier()).collect(Collectors.toSet()); - } + public Set getControllerServices(final String groupId) { + final Set serviceNodes = controllerServiceDAO.getControllerServices(groupId); + final Set serviceIds = serviceNodes.stream().map(service -> service.getIdentifier()).collect(Collectors.toSet()); return revisionManager.get(serviceIds, () -> { return serviceNodes.stream() @@ -2673,7 +2675,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ControllerServiceEntity getControllerService(String controllerServiceId) { + public ControllerServiceEntity getControllerService(final String controllerServiceId) { return revisionManager.get(controllerServiceId, rev -> { final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); controllerService.authorize(authorizer, RequestAction.READ); @@ -2693,7 +2695,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public PropertyDescriptorDTO getControllerServicePropertyDescriptor(String id, String property) { + public PropertyDescriptorDTO getControllerServicePropertyDescriptor(final String id, final String property) { return revisionManager.get(id, rev -> { final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(id); PropertyDescriptor descriptor = controllerService.getControllerServiceImplementation().getPropertyDescriptor(property); @@ -2708,7 +2710,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ControllerServiceReferencingComponentsEntity getControllerServiceReferencingComponents(String controllerServiceId) { + public ControllerServiceReferencingComponentsEntity getControllerServiceReferencingComponents(final String controllerServiceId) { return revisionManager.get(controllerServiceId, rev -> { final ControllerServiceNode service = controllerServiceDAO.getControllerService(controllerServiceId); final ControllerServiceReference ref = service.getReferences(); @@ -2734,7 +2736,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ReportingTaskEntity getReportingTask(String reportingTaskId) { + public ReportingTaskEntity getReportingTask(final String reportingTaskId) { return revisionManager.get(reportingTaskId, rev -> { final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId); reportingTask.authorize(authorizer, RequestAction.READ); @@ -2747,7 +2749,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public PropertyDescriptorDTO getReportingTaskPropertyDescriptor(String id, String property) { + public PropertyDescriptorDTO getReportingTaskPropertyDescriptor(final String id, final String property) { final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(id); PropertyDescriptor descriptor = reportingTask.getReportingTask().getPropertyDescriptor(property); @@ -2760,14 +2762,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public StatusHistoryDTO getProcessGroupStatusHistory(String groupId) { + public StatusHistoryDTO getProcessGroupStatusHistory(final String groupId) { return controllerFacade.getProcessGroupStatusHistory(groupId); } @Override - public HistoryDTO getActions(HistoryQueryDTO historyQueryDto) { + public HistoryDTO getActions(final HistoryQueryDTO historyQueryDto) { // extract the query criteria - HistoryQuery historyQuery = new HistoryQuery(); + final HistoryQuery historyQuery = new HistoryQuery(); historyQuery.setStartDate(historyQueryDto.getStartDate()); historyQuery.setEndDate(historyQueryDto.getEndDate()); historyQuery.setSourceId(historyQueryDto.getSourceId()); @@ -2778,16 +2780,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { historyQuery.setSortOrder(historyQueryDto.getSortOrder()); // perform the query - History history = auditService.getActions(historyQuery); + final History history = auditService.getActions(historyQuery); // create the response return dtoFactory.createHistoryDto(history); } @Override - public ActionDTO getAction(Integer actionId) { + public ActionDTO getAction(final Integer actionId) { // get the action - Action action = auditService.getAction(actionId); + final Action action = auditService.getAction(actionId); // ensure the action was found if (action == null) { @@ -2799,7 +2801,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ComponentHistoryDTO getComponentHistory(String componentId) { + public ComponentHistoryDTO getComponentHistory(final String componentId) { final Map propertyHistoryDtos = new LinkedHashMap<>(); final Map> propertyHistory = auditService.getPreviousValues(componentId); @@ -2871,12 +2873,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public NodeDTO getNode(String nodeId) { + public NodeDTO getNode(final String nodeId) { final NodeIdentifier nodeIdentifier = clusterCoordinator.getNodeIdentifier(nodeId); return getNode(nodeIdentifier); } - private NodeDTO getNode(NodeIdentifier nodeId) { + private NodeDTO getNode(final NodeIdentifier nodeId) { final NodeConnectionStatus nodeStatus = clusterCoordinator.getConnectionStatus(nodeId); final List events = clusterCoordinator.getNodeEvents(nodeId); final boolean primary = nodeId.equals(clusterCoordinator.getPrimaryNode()); @@ -2885,7 +2887,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void deleteNode(String nodeId) { + public void deleteNode(final String nodeId) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); if (user == null) { throw new WebApplicationException(new Throwable("Unable to access details for current user.")); @@ -2902,95 +2904,95 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } /* setters */ - public void setProperties(NiFiProperties properties) { + public void setProperties(final NiFiProperties properties) { this.properties = properties; } - public void setControllerFacade(ControllerFacade controllerFacade) { + public void setControllerFacade(final ControllerFacade controllerFacade) { this.controllerFacade = controllerFacade; } - public void setRemoteProcessGroupDAO(RemoteProcessGroupDAO remoteProcessGroupDAO) { + public void setRemoteProcessGroupDAO(final RemoteProcessGroupDAO remoteProcessGroupDAO) { this.remoteProcessGroupDAO = remoteProcessGroupDAO; } - public void setLabelDAO(LabelDAO labelDAO) { + public void setLabelDAO(final LabelDAO labelDAO) { this.labelDAO = labelDAO; } - public void setFunnelDAO(FunnelDAO funnelDAO) { + public void setFunnelDAO(final FunnelDAO funnelDAO) { this.funnelDAO = funnelDAO; } - public void setSnippetDAO(SnippetDAO snippetDAO) { + public void setSnippetDAO(final SnippetDAO snippetDAO) { this.snippetDAO = snippetDAO; } - public void setProcessorDAO(ProcessorDAO processorDAO) { + public void setProcessorDAO(final ProcessorDAO processorDAO) { this.processorDAO = processorDAO; } - public void setConnectionDAO(ConnectionDAO connectionDAO) { + public void setConnectionDAO(final ConnectionDAO connectionDAO) { this.connectionDAO = connectionDAO; } - public void setAuditService(AuditService auditService) { + public void setAuditService(final AuditService auditService) { this.auditService = auditService; } - public void setKeyService(KeyService keyService) { + public void setKeyService(final KeyService keyService) { this.keyService = keyService; } - public void setRevisionManager(RevisionManager revisionManager) { + public void setRevisionManager(final RevisionManager revisionManager) { this.revisionManager = revisionManager; } - public void setDtoFactory(DtoFactory dtoFactory) { + public void setDtoFactory(final DtoFactory dtoFactory) { this.dtoFactory = dtoFactory; } - public void setEntityFactory(EntityFactory entityFactory) { + public void setEntityFactory(final EntityFactory entityFactory) { this.entityFactory = entityFactory; } - public void setInputPortDAO(PortDAO inputPortDAO) { + public void setInputPortDAO(final PortDAO inputPortDAO) { this.inputPortDAO = inputPortDAO; } - public void setOutputPortDAO(PortDAO outputPortDAO) { + public void setOutputPortDAO(final PortDAO outputPortDAO) { this.outputPortDAO = outputPortDAO; } - public void setProcessGroupDAO(ProcessGroupDAO processGroupDAO) { + public void setProcessGroupDAO(final ProcessGroupDAO processGroupDAO) { this.processGroupDAO = processGroupDAO; } - public void setControllerServiceDAO(ControllerServiceDAO controllerServiceDAO) { + public void setControllerServiceDAO(final ControllerServiceDAO controllerServiceDAO) { this.controllerServiceDAO = controllerServiceDAO; } - public void setReportingTaskDAO(ReportingTaskDAO reportingTaskDAO) { + public void setReportingTaskDAO(final ReportingTaskDAO reportingTaskDAO) { this.reportingTaskDAO = reportingTaskDAO; } - public void setTemplateDAO(TemplateDAO templateDAO) { + public void setTemplateDAO(final TemplateDAO templateDAO) { this.templateDAO = templateDAO; } - public void setSnippetUtils(SnippetUtils snippetUtils) { + public void setSnippetUtils(final SnippetUtils snippetUtils) { this.snippetUtils = snippetUtils; } - public void setAuthorizer(Authorizer authorizer) { + public void setAuthorizer(final Authorizer authorizer) { this.authorizer = authorizer; } - public void setClusterCoordinator(ClusterCoordinator coordinator) { + public void setClusterCoordinator(final ClusterCoordinator coordinator) { this.clusterCoordinator = coordinator; } - public void setHeartbeatMonitor(HeartbeatMonitor heartbeatMonitor) { + public void setHeartbeatMonitor(final HeartbeatMonitor heartbeatMonitor) { this.heartbeatMonitor = heartbeatMonitor; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java index d396b35031..6bcdf612bd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java @@ -16,22 +16,7 @@ */ package org.apache.nifi.web; -import java.io.Serializable; -import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URLEncoder; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Objects; - -import javax.ws.rs.HttpMethod; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response; - +import com.sun.jersey.core.util.MultivaluedMapImpl; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; @@ -56,8 +41,8 @@ import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.reporting.ReportingTaskProvider; +import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ProcessorConfigDTO; @@ -74,7 +59,20 @@ import org.slf4j.LoggerFactory; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; -import com.sun.jersey.core.util.MultivaluedMapImpl; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; /** * Implements the NiFiWebConfigurationContext interface to support a context in both standalone and clustered environments. @@ -88,7 +86,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration private NiFiServiceFacade serviceFacade; private ClusterCoordinator clusterCoordinator; private RequestReplicator requestReplicator; - private ControllerServiceLookup controllerServiceLookup; + private ControllerServiceProvider controllerServiceProvider; private ReportingTaskProvider reportingTaskProvider; private AuditService auditService; private Authorizer authorizer; @@ -113,10 +111,10 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration } @Override - public ControllerService getControllerService(String serviceIdentifier) { + public ControllerService getControllerService(final String serviceIdentifier, final String componentId) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); authorizeFlowAccess(user); - return controllerServiceLookup.getControllerService(serviceIdentifier); + return controllerServiceProvider.getControllerServiceForComponent(serviceIdentifier, componentId); } @Override @@ -193,7 +191,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration try { // record the operations auditService.addActions(actions); - } catch (Throwable t) { + } catch (final Throwable t) { logger.warn("Unable to record actions: " + t.getMessage()); if (logger.isDebugEnabled()) { logger.warn(StringUtils.EMPTY, t); @@ -329,21 +327,21 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration // create the request URL URI requestUrl; try { - String path = "/nifi-api/processors/" + URLEncoder.encode(id, "UTF-8"); + final String path = "/nifi-api/processors/" + URLEncoder.encode(id, "UTF-8"); requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null); } catch (final URISyntaxException | UnsupportedEncodingException use) { throw new ClusterRequestException(use); } // set the request parameters - MultivaluedMap parameters = new MultivaluedMapImpl(); + final MultivaluedMap parameters = new MultivaluedMapImpl(); parameters.add(VERBOSE_PARAM, "true"); // replicate request NodeResponse nodeResponse; try { nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { throw new IllegalClusterStateException("Request was interrupted while waiting for response from node"); } @@ -381,28 +379,28 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration // create the request URL URI requestUrl; try { - String path = "/nifi-api/processors/" + URLEncoder.encode(id, "UTF-8"); + final String path = "/nifi-api/processors/" + URLEncoder.encode(id, "UTF-8"); requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null); } catch (final URISyntaxException | UnsupportedEncodingException use) { throw new ClusterRequestException(use); } // create the revision - RevisionDTO revisionDto = new RevisionDTO(); + final RevisionDTO revisionDto = new RevisionDTO(); revisionDto.setClientId(revision.getClientId()); revisionDto.setVersion(revision.getVersion()); // create the processor entity - ProcessorEntity processorEntity = new ProcessorEntity(); + final ProcessorEntity processorEntity = new ProcessorEntity(); processorEntity.setRevision(revisionDto); // create the processor dto - ProcessorDTO processorDto = new ProcessorDTO(); + final ProcessorDTO processorDto = new ProcessorDTO(); processorEntity.setComponent(processorDto); processorDto.setId(id); // create the processor configuration with the given annotation data - ProcessorConfigDTO configDto = new ProcessorConfigDTO(); + final ProcessorConfigDTO configDto = new ProcessorConfigDTO(); processorDto.setConfig(configDto); configDto.setAnnotationData(annotationData); @@ -414,7 +412,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration NodeResponse nodeResponse; try { nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, processorEntity, headers).awaitMergedResponse(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { throw new IllegalClusterStateException("Request was interrupted while waiting for response from node"); } @@ -474,7 +472,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration // if the lookup has the service that means we are either a node or // the ncm and the service is available there only - if (controllerServiceLookup.getControllerService(id) != null) { + if (controllerServiceProvider.getControllerService(id) != null) { controllerService = serviceFacade.getControllerService(id).getComponent(); } else { // if this is a standalone instance the service should have been found above... there should @@ -486,20 +484,20 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration // create the request URL URI requestUrl; try { - String path = "/nifi-api/controller-services/node/" + URLEncoder.encode(id, "UTF-8"); + final String path = "/nifi-api/controller-services/node/" + URLEncoder.encode(id, "UTF-8"); requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null); } catch (final URISyntaxException | UnsupportedEncodingException use) { throw new ClusterRequestException(use); } // set the request parameters - MultivaluedMap parameters = new MultivaluedMapImpl(); + final MultivaluedMap parameters = new MultivaluedMapImpl(); // replicate request NodeResponse nodeResponse; try { nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { throw new IllegalClusterStateException("Request was interrupted while waiting for response from node"); } @@ -531,7 +529,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration }); final ControllerServiceDTO controllerService; - if (controllerServiceLookup.getControllerService(id) != null) { + if (controllerServiceProvider.getControllerService(id) != null) { final ControllerServiceDTO controllerServiceDto = new ControllerServiceDTO(); controllerServiceDto.setId(id); controllerServiceDto.setAnnotationData(annotationData); @@ -562,23 +560,23 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration // create the request URL URI requestUrl; try { - String path = "/nifi-api/controller-services/node/" + URLEncoder.encode(id, "UTF-8"); + final String path = "/nifi-api/controller-services/node/" + URLEncoder.encode(id, "UTF-8"); requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null); } catch (final URISyntaxException | UnsupportedEncodingException use) { throw new ClusterRequestException(use); } // create the revision - RevisionDTO revisionDto = new RevisionDTO(); + final RevisionDTO revisionDto = new RevisionDTO(); revisionDto.setClientId(revision.getClientId()); revisionDto.setVersion(revision.getVersion()); // create the controller service entity - ControllerServiceEntity controllerServiceEntity = new ControllerServiceEntity(); + final ControllerServiceEntity controllerServiceEntity = new ControllerServiceEntity(); controllerServiceEntity.setRevision(revisionDto); // create the controller service dto - ControllerServiceDTO controllerServiceDto = new ControllerServiceDTO(); + final ControllerServiceDTO controllerServiceDto = new ControllerServiceDTO(); controllerServiceEntity.setComponent(controllerServiceDto); controllerServiceDto.setId(id); controllerServiceDto.setAnnotationData(annotationData); @@ -591,7 +589,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration NodeResponse nodeResponse; try { nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, controllerServiceEntity, headers).awaitMergedResponse(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { throw new IllegalClusterStateException("Request was interrupted while waiting for response from node"); } @@ -652,20 +650,20 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration // create the request URL URI requestUrl; try { - String path = "/nifi-api/reporting-tasks/node/" + URLEncoder.encode(id, "UTF-8"); + final String path = "/nifi-api/reporting-tasks/node/" + URLEncoder.encode(id, "UTF-8"); requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null); } catch (final URISyntaxException | UnsupportedEncodingException use) { throw new ClusterRequestException(use); } // set the request parameters - MultivaluedMap parameters = new MultivaluedMapImpl(); + final MultivaluedMap parameters = new MultivaluedMapImpl(); // replicate request NodeResponse nodeResponse; try { nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { throw new IllegalClusterStateException("Request was interrupted while waiting for response from node"); } @@ -727,23 +725,23 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration // create the request URL URI requestUrl; try { - String path = "/nifi-api/reporting-tasks/node/" + URLEncoder.encode(id, "UTF-8"); + final String path = "/nifi-api/reporting-tasks/node/" + URLEncoder.encode(id, "UTF-8"); requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null); } catch (final URISyntaxException | UnsupportedEncodingException use) { throw new ClusterRequestException(use); } // create the revision - RevisionDTO revisionDto = new RevisionDTO(); + final RevisionDTO revisionDto = new RevisionDTO(); revisionDto.setClientId(revision.getClientId()); revisionDto.setVersion(revision.getVersion()); // create the reporting task entity - ReportingTaskEntity reportingTaskEntity = new ReportingTaskEntity(); + final ReportingTaskEntity reportingTaskEntity = new ReportingTaskEntity(); reportingTaskEntity.setRevision(revisionDto); // create the reporting task dto - ReportingTaskDTO reportingTaskDto = new ReportingTaskDTO(); + final ReportingTaskDTO reportingTaskDto = new ReportingTaskDTO(); reportingTaskEntity.setComponent(reportingTaskDto); reportingTaskDto.setId(id); reportingTaskDto.setAnnotationData(annotationData); @@ -756,7 +754,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration NodeResponse nodeResponse; try { nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, reportingTaskEntity, headers).awaitMergedResponse(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { throw new IllegalClusterStateException("Request was interrupted while waiting for response from node"); } @@ -831,35 +829,35 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration } } - public void setClusterCoordinator(ClusterCoordinator clusterCoordinator) { + public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) { this.clusterCoordinator = clusterCoordinator; } - public void setRequestReplicator(RequestReplicator requestReplicator) { + public void setRequestReplicator(final RequestReplicator requestReplicator) { this.requestReplicator = requestReplicator; } - public void setProperties(NiFiProperties properties) { + public void setProperties(final NiFiProperties properties) { this.properties = properties; } - public void setServiceFacade(NiFiServiceFacade serviceFacade) { + public void setServiceFacade(final NiFiServiceFacade serviceFacade) { this.serviceFacade = serviceFacade; } - public void setAuditService(AuditService auditService) { + public void setAuditService(final AuditService auditService) { this.auditService = auditService; } - public void setControllerServiceLookup(ControllerServiceLookup controllerServiceLookup) { - this.controllerServiceLookup = controllerServiceLookup; + public void setControllerServiceProvider(final ControllerServiceProvider controllerServiceProvider) { + this.controllerServiceProvider = controllerServiceProvider; } - public void setReportingTaskProvider(ReportingTaskProvider reportingTaskProvider) { + public void setReportingTaskProvider(final ReportingTaskProvider reportingTaskProvider) { this.reportingTaskProvider = reportingTaskProvider; } - public void setAuthorizer(Authorizer authorizer) { + public void setAuthorizer(final Authorizer authorizer) { this.authorizer = authorizer; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 6dd3ecc2d5..c99f98c1f3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -169,7 +169,7 @@ public final class DtoFactory { @SuppressWarnings("rawtypes") private final static Comparator CLASS_NAME_COMPARATOR = new Comparator() { @Override - public int compare(Class class1, Class class2) { + public int compare(final Class class1, final Class class2) { return Collator.getInstance(Locale.US).compare(class1.getSimpleName(), class2.getSimpleName()); } }; @@ -297,8 +297,8 @@ public final class DtoFactory { historyDto.setLastRefreshed(history.getLastRefreshed()); if (history.getActions() != null) { - List actionDtos = new ArrayList<>(); - for (Action action : history.getActions()) { + final List actionDtos = new ArrayList<>(); + for (final Action action : history.getActions()) { actionDtos.add(createActionDto(action)); } historyDto.setActions(actionDtos); @@ -998,7 +998,7 @@ public final class DtoFactory { * @param originalSnippet snippet * @return dto */ - public FlowSnippetDTO copySnippetContents(FlowSnippetDTO originalSnippet) { + public FlowSnippetDTO copySnippetContents(final FlowSnippetDTO originalSnippet) { final FlowSnippetDTO copySnippet = new FlowSnippetDTO(); if (originalSnippet.getConnections() != null) { @@ -1113,7 +1113,7 @@ public final class DtoFactory { // sort a copy of the properties final Map sortedProperties = new TreeMap<>(new Comparator() { @Override - public int compare(PropertyDescriptor o1, PropertyDescriptor o2) { + public int compare(final PropertyDescriptor o1, final PropertyDescriptor o2) { return Collator.getInstance(Locale.US).compare(o1.getName(), o2.getName()); } }); @@ -1124,7 +1124,7 @@ public final class DtoFactory { final Map orderedProperties = new LinkedHashMap<>(); final List descriptors = reportingTask.getPropertyDescriptors(); if (descriptors != null && !descriptors.isEmpty()) { - for (PropertyDescriptor descriptor : descriptors) { + for (final PropertyDescriptor descriptor : descriptors) { orderedProperties.put(descriptor, null); } } @@ -1137,7 +1137,7 @@ public final class DtoFactory { final PropertyDescriptor descriptor = entry.getKey(); // store the property descriptor - dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor, "root")); + dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor, null)); // determine the property value - don't include sensitive properties String propertyValue = entry.getValue(); @@ -1177,7 +1177,7 @@ public final class DtoFactory { // sort a copy of the properties final Map sortedProperties = new TreeMap<>(new Comparator() { @Override - public int compare(PropertyDescriptor o1, PropertyDescriptor o2) { + public int compare(final PropertyDescriptor o1, final PropertyDescriptor o2) { return Collator.getInstance(Locale.US).compare(o1.getName(), o2.getName()); } }); @@ -1188,7 +1188,7 @@ public final class DtoFactory { final Map orderedProperties = new LinkedHashMap<>(); final List descriptors = controllerService.getPropertyDescriptors(); if (descriptors != null && !descriptors.isEmpty()) { - for (PropertyDescriptor descriptor : descriptors) { + for (final PropertyDescriptor descriptor : descriptors) { orderedProperties.put(descriptor, null); } } @@ -1201,7 +1201,8 @@ public final class DtoFactory { final PropertyDescriptor descriptor = entry.getKey(); // store the property descriptor - dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor, controllerServiceNode.getProcessGroup().getIdentifier())); + final String groupId = controllerServiceNode.getProcessGroup() == null ? null : controllerServiceNode.getProcessGroup().getIdentifier(); + dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor, groupId)); // determine the property value - don't include sensitive properties String propertyValue = entry.getValue(); @@ -1254,7 +1255,7 @@ public final class DtoFactory { propertyDescriptors = node.getControllerServiceImplementation().getPropertyDescriptors(); validationErrors = node.getValidationErrors(); - processGroupId = node.getProcessGroup().getIdentifier(); + processGroupId = node.getProcessGroup() == null ? null : node.getProcessGroup().getIdentifier(); } else if (component instanceof ReportingTaskNode) { final ReportingTaskNode node = ((ReportingTaskNode) component); dto.setState(node.getScheduledState().name()); @@ -1264,20 +1265,20 @@ public final class DtoFactory { propertyDescriptors = node.getReportingTask().getPropertyDescriptors(); validationErrors = node.getValidationErrors(); - processGroupId = "root"; + processGroupId = null; } if (propertyDescriptors != null && !propertyDescriptors.isEmpty()) { final Map sortedProperties = new TreeMap<>(new Comparator() { @Override - public int compare(PropertyDescriptor o1, PropertyDescriptor o2) { + public int compare(final PropertyDescriptor o1, final PropertyDescriptor o2) { return Collator.getInstance(Locale.US).compare(o1.getName(), o2.getName()); } }); sortedProperties.putAll(component.getProperties()); final Map orderedProperties = new LinkedHashMap<>(); - for (PropertyDescriptor descriptor : propertyDescriptors) { + for (final PropertyDescriptor descriptor : propertyDescriptors) { orderedProperties.put(descriptor, null); } orderedProperties.putAll(sortedProperties); @@ -1717,7 +1718,7 @@ public final class DtoFactory { dto.setComments(group.getComments()); dto.setName(group.getName()); - ProcessGroup parentGroup = group.getParent(); + final ProcessGroup parentGroup = group.getParent(); if (parentGroup != null) { dto.setParentGroupId(parentGroup.getIdentifier()); } @@ -1886,7 +1887,7 @@ public final class DtoFactory { // sort the relationships Collections.sort(relationships, new Comparator() { @Override - public int compare(RelationshipDTO r1, RelationshipDTO r2) { + public int compare(final RelationshipDTO r1, final RelationshipDTO r2) { return Collator.getInstance(Locale.US).compare(r1.getName(), r2.getName()); } }); @@ -1923,7 +1924,7 @@ public final class DtoFactory { // sort the bulletins Collections.sort(bulletins, new Comparator() { @Override - public int compare(BulletinDTO bulletin1, BulletinDTO bulletin2) { + public int compare(final BulletinDTO bulletin1, final BulletinDTO bulletin2) { if (bulletin1 == null && bulletin2 == null) { return 0; } else if (bulletin1 == null) { @@ -2260,7 +2261,7 @@ public final class DtoFactory { // sort a copy of the properties final Map sortedProperties = new TreeMap<>(new Comparator() { @Override - public int compare(PropertyDescriptor o1, PropertyDescriptor o2) { + public int compare(final PropertyDescriptor o1, final PropertyDescriptor o2) { return Collator.getInstance(Locale.US).compare(o1.getName(), o2.getName()); } }); @@ -2271,7 +2272,7 @@ public final class DtoFactory { final Map orderedProperties = new LinkedHashMap<>(); final List descriptors = processor.getPropertyDescriptors(); if (descriptors != null && !descriptors.isEmpty()) { - for (PropertyDescriptor descriptor : descriptors) { + for (final PropertyDescriptor descriptor : descriptors) { orderedProperties.put(descriptor, null); } } @@ -2798,7 +2799,7 @@ public final class DtoFactory { * @param lastMod mod * @return dto */ - public RevisionDTO createRevisionDTO(FlowModification lastMod) { + public RevisionDTO createRevisionDTO(final FlowModification lastMod) { final Revision revision = lastMod.getRevision(); // create the dto @@ -2817,7 +2818,7 @@ public final class DtoFactory { return dto; } - public NodeDTO createNodeDTO(NodeIdentifier nodeId, NodeConnectionStatus status, NodeHeartbeat nodeHeartbeat, List events, boolean primary) { + public NodeDTO createNodeDTO(final NodeIdentifier nodeId, final NodeConnectionStatus status, final NodeHeartbeat nodeHeartbeat, final List events, final boolean primary) { final NodeDTO nodeDto = new NodeDTO(); // populate node dto @@ -2844,7 +2845,7 @@ public final class DtoFactory { final List nodeEvents = new ArrayList<>(events); Collections.sort(nodeEvents, new Comparator() { @Override - public int compare(NodeEvent event1, NodeEvent event2) { + public int compare(final NodeEvent event1, final NodeEvent event2) { return new Date(event2.getTimestamp()).compareTo(new Date(event1.getTimestamp())); } }); @@ -2868,15 +2869,15 @@ public final class DtoFactory { /* setters */ - public void setControllerServiceProvider(ControllerServiceProvider controllerServiceProvider) { + public void setControllerServiceProvider(final ControllerServiceProvider controllerServiceProvider) { this.controllerServiceProvider = controllerServiceProvider; } - public void setAuthorizer(Authorizer authorizer) { + public void setAuthorizer(final Authorizer authorizer) { this.authorizer = authorizer; } - public void setEntityFactory(EntityFactory entityFactory) { + public void setEntityFactory(final EntityFactory entityFactory) { this.entityFactory = entityFactory; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java index eac7a5a21d..b79bd7648e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java @@ -51,11 +51,12 @@ public interface ControllerServiceDAO { ControllerServiceNode getControllerService(String controllerServiceId); /** - * Gets all of the controller services. + * Gets all of the controller services for the group with the given ID or all + * controller-level services if the group id is null * * @return The controller services */ - Set getControllerServices(); + Set getControllerServices(String groupId); /** * Updates the specified controller service. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java index 435d5ce449..3db3973e15 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java @@ -16,31 +16,34 @@ */ package org.apache.nifi.web.dao.impl; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.controller.ConfiguredComponent; +import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; import org.apache.nifi.controller.exception.ValidationException; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.web.NiFiCoreException; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.dao.ComponentStateDAO; import org.apache.nifi.web.dao.ControllerServiceDAO; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + public class StandardControllerServiceDAO extends ComponentDAO implements ControllerServiceDAO { private ControllerServiceProvider serviceProvider; private ComponentStateDAO componentStateDAO; + private FlowController flowController; private ControllerServiceNode locateControllerService(final String controllerServiceId) { // get the controller service @@ -71,6 +74,24 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro // perform the update configureControllerService(controllerService, controllerServiceDTO); + final String groupId = controllerServiceDTO.getParentGroupId(); + if (groupId == null) { + flowController.addRootControllerService(controllerService); + } else { + final ProcessGroup group; + if (groupId.equals(FlowController.ROOT_GROUP_ID_ALIAS)) { + group = flowController.getGroup(flowController.getRootGroupId()); + } else { + group = flowController.getGroup(flowController.getRootGroupId()).findProcessGroup(groupId); + } + + if (group == null) { + throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId)); + } + + group.addControllerService(controllerService); + } + return controllerService; } catch (final ControllerServiceInstantiationException csie) { throw new NiFiCoreException(csie.getMessage(), csie); @@ -88,8 +109,17 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro } @Override - public Set getControllerServices() { - return serviceProvider.getAllControllerServices(); + public Set getControllerServices(final String groupId) { + if (groupId == null) { + return flowController.getRootControllerServices(); + } else { + final ProcessGroup procGroup = flowController.getGroup(flowController.getRootGroupId()).findProcessGroup(groupId); + if (procGroup == null) { + throw new ResourceNotFoundException("Could not find Process Group with ID " + groupId); + } + + return procGroup.getControllerServices(true); + } } @Override @@ -162,7 +192,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro } @Override - public void verifyUpdateReferencingComponents(String controllerServiceId, ScheduledState scheduledState, ControllerServiceState controllerServiceState) { + public void verifyUpdateReferencingComponents(final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { final ControllerServiceNode controllerService = locateControllerService(controllerServiceId); if (controllerServiceState != null) { @@ -200,7 +230,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro controllerService.verifyCanDisable(); } } - } catch (IllegalArgumentException iae) { + } catch (final IllegalArgumentException iae) { throw new IllegalArgumentException("Controller Service state: Value must be one of [ENABLED, DISABLED]"); } } @@ -255,35 +285,39 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro } @Override - public void deleteControllerService(String controllerServiceId) { + public void deleteControllerService(final String controllerServiceId) { final ControllerServiceNode controllerService = locateControllerService(controllerServiceId); serviceProvider.removeControllerService(controllerService); } @Override - public StateMap getState(String controllerServiceId, Scope scope) { + public StateMap getState(final String controllerServiceId, final Scope scope) { final ControllerServiceNode controllerService = locateControllerService(controllerServiceId); return componentStateDAO.getState(controllerService, scope); } @Override - public void verifyClearState(String controllerServiceId) { + public void verifyClearState(final String controllerServiceId) { final ControllerServiceNode controllerService = locateControllerService(controllerServiceId); controllerService.verifyCanClearState(); } @Override - public void clearState(String controllerServiceId) { + public void clearState(final String controllerServiceId) { final ControllerServiceNode controllerService = locateControllerService(controllerServiceId); componentStateDAO.clearState(controllerService); } /* setters */ - public void setServiceProvider(ControllerServiceProvider serviceProvider) { + public void setServiceProvider(final ControllerServiceProvider serviceProvider) { this.serviceProvider = serviceProvider; } - public void setComponentStateDAO(ComponentStateDAO componentStateDAO) { + public void setComponentStateDAO(final ComponentStateDAO componentStateDAO) { this.componentStateDAO = componentStateDAO; } + + public void setFlowController(final FlowController flowController) { + this.flowController = flowController; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml index 1ca34dc57b..39c386df99 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml @@ -91,6 +91,7 @@ + @@ -150,7 +151,7 @@ - +