NIFI-2033: Allow Controller Services to be scoped at Controller level instead of just group level. This closes #540

This commit is contained in:
Mark Payne 2016-06-17 09:08:44 -04:00 committed by Matt Gilman
parent 2c69c25323
commit c955ec1689
35 changed files with 826 additions and 554 deletions

View File

@ -138,7 +138,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
// if the property descriptor identifies a Controller Service, validate that the ControllerService exists, is of the correct type, and is valid // if the property descriptor identifies a Controller Service, validate that the ControllerService exists, is of the correct type, and is valid
if (controllerServiceDefinition != null) { if (controllerServiceDefinition != null) {
final Set<String> validIdentifiers = context.getControllerServiceLookup().getControllerServiceIdentifiers(controllerServiceDefinition, context.getProcessGroupIdentifier()); final Set<String> validIdentifiers = context.getControllerServiceLookup().getControllerServiceIdentifiers(controllerServiceDefinition);
if (validIdentifiers != null && validIdentifiers.contains(input)) { if (validIdentifiers != null && validIdentifiers.contains(input)) {
final ControllerService controllerService = context.getControllerServiceLookup().getControllerService(input); final ControllerService controllerService = context.getControllerServiceLookup().getControllerService(input);
if (!context.isValidationRequired(controllerService)) { if (!context.isValidationRequired(controllerService)) {
@ -213,7 +213,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
* for ENABLED state even though by the time this method returns the * for ENABLED state even though by the time this method returns the
* dependent service's state could be fully ENABLED. * dependent service's state could be fully ENABLED.
*/ */
private boolean isDependentServiceEnableable(ValidationContext context, String serviceId) { private boolean isDependentServiceEnableable(final ValidationContext context, final String serviceId) {
boolean enableable = context.getControllerServiceLookup().isControllerServiceEnabling(serviceId); boolean enableable = context.getControllerServiceLookup().isControllerServiceEnabling(serviceId);
if (!enableable) { if (!enableable) {
enableable = context.getControllerServiceLookup().isControllerServiceEnabled(serviceId); enableable = context.getControllerServiceLookup().isControllerServiceEnabled(serviceId);
@ -516,7 +516,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
return true; return true;
} }
PropertyDescriptor desc = (PropertyDescriptor) other; final PropertyDescriptor desc = (PropertyDescriptor) other;
return this.name.equals(desc.name); return this.name.equals(desc.name);
} }

View File

@ -56,13 +56,12 @@ public interface ControllerServiceLookup {
/** /**
* *
* @param serviceType type of service to get identifiers for * @param serviceType type of service to get identifiers for
* @param groupId the ID of the Process Group to look in for Controller Services
* *
* @return the set of all Controller Service Identifiers whose Controller * @return the set of all Controller Service Identifiers whose Controller
* Service is of the given type. * Service is of the given type.
* @throws IllegalArgumentException if the given class is not an interface * @throws IllegalArgumentException if the given class is not an interface
*/ */
Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType, String groupId) throws IllegalArgumentException; Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType) throws IllegalArgumentException;
/** /**
* @param serviceIdentifier identifier to look up * @param serviceIdentifier identifier to look up

View File

@ -28,12 +28,13 @@ public interface NiFiWebConfigurationContext {
/** /**
* @param serviceIdentifier of the controller service * @param serviceIdentifier of the controller service
* @param componentId the id of the component that is referencing the controller service
* @return the ControllerService for the specified identifier. If a * @return the ControllerService for the specified identifier. If a
* corresponding service cannot be found, null is returned. If this NiFi is * corresponding service cannot be found, null is returned. If this NiFi is
* clustered, the only services available will be those those availability * clustered, the only services available will be those those availability
* is NCM only * is NCM only
*/ */
ControllerService getControllerService(String serviceIdentifier); ControllerService getControllerService(String serviceIdentifier, String componentId);
/** /**
* Provides a mechanism for custom UIs to save actions to appear in NiFi * Provides a mechanism for custom UIs to save actions to appear in NiFi

View File

@ -38,7 +38,7 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo
return addControllerService(service, service.getIdentifier()); return addControllerService(service, service.getIdentifier());
} }
public void removeControllerService(ControllerService service) { public void removeControllerService(final ControllerService service) {
final ControllerService canonical = getControllerService(service.getIdentifier()); final ControllerService canonical = getControllerService(service.getIdentifier());
if (canonical == null || canonical != service) { if (canonical == null || canonical != service) {
throw new IllegalArgumentException("Controller Service " + service + " is not known"); throw new IllegalArgumentException("Controller Service " + service + " is not known");
@ -82,7 +82,7 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo
} }
@Override @Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) { public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
final Set<String> ids = new HashSet<>(); final Set<String> ids = new HashSet<>();
for (final Map.Entry<String, ControllerServiceConfiguration> entry : controllerServiceMap.entrySet()) { for (final Map.Entry<String, ControllerServiceConfiguration> entry : controllerServiceMap.entrySet()) {
if (serviceType.isAssignableFrom(entry.getValue().getService().getClass())) { if (serviceType.isAssignableFrom(entry.getValue().getService().getClass())) {
@ -93,7 +93,7 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo
} }
@Override @Override
public String getControllerServiceName(String serviceIdentifier) { public String getControllerServiceName(final String serviceIdentifier) {
final ControllerServiceConfiguration status = controllerServiceMap.get(serviceIdentifier); final ControllerServiceConfiguration status = controllerServiceMap.get(serviceIdentifier);
return status == null ? null : serviceIdentifier; return status == null ? null : serviceIdentifier;
} }

View File

@ -47,8 +47,8 @@ public class MockProcessorInitializationContext implements ProcessorInitializati
} }
@Override @Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) { public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return context.getControllerServiceIdentifiers(serviceType, groupId); return context.getControllerServiceIdentifiers(serviceType);
} }
@Override @Override
@ -62,22 +62,22 @@ public class MockProcessorInitializationContext implements ProcessorInitializati
} }
@Override @Override
public String getControllerServiceName(String serviceIdentifier) { public String getControllerServiceName(final String serviceIdentifier) {
return context.getControllerServiceName(serviceIdentifier); return context.getControllerServiceName(serviceIdentifier);
} }
@Override @Override
public boolean isControllerServiceEnabled(String serviceIdentifier) { public boolean isControllerServiceEnabled(final String serviceIdentifier) {
return context.isControllerServiceEnabled(serviceIdentifier); return context.isControllerServiceEnabled(serviceIdentifier);
} }
@Override @Override
public boolean isControllerServiceEnabled(ControllerService service) { public boolean isControllerServiceEnabled(final ControllerService service) {
return context.isControllerServiceEnabled(service); return context.isControllerServiceEnabled(service);
} }
@Override @Override
public boolean isControllerServiceEnabling(String serviceIdentifier) { public boolean isControllerServiceEnabling(final String serviceIdentifier) {
return context.isControllerServiceEnabling(serviceIdentifier); return context.isControllerServiceEnabling(serviceIdentifier);
} }
} }

View File

@ -86,8 +86,8 @@ public class MockValidationContext implements ValidationContext, ControllerServi
} }
@Override @Override
public Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType, String groupId) { public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return context.getControllerServiceIdentifiers(serviceType, groupId); return context.getControllerServiceIdentifiers(serviceType);
} }
@Override @Override
@ -117,7 +117,7 @@ public class MockValidationContext implements ValidationContext, ControllerServi
} }
@Override @Override
public boolean isControllerServiceEnabling(String serviceIdentifier) { public boolean isControllerServiceEnabling(final String serviceIdentifier) {
return context.isControllerServiceEnabling(serviceIdentifier); return context.isControllerServiceEnabling(serviceIdentifier);
} }

View File

@ -32,34 +32,32 @@ import org.apache.nifi.controller.ControllerServiceLookup;
public class MockControllerServiceLookup implements ControllerServiceLookup { public class MockControllerServiceLookup implements ControllerServiceLookup {
@Override @Override
public ControllerService getControllerService(String serviceIdentifier) { public ControllerService getControllerService(final String serviceIdentifier) {
return null; return null;
} }
@Override @Override
public boolean isControllerServiceEnabled(String serviceIdentifier) { public boolean isControllerServiceEnabled(final String serviceIdentifier) {
return false; return false;
} }
@Override @Override
public boolean isControllerServiceEnabled(ControllerService service) { public boolean isControllerServiceEnabled(final ControllerService service) {
return false; return false;
} }
@Override @Override
public Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType, String groupId) public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) throws IllegalArgumentException {
throws IllegalArgumentException {
return Collections.emptySet(); return Collections.emptySet();
} }
@Override @Override
public boolean isControllerServiceEnabling(String serviceIdentifier) { public boolean isControllerServiceEnabling(final String serviceIdentifier) {
return false; return false;
} }
@Override @Override
public String getControllerServiceName(String serviceIdentifier) { public String getControllerServiceName(final String serviceIdentifier) {
return serviceIdentifier; return serviceIdentifier;
} }
} }

View File

@ -262,7 +262,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
@Override @Override
public boolean isValid() { public boolean isValid() {
final Collection<ValidationResult> validationResults = validate(validationContextFactory.newValidationContext( final Collection<ValidationResult> validationResults = validate(validationContextFactory.newValidationContext(
getProperties(), getAnnotationData(), getProcessGroupIdentifier())); getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()));
for (final ValidationResult result : validationResults) { for (final ValidationResult result : validationResults) {
if (!result.isValid()) { if (!result.isValid()) {
@ -283,7 +283,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
lock.lock(); lock.lock();
try { try {
final ValidationContext validationContext = validationContextFactory.newValidationContext( final ValidationContext validationContext = validationContextFactory.newValidationContext(
serviceIdentifiersNotToValidate, getProperties(), getAnnotationData(), getProcessGroupIdentifier()); serviceIdentifiersNotToValidate, getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
final Collection<ValidationResult> validationResults; final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {

View File

@ -24,8 +24,9 @@ import org.apache.nifi.components.ValidationContext;
public interface ValidationContextFactory { public interface ValidationContextFactory {
ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData, String groupId); ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData, String groupId, String componentId);
ValidationContext newValidationContext(Set<String> serviceIdentifiersToNotValidate, Map<PropertyDescriptor, String> properties, String annotationData, String groupId); ValidationContext newValidationContext(Set<String> serviceIdentifiersToNotValidate, Map<PropertyDescriptor, String> properties,
String annotationData, String groupId, String componentId);
} }

View File

@ -21,6 +21,7 @@ import java.util.Set;
import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ControllerServiceLookup;
/** /**
@ -176,4 +177,25 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
* @param serviceNode the node * @param serviceNode the node
*/ */
Set<ConfiguredComponent> scheduleReferencingComponents(ControllerServiceNode serviceNode); Set<ConfiguredComponent> scheduleReferencingComponents(ControllerServiceNode serviceNode);
/**
*
* @param serviceType type of service to get identifiers for
* @param groupId the ID of the Process Group to look in for Controller Services
*
* @return the set of all Controller Service Identifiers whose Controller
* Service is of the given type.
* @throws IllegalArgumentException if the given class is not an interface
*/
Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType, String groupId) throws IllegalArgumentException;
/**
* @param serviceIdentifier the identifier of the controller service
* @param componentId the identifier of the component that is referencing the service.
* @return the Controller Service that is registered with the given identifier or <code>null</code> if that
* identifier does not exist for any controller service or if the controller service with that identifier is
* not accessible from the component with the given componentId, or if no component exists with the given
* identifier
*/
ControllerService getControllerServiceForComponent(String serviceIdentifier, String componentId);
} }

View File

@ -278,6 +278,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final StateManagerProvider stateManagerProvider; private final StateManagerProvider stateManagerProvider;
private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ControllerServiceNode> rootControllerServices = new ConcurrentHashMap<>();
private volatile ZooKeeperStateServer zooKeeperStateServer; private volatile ZooKeeperStateServer zooKeeperStateServer;
@ -501,8 +502,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); rootGroup.setName(DEFAULT_ROOT_GROUP_NAME);
instanceId = UUID.randomUUID().toString(); instanceId = UUID.randomUUID().toString();
controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider); controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider);
controllerServiceProvider.setRootProcessGroup(rootGroup);
if (remoteInputSocketPort == null) { if (remoteInputSocketPort == null) {
LOG.info("Not enabling RAW Socket Site-to-Site functionality because nifi.remote.input.socket.port is not set"); LOG.info("Not enabling RAW Socket Site-to-Site functionality because nifi.remote.input.socket.port is not set");
@ -521,7 +521,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
externalSiteListeners.add(HttpRemoteSiteListener.getInstance()); externalSiteListeners.add(HttpRemoteSiteListener.getInstance());
} }
for(RemoteSiteListener listener : externalSiteListeners) { for(final RemoteSiteListener listener : externalSiteListeners) {
listener.setRootGroup(rootGroup); listener.setRootGroup(rootGroup);
} }
@ -659,7 +659,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// ContentRepository to purge superfluous files // ContentRepository to purge superfluous files
contentRepository.cleanup(); contentRepository.cleanup();
for(RemoteSiteListener listener : externalSiteListeners) { for(final RemoteSiteListener listener : externalSiteListeners) {
listener.start(); listener.start();
} }
@ -1297,7 +1297,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
+ "will take an indeterminate amount of time to stop. Might need to kill the program manually."); + "will take an indeterminate amount of time to stop. Might need to kill the program manually.");
} }
for(RemoteSiteListener listener : externalSiteListeners) { for(final RemoteSiteListener listener : externalSiteListeners) {
listener.stop(); listener.stop();
} }
@ -1442,12 +1442,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
try { try {
rootGroup = group; rootGroup = group;
for(RemoteSiteListener listener : externalSiteListeners) { for(final RemoteSiteListener listener : externalSiteListeners) {
listener.setRootGroup(rootGroup); listener.setRootGroup(rootGroup);
} }
controllerServiceProvider.setRootProcessGroup(rootGroup);
// update the heartbeat bean // update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus)); this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
} finally { } finally {
@ -2874,11 +2872,68 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return controllerServiceProvider.getControllerService(serviceIdentifier); return controllerServiceProvider.getControllerService(serviceIdentifier);
} }
@Override
public ControllerService getControllerServiceForComponent(final String serviceIdentifier, final String componentId) {
return controllerServiceProvider.getControllerServiceForComponent(serviceIdentifier, componentId);
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) throws IllegalArgumentException {
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
}
@Override @Override
public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) { public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) {
return controllerServiceProvider.getControllerServiceNode(serviceIdentifier); return controllerServiceProvider.getControllerServiceNode(serviceIdentifier);
} }
public Set<ControllerServiceNode> getRootControllerServices() {
return new HashSet<>(rootControllerServices.values());
}
public void addRootControllerService(final ControllerServiceNode serviceNode) {
final ControllerServiceNode existing = rootControllerServices.putIfAbsent(serviceNode.getIdentifier(), serviceNode);
if (existing != null) {
throw new IllegalStateException("Controller Service with ID " + serviceNode.getIdentifier() + " already exists at the Controller level");
}
}
public ControllerServiceNode getRootControllerService(final String serviceIdentifier) {
return rootControllerServices.get(serviceIdentifier);
}
public void removeRootControllerService(final ControllerServiceNode service) {
final ControllerServiceNode existing = rootControllerServices.get(requireNonNull(service).getIdentifier());
if (existing == null) {
throw new IllegalStateException(service + " is not a member of this Process Group");
}
service.verifyCanDelete();
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext);
}
for (final Map.Entry<PropertyDescriptor, String> entry : service.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.getControllerServiceDefinition() != null) {
final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
if (value != null) {
final ControllerServiceNode referencedNode = getRootControllerService(value);
if (referencedNode != null) {
referencedNode.removeReference(service);
}
}
}
}
rootControllerServices.remove(service.getIdentifier());
getStateManagerProvider().onComponentRemoved(service.getIdentifier());
LOG.info("{} removed from Flow Controller", service, this);
}
@Override @Override
public boolean isControllerServiceEnabled(final ControllerService service) { public boolean isControllerServiceEnabled(final ControllerService service) {
return controllerServiceProvider.isControllerServiceEnabled(service); return controllerServiceProvider.isControllerServiceEnabled(service);
@ -3844,7 +3899,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
} }
@Override @Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) { public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, final String groupId) {
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId); return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId);
} }

View File

@ -314,7 +314,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
} }
} }
void scaleRootGroup(ProcessGroup rootGroup, FlowEncodingVersion encodingVersion) { void scaleRootGroup(final ProcessGroup rootGroup, final FlowEncodingVersion encodingVersion) {
if (encodingVersion == null || encodingVersion.getMajorVersion() < 1) { if (encodingVersion == null || encodingVersion.getMajorVersion() < 1) {
// Calculate new Positions if the encoding version of the flow is older than 1.0. // Calculate new Positions if the encoding version of the flow is older than 1.0.
PositionScaler.scale(rootGroup, 1.5, 1.34); PositionScaler.scale(rootGroup, 1.5, 1.34);
@ -939,7 +939,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
remoteGroup.setYieldDuration(remoteGroupDto.getYieldDuration()); remoteGroup.setYieldDuration(remoteGroupDto.getYieldDuration());
} }
String transportProtocol = remoteGroupDto.getTransportProtocol(); final String transportProtocol = remoteGroupDto.getTransportProtocol();
if (transportProtocol != null && !transportProtocol.trim().isEmpty()) { if (transportProtocol != null && !transportProtocol.trim().isEmpty()) {
remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(transportProtocol.toUpperCase())); remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(transportProtocol.toUpperCase()));
} }

View File

@ -233,7 +233,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
} }
@Override @Override
public void setPosition(Position position) { public void setPosition(final Position position) {
this.position.set(position); this.position.set(position);
} }
@ -373,7 +373,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
*/ */
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public String getProcessorDescription() { public String getProcessorDescription() {
CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class); final CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class);
String description = null; String description = null;
if (capDesc != null) { if (capDesc != null) {
description = capDesc.value(); description = capDesc.value();
@ -644,7 +644,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override @Override
public Set<Connection> getConnections(final Relationship relationship) { public Set<Connection> getConnections(final Relationship relationship) {
Set<Connection> applicableConnections = connections.get(relationship); final Set<Connection> applicableConnections = connections.get(relationship);
return (applicableConnections == null) ? Collections.<Connection> emptySet() return (applicableConnections == null) ? Collections.<Connection> emptySet()
: Collections.unmodifiableSet(applicableConnections); : Collections.unmodifiableSet(applicableConnections);
} }
@ -923,7 +923,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
public boolean isValid() { public boolean isValid() {
try { try {
final ValidationContext validationContext = this.getValidationContextFactory() final ValidationContext validationContext = this.getValidationContextFactory()
.newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier()); .newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
final Collection<ValidationResult> validationResults; final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
@ -970,7 +970,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final List<ValidationResult> results = new ArrayList<>(); final List<ValidationResult> results = new ArrayList<>();
try { try {
final ValidationContext validationContext = this.getValidationContextFactory() final ValidationContext validationContext = this.getValidationContextFactory()
.newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier()); .newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier());
final Collection<ValidationResult> validationResults; final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
@ -1286,7 +1286,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}; };
taskScheduler.execute(startProcRunnable); taskScheduler.execute(startProcRunnable);
} else { } else {
String procName = this.processor.getClass().getSimpleName(); final String procName = this.processor.getClass().getSimpleName();
LOG.warn("Can not start '" + procName LOG.warn("Can not start '" + procName
+ "' since it's already in the process of being started or it is DISABLED - " + "' since it's already in the process of being started or it is DISABLED - "
+ scheduledState.get()); + scheduledState.get());
@ -1345,7 +1345,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
} else { } else {
scheduler.schedule(this, 100, TimeUnit.MILLISECONDS); scheduler.schedule(this, 100, TimeUnit.MILLISECONDS);
} }
} catch (Exception e) { } catch (final Exception e) {
LOG.warn("Failed while shutting down processor " + processor, e); LOG.warn("Failed while shutting down processor " + processor, e);
} }
} }
@ -1389,19 +1389,19 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
* be logged (WARN) informing a user so further actions could be taken. * be logged (WARN) informing a user so further actions could be taken.
* </p> * </p>
*/ */
private <T> void invokeTaskAsCancelableFuture(SchedulingAgentCallback callback, Callable<T> task) { private <T> void invokeTaskAsCancelableFuture(final SchedulingAgentCallback callback, final Callable<T> task) {
String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); final String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
long onScheduleTimeout = timeoutString == null ? 60000 final long onScheduleTimeout = timeoutString == null ? 60000
: FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
Future<?> taskFuture = callback.invokeMonitoringTask(task); final Future<?> taskFuture = callback.invokeMonitoringTask(task);
try { try {
taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS); taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) { } catch (final InterruptedException e) {
LOG.warn("Thread was interrupted while waiting for processor '" + this.processor.getClass().getSimpleName() LOG.warn("Thread was interrupted while waiting for processor '" + this.processor.getClass().getSimpleName()
+ "' lifecycle OnScheduled operation to finish."); + "' lifecycle OnScheduled operation to finish.");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while executing one of processor's OnScheduled tasks.", e); throw new RuntimeException("Interrupted while executing one of processor's OnScheduled tasks.", e);
} catch (TimeoutException e) { } catch (final TimeoutException e) {
taskFuture.cancel(true); taskFuture.cancel(true);
LOG.warn("Timed out while waiting for OnScheduled of '" LOG.warn("Timed out while waiting for OnScheduled of '"
+ this.processor.getClass().getSimpleName() + this.processor.getClass().getSimpleName()
@ -1411,7 +1411,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
+ "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '" + "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '"
+ this.processor + "' that needs to be documented, reported and eventually fixed."); + this.processor + "' that needs to be documented, reported and eventually fixed.");
throw new RuntimeException("Timed out while executing one of processor's OnScheduled task.", e); throw new RuntimeException("Timed out while executing one of processor's OnScheduled task.", e);
} catch (ExecutionException e){ } catch (final ExecutionException e){
throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", e); throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", e);
} finally { } finally {
callback.postMonitor(); callback.postMonitor();

View File

@ -115,8 +115,8 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
} }
@Override @Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) { public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId); return serviceProvider.getControllerServiceIdentifiers(serviceType, null);
} }
@Override @Override

View File

@ -75,8 +75,8 @@ public class StandardReportingInitializationContext implements ReportingInitiali
} }
@Override @Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) { public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId); return serviceProvider.getControllerServiceIdentifiers(serviceType, null);
} }
@Override @Override

View File

@ -94,8 +94,12 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount()); addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup"); addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup");
// Add root-level controller services
final Element controllerServicesNode = doc.createElement("controllerServices"); final Element controllerServicesNode = doc.createElement("controllerServices");
rootNode.appendChild(controllerServicesNode); rootNode.appendChild(controllerServicesNode);
for (final ControllerServiceNode serviceNode : controller.getRootControllerServices()) {
addControllerService(controllerServicesNode, serviceNode);
}
final Element reportingTasksNode = doc.createElement("reportingTasks"); final Element reportingTasksNode = doc.createElement("reportingTasks");
rootNode.appendChild(reportingTasksNode); rootNode.appendChild(reportingTasksNode);
@ -252,7 +256,7 @@ public class StandardFlowSerializer implements FlowSerializer {
} }
addTextElement(element, "proxyUser", remoteRef.getProxyUser()); addTextElement(element, "proxyUser", remoteRef.getProxyUser());
if (!StringUtils.isEmpty(remoteRef.getProxyPassword())) { if (!StringUtils.isEmpty(remoteRef.getProxyPassword())) {
String value = ENC_PREFIX + encryptor.encrypt(remoteRef.getProxyPassword()) + ENC_SUFFIX; final String value = ENC_PREFIX + encryptor.encrypt(remoteRef.getProxyPassword()) + ENC_SUFFIX;
addTextElement(element, "proxyPassword", value); addTextElement(element, "proxyPassword", value);
} }

View File

@ -31,6 +31,7 @@ import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.ParserConfigurationException;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.serialization.FlowFromDOMFactory; import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
@ -48,7 +49,7 @@ public class ControllerServiceLoader {
private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class); private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class);
public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream, final ProcessGroup parentGroup, public static List<ControllerServiceNode> loadControllerServices(final FlowController controller, final InputStream serializedStream, final ProcessGroup parentGroup,
final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) throws IOException { final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) throws IOException {
final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance(); final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
@ -90,19 +91,21 @@ public class ControllerServiceLoader {
final Document document = builder.parse(in); final Document document = builder.parse(in);
final Element controllerServices = document.getDocumentElement(); final Element controllerServices = document.getDocumentElement();
final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService"); final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
return new ArrayList<>(loadControllerServices(serviceElements, provider, parentGroup, encryptor, bulletinRepo, autoResumeState)); return new ArrayList<>(loadControllerServices(serviceElements, controller, parentGroup, encryptor, bulletinRepo, autoResumeState));
} catch (SAXException | ParserConfigurationException sxe) { } catch (SAXException | ParserConfigurationException sxe) {
throw new IOException(sxe); throw new IOException(sxe);
} }
} }
public static Collection<ControllerServiceNode> loadControllerServices(final List<Element> serviceElements, final ControllerServiceProvider provider, final ProcessGroup parentGroup, public static Collection<ControllerServiceNode> loadControllerServices(final List<Element> serviceElements, final FlowController controller, final ProcessGroup parentGroup,
final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) { final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) {
final Map<ControllerServiceNode, Element> nodeMap = new HashMap<>(); final Map<ControllerServiceNode, Element> nodeMap = new HashMap<>();
for (final Element serviceElement : serviceElements) { for (final Element serviceElement : serviceElements) {
final ControllerServiceNode serviceNode = createControllerService(provider, serviceElement, encryptor); final ControllerServiceNode serviceNode = createControllerService(controller, serviceElement, encryptor);
if (parentGroup != null) { if (parentGroup == null) {
controller.addRootControllerService(serviceNode);
} else {
parentGroup.addControllerService(serviceNode); parentGroup.addControllerService(serviceNode);
} }
@ -132,7 +135,7 @@ public class ControllerServiceLoader {
} }
} }
provider.enableControllerServices(nodesToEnable); controller.enableControllerServices(nodesToEnable);
} }
return nodeMap.keySet(); return nodeMap.keySet();

View File

@ -49,8 +49,8 @@ public class StandardControllerServiceInitializationContext implements Controlle
} }
@Override @Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) { public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId); return serviceProvider.getControllerServiceIdentifiers(serviceType);
} }
@Override @Override
@ -69,7 +69,7 @@ public class StandardControllerServiceInitializationContext implements Controlle
} }
@Override @Override
public boolean isControllerServiceEnabling(String serviceIdentifier) { public boolean isControllerServiceEnabling(final String serviceIdentifier) {
return serviceProvider.isControllerServiceEnabling(serviceIdentifier); return serviceProvider.isControllerServiceEnabling(serviceIdentifier);
} }

View File

@ -72,7 +72,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
private static final Set<Method> validDisabledMethods; private static final Set<Method> validDisabledMethods;
private final BulletinRepository bulletinRepo; private final BulletinRepository bulletinRepo;
private final StateManagerProvider stateManagerProvider; private final StateManagerProvider stateManagerProvider;
private volatile ProcessGroup rootGroup; private final FlowController flowController;
static { static {
// methods that are okay to be called when the service is disabled. // methods that are okay to be called when the service is disabled.
@ -86,17 +86,15 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
validDisabledMethods = Collections.unmodifiableSet(validMethods); validDisabledMethods = Collections.unmodifiableSet(validMethods);
} }
public StandardControllerServiceProvider(final ProcessScheduler scheduler, final BulletinRepository bulletinRepo, final StateManagerProvider stateManagerProvider) { public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo,
// the following 2 maps must be updated atomically, but we do not lock around them because they are modified final StateManagerProvider stateManagerProvider) {
// only in the createControllerService method, and both are modified before the method returns
this.flowController = flowController;
this.processScheduler = scheduler; this.processScheduler = scheduler;
this.bulletinRepo = bulletinRepo; this.bulletinRepo = bulletinRepo;
this.stateManagerProvider = stateManagerProvider; this.stateManagerProvider = stateManagerProvider;
} }
public void setRootProcessGroup(ProcessGroup rootGroup) {
this.rootGroup = rootGroup;
}
private Class<?>[] getInterfaces(final Class<?> cls) { private Class<?>[] getInterfaces(final Class<?> cls) {
final List<Class<?>> allIfcs = new ArrayList<>(); final List<Class<?>> allIfcs = new ArrayList<>();
@ -519,6 +517,52 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
return node == null ? null : node.getProxiedControllerService(); return node == null ? null : node.getProxiedControllerService();
} }
private ProcessGroup getRootGroup() {
return flowController.getGroup(flowController.getRootGroupId());
}
@Override
public ControllerService getControllerServiceForComponent(final String serviceIdentifier, final String componentId) {
final ProcessGroup rootGroup = getRootGroup();
// Find the Process Group that owns the component.
ProcessGroup groupOfInterest = null;
final ProcessorNode procNode = rootGroup.findProcessor(componentId);
if (procNode == null) {
final ControllerServiceNode serviceNode = getControllerServiceNode(componentId);
if (serviceNode == null) {
final ReportingTaskNode taskNode = flowController.getReportingTaskNode(componentId);
if (taskNode == null) {
throw new IllegalStateException("Could not find any Processor, Reporting Task, or Controller Service with identifier " + componentId);
}
// we have confirmed that the component is a reporting task. We can only reference Controller Services
// that are scoped at the FlowController level in this case.
final ControllerServiceNode rootServiceNode = flowController.getRootControllerService(serviceIdentifier);
return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService();
} else {
groupOfInterest = serviceNode.getProcessGroup();
}
} else {
groupOfInterest = procNode.getProcessGroup();
}
if (groupOfInterest == null) {
final ControllerServiceNode rootServiceNode = flowController.getRootControllerService(serviceIdentifier);
return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService();
}
final Set<ControllerServiceNode> servicesForGroup = groupOfInterest.getControllerServices(true);
for (final ControllerServiceNode serviceNode : servicesForGroup) {
if (serviceIdentifier.equals(serviceNode.getIdentifier())) {
return serviceNode.getProxiedControllerService();
}
}
return null;
}
@Override @Override
public boolean isControllerServiceEnabled(final ControllerService service) { public boolean isControllerServiceEnabled(final ControllerService service) {
return isControllerServiceEnabled(service.getIdentifier()); return isControllerServiceEnabled(service.getIdentifier());
@ -538,17 +582,22 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override @Override
public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) { public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) {
final ProcessGroup group = rootGroup; final ControllerServiceNode rootServiceNode = flowController.getRootControllerService(serviceIdentifier);
return group == null ? null : group.findControllerService(serviceIdentifier); if (rootServiceNode != null) {
return rootServiceNode;
} }
return getRootGroup().findControllerService(serviceIdentifier);
}
@Override @Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) { public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, final String groupId) {
ProcessGroup group = rootGroup; final Set<ControllerServiceNode> serviceNodes;
if (group == null) { if (groupId == null) {
return Collections.emptySet(); serviceNodes = flowController.getRootControllerServices();
} } else {
ProcessGroup group = getRootGroup();
if (!FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) { if (!FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) {
group = group.findProcessGroup(groupId); group = group.findProcessGroup(groupId);
} }
@ -557,7 +606,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
return Collections.emptySet(); return Collections.emptySet();
} }
final Set<ControllerServiceNode> serviceNodes = group.getControllerServices(true); serviceNodes = group.getControllerServices(true);
}
final Set<String> identifiers = new HashSet<>(); final Set<String> identifiers = new HashSet<>();
for (final ControllerServiceNode serviceNode : serviceNodes) { for (final ControllerServiceNode serviceNode : serviceNodes) {
@ -579,7 +629,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
public void removeControllerService(final ControllerServiceNode serviceNode) { public void removeControllerService(final ControllerServiceNode serviceNode) {
final ProcessGroup group = requireNonNull(serviceNode).getProcessGroup(); final ProcessGroup group = requireNonNull(serviceNode).getProcessGroup();
if (group == null) { if (group == null) {
throw new IllegalArgumentException("Cannot remote Controller Service " + serviceNode + " because it does not belong to any Process Group"); flowController.removeRootControllerService(serviceNode);
return;
} }
group.removeControllerService(serviceNode); group.removeControllerService(serviceNode);
@ -587,12 +638,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override @Override
public Set<ControllerServiceNode> getAllControllerServices() { public Set<ControllerServiceNode> getAllControllerServices() {
final ProcessGroup group = rootGroup; final Set<ControllerServiceNode> allServices = new HashSet<>();
if (group == null) { allServices.addAll(flowController.getRootControllerServices());
return Collections.emptySet(); allServices.addAll(getRootGroup().findAllControllerServices());
}
return group.findAllControllerServices(); return allServices;
} }
/** /**
@ -709,4 +759,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) { public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
// we can always stop referencing components // we can always stop referencing components
} }
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) throws IllegalArgumentException {
throw new UnsupportedOperationException("Cannot obtain Controller Service Identifiers for service type " + serviceType + " without providing a Process Group Identifier");
}
} }

View File

@ -186,7 +186,7 @@ public class StandardStateManagerProvider implements StateManagerProvider {
provider.initialize(initContext); provider.initialize(initContext);
} }
final ValidationContext validationContext = new StandardValidationContext(null, propertyStringMap, null, null); final ValidationContext validationContext = new StandardValidationContext(null, propertyStringMap, null, null, null);
final Collection<ValidationResult> results = provider.validate(validationContext); final Collection<ValidationResult> results = provider.validate(validationContext);
final StringBuilder validationFailures = new StringBuilder(); final StringBuilder validationFailures = new StringBuilder();

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor;
import java.util.Set;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.service.ControllerServiceProvider;
public class ComponentSpecificControllerServiceLookup implements ControllerServiceLookup {
private final ControllerServiceProvider serviceProvider;
private final String componentId;
private final String groupId;
public ComponentSpecificControllerServiceLookup(final ControllerServiceProvider serviceProvider, final String componentId, final String groupId) {
this.serviceProvider = serviceProvider;
this.componentId = componentId;
this.groupId = groupId;
}
@Override
public ControllerService getControllerService(final String serviceIdentifier) {
return serviceProvider.getControllerServiceForComponent(serviceIdentifier, componentId);
}
@Override
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
return serviceProvider.isControllerServiceEnabled(serviceIdentifier);
}
@Override
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
return serviceProvider.isControllerServiceEnabling(serviceIdentifier);
}
@Override
public boolean isControllerServiceEnabled(final ControllerService service) {
return serviceProvider.isControllerServiceEnabled(service);
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) throws IllegalArgumentException {
return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId);
}
@Override
public String getControllerServiceName(final String serviceIdentifier) {
return serviceProvider.getControllerServiceName(serviceIdentifier);
}
}

View File

@ -101,7 +101,7 @@ public class StandardProcessContext implements ProcessContext, ControllerService
@Override @Override
public ControllerService getControllerService(final String serviceIdentifier) { public ControllerService getControllerService(final String serviceIdentifier) {
return controllerServiceProvider.getControllerService(serviceIdentifier); return controllerServiceProvider.getControllerServiceForComponent(serviceIdentifier, procNode.getIdentifier());
} }
@Override @Override
@ -130,11 +130,11 @@ public class StandardProcessContext implements ProcessContext, ControllerService
} }
@Override @Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) { public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
if (!serviceType.isInterface()) { if (!serviceType.isInterface()) {
throw new IllegalArgumentException("ControllerServices may be referenced only via their interfaces; " + serviceType + " is not an interface"); throw new IllegalArgumentException("ControllerServices may be referenced only via their interfaces; " + serviceType + " is not an interface");
} }
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId); return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, procNode.getProcessGroup().getIdentifier());
} }
@Override @Override
@ -197,8 +197,8 @@ public class StandardProcessContext implements ProcessContext, ControllerService
} }
@Override @Override
public boolean hasConnection(Relationship relationship) { public boolean hasConnection(final Relationship relationship) {
Set<Connection> connections = procNode.getConnections(relationship); final Set<Connection> connections = procNode.getConnections(relationship);
return connections != null && !connections.isEmpty(); return connections != null && !connections.isEmpty();
} }

View File

@ -45,9 +45,11 @@ public class StandardValidationContext implements ValidationContext {
private final String annotationData; private final String annotationData;
private final Set<String> serviceIdentifiersToNotValidate; private final Set<String> serviceIdentifiersToNotValidate;
private final String groupId; private final String groupId;
private final String componentId;
public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map<PropertyDescriptor, String> properties, final String annotationData, final String groupId) { public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map<PropertyDescriptor, String> properties,
this(controllerServiceProvider, Collections.<String> emptySet(), properties, annotationData, groupId); final String annotationData, final String groupId, final String componentId) {
this(controllerServiceProvider, Collections.<String> emptySet(), properties, annotationData, groupId, componentId);
} }
public StandardValidationContext( public StandardValidationContext(
@ -55,12 +57,14 @@ public class StandardValidationContext implements ValidationContext {
final Set<String> serviceIdentifiersToNotValidate, final Set<String> serviceIdentifiersToNotValidate,
final Map<PropertyDescriptor, String> properties, final Map<PropertyDescriptor, String> properties,
final String annotationData, final String annotationData,
final String groupId) { final String groupId,
final String componentId) {
this.controllerServiceProvider = controllerServiceProvider; this.controllerServiceProvider = controllerServiceProvider;
this.properties = new HashMap<>(properties); this.properties = new HashMap<>(properties);
this.annotationData = annotationData; this.annotationData = annotationData;
this.serviceIdentifiersToNotValidate = serviceIdentifiersToNotValidate; this.serviceIdentifiersToNotValidate = serviceIdentifiersToNotValidate;
this.groupId = groupId; this.groupId = groupId;
this.componentId = componentId;
preparedQueries = new HashMap<>(properties.size()); preparedQueries = new HashMap<>(properties.size());
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
@ -93,7 +97,8 @@ public class StandardValidationContext implements ValidationContext {
@Override @Override
public ValidationContext getControllerServiceValidationContext(final ControllerService controllerService) { public ValidationContext getControllerServiceValidationContext(final ControllerService controllerService) {
final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(controllerService.getIdentifier()); final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(controllerService.getIdentifier());
return new StandardValidationContext(controllerServiceProvider, serviceNode.getProperties(), serviceNode.getAnnotationData(), serviceNode.getProcessGroup().getIdentifier()); return new StandardValidationContext(controllerServiceProvider, serviceNode.getProperties(), serviceNode.getAnnotationData(),
serviceNode.getProcessGroup().getIdentifier(), serviceNode.getIdentifier());
} }
@Override @Override
@ -114,7 +119,7 @@ public class StandardValidationContext implements ValidationContext {
@Override @Override
public ControllerServiceLookup getControllerServiceLookup() { public ControllerServiceLookup getControllerServiceLookup() {
return controllerServiceProvider; return new ComponentSpecificControllerServiceLookup(controllerServiceProvider, componentId, groupId);
} }
@Override @Override

View File

@ -33,13 +33,13 @@ public class StandardValidationContextFactory implements ValidationContextFactor
} }
@Override @Override
public ValidationContext newValidationContext(final Map<PropertyDescriptor, String> properties, final String annotationData, final String groupId) { public ValidationContext newValidationContext(final Map<PropertyDescriptor, String> properties, final String annotationData, final String groupId, final String componentId) {
return new StandardValidationContext(serviceProvider, properties, annotationData, groupId); return new StandardValidationContext(serviceProvider, properties, annotationData, groupId, componentId);
} }
@Override @Override
public ValidationContext newValidationContext(final Set<String> serviceIdentifiersToNotValidate, public ValidationContext newValidationContext(final Set<String> serviceIdentifiersToNotValidate,
final Map<PropertyDescriptor, String> properties, final String annotationData, final String groupId) { final Map<PropertyDescriptor, String> properties, final String annotationData, final String groupId, String componentId) {
return new StandardValidationContext(serviceProvider, serviceIdentifiersToNotValidate, properties, annotationData, groupId); return new StandardValidationContext(serviceProvider, serviceIdentifiersToNotValidate, properties, annotationData, groupId, componentId);
} }
} }

View File

@ -38,11 +38,13 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.cluster.Heartbeater;
import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
import org.apache.nifi.controller.reporting.StandardReportingTaskNode; import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceNode;
@ -73,7 +75,9 @@ public class TestStandardProcessScheduler {
private StandardProcessScheduler scheduler = null; private StandardProcessScheduler scheduler = null;
private ReportingTaskNode taskNode = null; private ReportingTaskNode taskNode = null;
private TestReportingTask reportingTask = null; private TestReportingTask reportingTask = null;
private StateManagerProvider stateMgrProvider = Mockito.mock(StateManagerProvider.class); private final StateManagerProvider stateMgrProvider = Mockito.mock(StateManagerProvider.class);
private FlowController controller;
private ProcessGroup rootGroup;
@Before @Before
public void setup() throws InitializationException { public void setup() throws InitializationException {
@ -89,6 +93,10 @@ public class TestStandardProcessScheduler {
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null);
taskNode = new StandardReportingTaskNode(reportingTask, UUID.randomUUID().toString(), null, scheduler, validationContextFactory); taskNode = new StandardReportingTaskNode(reportingTask, UUID.randomUUID().toString(), null, scheduler, validationContextFactory);
controller = Mockito.mock(FlowController.class);
rootGroup = new MockProcessGroup();
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(rootGroup);
} }
/** /**
@ -115,20 +123,17 @@ public class TestStandardProcessScheduler {
assertTrue("After unscheduling Reporting Task, task ran an additional " + attemptsAfterStop + " times", attemptsAfterStop <= 1); assertTrue("After unscheduling Reporting Task, task ran an additional " + attemptsAfterStop + " times", attemptsAfterStop <= 1);
} }
@Test(timeout = 6000) @Test(timeout = 60000)
public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException { public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException {
final Processor proc = new ServiceReferencingProcessor(); final Processor proc = new ServiceReferencingProcessor();
final ProcessGroup group = new MockProcessGroup();
final StandardControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(scheduler, null, Mockito.mock(StateManagerProvider.class));
serviceProvider.setRootProcessGroup(group);
final StandardControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(controller, scheduler, null, Mockito.mock(StateManagerProvider.class));
final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", true); final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", true);
group.addControllerService(service); rootGroup.addControllerService(service);
final ProcessorNode procNode = new StandardProcessorNode(proc, UUID.randomUUID().toString(), final ProcessorNode procNode = new StandardProcessorNode(proc, UUID.randomUUID().toString(),
new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider); new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider);
group.addProcessor(procNode); rootGroup.addProcessor(procNode);
procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier()); procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier());
@ -184,16 +189,16 @@ public class TestStandardProcessScheduler {
} }
@Override @Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
} }
} }
private void refreshNiFiProperties() { private void refreshNiFiProperties() {
try { try {
Field instanceField = NiFiProperties.class.getDeclaredField("instance"); final Field instanceField = NiFiProperties.class.getDeclaredField("instance");
instanceField.setAccessible(true); instanceField.setAccessible(true);
instanceField.set(null, null); instanceField.set(null, null);
} catch (Exception e) { } catch (final Exception e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
} }
@ -206,12 +211,12 @@ public class TestStandardProcessScheduler {
@Test @Test
public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception { public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception {
final ProcessScheduler scheduler = createScheduler(); final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider);
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
"1", false); "1", false);
assertFalse(serviceNode.isActive()); assertFalse(serviceNode.isActive());
SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
ExecutorService executor = Executors.newCachedThreadPool(); final ExecutorService executor = Executors.newCachedThreadPool();
final AtomicBoolean asyncFailed = new AtomicBoolean(); final AtomicBoolean asyncFailed = new AtomicBoolean();
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
@ -221,7 +226,7 @@ public class TestStandardProcessScheduler {
try { try {
scheduler.enableControllerService(serviceNode); scheduler.enableControllerService(serviceNode);
assertTrue(serviceNode.isActive()); assertTrue(serviceNode.isActive());
} catch (Exception e) { } catch (final Exception e) {
e.printStackTrace(); e.printStackTrace();
asyncFailed.set(true); asyncFailed.set(true);
} }
@ -245,11 +250,11 @@ public class TestStandardProcessScheduler {
@Test @Test
public void validateDisabledServiceCantBeDisabled() throws Exception { public void validateDisabledServiceCantBeDisabled() throws Exception {
final ProcessScheduler scheduler = createScheduler(); final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider);
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
"1", false); "1", false);
SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
ExecutorService executor = Executors.newCachedThreadPool(); final ExecutorService executor = Executors.newCachedThreadPool();
final AtomicBoolean asyncFailed = new AtomicBoolean(); final AtomicBoolean asyncFailed = new AtomicBoolean();
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
@ -259,7 +264,7 @@ public class TestStandardProcessScheduler {
try { try {
scheduler.disableControllerService(serviceNode); scheduler.disableControllerService(serviceNode);
assertFalse(serviceNode.isActive()); assertFalse(serviceNode.isActive());
} catch (Exception e) { } catch (final Exception e) {
e.printStackTrace(); e.printStackTrace();
asyncFailed.set(true); asyncFailed.set(true);
} }
@ -283,13 +288,13 @@ public class TestStandardProcessScheduler {
@Test @Test
public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception { public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception {
final ProcessScheduler scheduler = createScheduler(); final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider);
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
"1", false); "1", false);
SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
scheduler.enableControllerService(serviceNode); scheduler.enableControllerService(serviceNode);
assertTrue(serviceNode.isActive()); assertTrue(serviceNode.isActive());
ExecutorService executor = Executors.newCachedThreadPool(); final ExecutorService executor = Executors.newCachedThreadPool();
final AtomicBoolean asyncFailed = new AtomicBoolean(); final AtomicBoolean asyncFailed = new AtomicBoolean();
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
@ -299,7 +304,7 @@ public class TestStandardProcessScheduler {
try { try {
scheduler.disableControllerService(serviceNode); scheduler.disableControllerService(serviceNode);
assertFalse(serviceNode.isActive()); assertFalse(serviceNode.isActive());
} catch (Exception e) { } catch (final Exception e) {
e.printStackTrace(); e.printStackTrace();
asyncFailed.set(true); asyncFailed.set(true);
} }
@ -317,7 +322,7 @@ public class TestStandardProcessScheduler {
@Test @Test
public void validateDisablingOfTheFailedService() throws Exception { public void validateDisablingOfTheFailedService() throws Exception {
final ProcessScheduler scheduler = createScheduler(); final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider);
final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(), final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(),
"1", false); "1", false);
scheduler.enableControllerService(serviceNode); scheduler.enableControllerService(serviceNode);
@ -348,8 +353,8 @@ public class TestStandardProcessScheduler {
@Test @Test
public void validateEnabledDisableMultiThread() throws Exception { public void validateEnabledDisableMultiThread() throws Exception {
final ProcessScheduler scheduler = createScheduler(); final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider);
ExecutorService executor = Executors.newCachedThreadPool(); final ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 200; i++) { for (int i = 0; i < 200; i++) {
final ControllerServiceNode serviceNode = provider final ControllerServiceNode serviceNode = provider
.createControllerService(RandomShortDelayEnablingService.class.getName(), "1", false); .createControllerService(RandomShortDelayEnablingService.class.getName(), "1", false);
@ -391,10 +396,10 @@ public class TestStandardProcessScheduler {
@Test @Test
public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception { public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception {
final ProcessScheduler scheduler = createScheduler(); final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider);
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
"1", false); "1", false);
LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
ts.setLimit(Long.MAX_VALUE); ts.setLimit(Long.MAX_VALUE);
scheduler.enableControllerService(serviceNode); scheduler.enableControllerService(serviceNode);
Thread.sleep(100); Thread.sleep(100);
@ -416,10 +421,10 @@ public class TestStandardProcessScheduler {
@Test @Test
public void validateLongEnablingServiceCanStillBeDisabled() throws Exception { public void validateLongEnablingServiceCanStillBeDisabled() throws Exception {
final ProcessScheduler scheduler = createScheduler(); final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider);
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
"1", false); "1", false);
LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
ts.setLimit(3000); ts.setLimit(3000);
scheduler.enableControllerService(serviceNode); scheduler.enableControllerService(serviceNode);
Thread.sleep(2000); Thread.sleep(2000);
@ -440,7 +445,7 @@ public class TestStandardProcessScheduler {
public static class FailingService extends AbstractControllerService { public static class FailingService extends AbstractControllerService {
@OnEnabled @OnEnabled
public void enable(ConfigurationContext context) { public void enable(final ConfigurationContext context) {
throw new RuntimeException("intentional"); throw new RuntimeException("intentional");
} }
} }
@ -449,10 +454,10 @@ public class TestStandardProcessScheduler {
private final Random random = new Random(); private final Random random = new Random();
@OnEnabled @OnEnabled
public void enable(ConfigurationContext context) { public void enable(final ConfigurationContext context) {
try { try {
Thread.sleep(random.nextInt(20)); Thread.sleep(random.nextInt(20));
} catch (InterruptedException e) { } catch (final InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
@ -464,12 +469,12 @@ public class TestStandardProcessScheduler {
private final AtomicInteger disableCounter = new AtomicInteger(); private final AtomicInteger disableCounter = new AtomicInteger();
@OnEnabled @OnEnabled
public void enable(ConfigurationContext context) { public void enable(final ConfigurationContext context) {
this.enableCounter.incrementAndGet(); this.enableCounter.incrementAndGet();
} }
@OnDisabled @OnDisabled
public void disable(ConfigurationContext context) { public void disable(final ConfigurationContext context) {
this.disableCounter.incrementAndGet(); this.disableCounter.incrementAndGet();
} }
@ -489,13 +494,13 @@ public class TestStandardProcessScheduler {
private volatile long limit; private volatile long limit;
@OnEnabled @OnEnabled
public void enable(ConfigurationContext context) throws Exception { public void enable(final ConfigurationContext context) throws Exception {
this.enableCounter.incrementAndGet(); this.enableCounter.incrementAndGet();
Thread.sleep(limit); Thread.sleep(limit);
} }
@OnDisabled @OnDisabled
public void disable(ConfigurationContext context) { public void disable(final ConfigurationContext context) {
this.disableCounter.incrementAndGet(); this.disableCounter.incrementAndGet();
} }
@ -507,7 +512,7 @@ public class TestStandardProcessScheduler {
return this.disableCounter.get(); return this.disableCounter.get();
} }
public void setLimit(long limit) { public void setLimit(final long limit) {
this.limit = limit; this.limit = limit;
} }
} }

View File

@ -46,7 +46,7 @@ public class StandardControllerServiceProviderTest {
public void setup() throws Exception { public void setup() throws Exception {
String id = "id"; String id = "id";
String clazz = "org.apache.nifi.controller.service.util.TestControllerService"; String clazz = "org.apache.nifi.controller.service.util.TestControllerService";
ControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, new StateManagerProvider() { ControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, null, new StateManagerProvider() {
@Override @Override
public StateManager getStateManager(final String componentId) { public StateManager getStateManager(final String componentId) {
return Mockito.mock(StateManager.class); return Mockito.mock(StateManager.class);

View File

@ -29,6 +29,7 @@ import java.util.UUID;
import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ScheduledState;
@ -49,7 +50,7 @@ import org.mockito.Mockito;
public class TestStandardControllerServiceProvider { public class TestStandardControllerServiceProvider {
private static StateManagerProvider stateManagerProvider = new StateManagerProvider() { private static StateManagerProvider stateManagerProvider = new StateManagerProvider() {
@Override @Override
public StateManager getStateManager(String componentId) { public StateManager getStateManager(final String componentId) {
return Mockito.mock(StateManager.class); return Mockito.mock(StateManager.class);
} }
@ -66,7 +67,7 @@ public class TestStandardControllerServiceProvider {
} }
@Override @Override
public void onComponentRemoved(String componentId) { public void onComponentRemoved(final String componentId) {
} }
}; };
@ -81,20 +82,26 @@ public class TestStandardControllerServiceProvider {
@Test @Test
public void testDisableControllerService() { public void testDisableControllerService() {
final ProcessGroup procGroup = new MockProcessGroup();
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
final ProcessScheduler scheduler = createScheduler(); final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider);
final ControllerServiceNode serviceNode = provider.createControllerService(ServiceB.class.getName(), "B", false); final ControllerServiceNode serviceNode = provider.createControllerService(ServiceB.class.getName(), "B", false);
provider.enableControllerService(serviceNode); provider.enableControllerService(serviceNode);
provider.disableControllerService(serviceNode); provider.disableControllerService(serviceNode);
} }
@Test(timeout = 1000000) @Test(timeout = 10000)
public void testEnableDisableWithReference() { public void testEnableDisableWithReference() {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider);
final ProcessGroup group = new MockProcessGroup(); final ProcessGroup group = new MockProcessGroup();
provider.setRootProcessGroup(group); final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(group);
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider);
final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B", false); final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B", false);
final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A", false); final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A", false);
@ -148,10 +155,12 @@ public class TestStandardControllerServiceProvider {
} }
} }
public void testEnableReferencingServicesGraph(ProcessScheduler scheduler) { public void testEnableReferencingServicesGraph(final ProcessScheduler scheduler) {
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider);
final ProcessGroup procGroup = new MockProcessGroup(); final ProcessGroup procGroup = new MockProcessGroup();
provider.setRootProcessGroup(procGroup); final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider);
// build a graph of controller services with dependencies as such: // build a graph of controller services with dependencies as such:
// //
@ -199,7 +208,11 @@ public class TestStandardControllerServiceProvider {
@Test @Test
public void testOrderingOfServices() { public void testOrderingOfServices() {
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, stateManagerProvider); final ProcessGroup procGroup = new MockProcessGroup();
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, null, null, stateManagerProvider);
final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceB.class.getName(), "2", false); final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceB.class.getName(), "2", false);
@ -354,8 +367,12 @@ public class TestStandardControllerServiceProvider {
@Test @Test
public void testEnableReferencingComponents() { public void testEnableReferencingComponents() {
final ProcessGroup procGroup = new MockProcessGroup();
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
final StandardProcessScheduler scheduler = createScheduler(); final StandardProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, stateManagerProvider); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, null, null, stateManagerProvider);
final ControllerServiceNode serviceNode = provider.createControllerService(ServiceA.class.getName(), "1", false); final ControllerServiceNode serviceNode = provider.createControllerService(ServiceA.class.getName(), "1", false);
final ProcessorNode procNode = createProcessor(scheduler, provider); final ProcessorNode procNode = createProcessor(scheduler, provider);

View File

@ -17,6 +17,13 @@
package org.apache.nifi.controller.service.mock; package org.apache.nifi.controller.service.mock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connectable;
@ -34,14 +41,9 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MockProcessGroup implements ProcessGroup { public class MockProcessGroup implements ProcessGroup {
private Map<String, ControllerServiceNode> serviceMap = new HashMap<>(); private final Map<String, ControllerServiceNode> serviceMap = new HashMap<>();
private final Map<String, ProcessorNode> processorMap = new HashMap<>();
@Override @Override
public Authorizable getParentAuthorizable() { public Authorizable getParentAuthorizable() {
@ -59,7 +61,7 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void setParent(ProcessGroup group) { public void setParent(final ProcessGroup group) {
} }
@ -74,12 +76,12 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void setName(String name) { public void setName(final String name) {
} }
@Override @Override
public void setPosition(Position position) { public void setPosition(final Position position) {
} }
@ -94,7 +96,7 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void setComments(String comments) { public void setComments(final String comments) {
} }
@ -114,67 +116,67 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void enableProcessor(ProcessorNode processor) { public void enableProcessor(final ProcessorNode processor) {
} }
@Override @Override
public void enableInputPort(Port port) { public void enableInputPort(final Port port) {
} }
@Override @Override
public void enableOutputPort(Port port) { public void enableOutputPort(final Port port) {
} }
@Override @Override
public void startProcessor(ProcessorNode processor) { public void startProcessor(final ProcessorNode processor) {
} }
@Override @Override
public void startInputPort(Port port) { public void startInputPort(final Port port) {
} }
@Override @Override
public void startOutputPort(Port port) { public void startOutputPort(final Port port) {
} }
@Override @Override
public void startFunnel(Funnel funnel) { public void startFunnel(final Funnel funnel) {
} }
@Override @Override
public void stopProcessor(ProcessorNode processor) { public void stopProcessor(final ProcessorNode processor) {
} }
@Override @Override
public void stopInputPort(Port port) { public void stopInputPort(final Port port) {
} }
@Override @Override
public void stopOutputPort(Port port) { public void stopOutputPort(final Port port) {
} }
@Override @Override
public void disableProcessor(ProcessorNode processor) { public void disableProcessor(final ProcessorNode processor) {
} }
@Override @Override
public void disableInputPort(Port port) { public void disableInputPort(final Port port) {
} }
@Override @Override
public void disableOutputPort(Port port) { public void disableOutputPort(final Port port) {
} }
@ -189,12 +191,12 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void addInputPort(Port port) { public void addInputPort(final Port port) {
} }
@Override @Override
public void removeInputPort(Port port) { public void removeInputPort(final Port port) {
} }
@ -204,22 +206,22 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public Port getInputPort(String id) { public Port getInputPort(final String id) {
return null; return null;
} }
@Override @Override
public void addOutputPort(Port port) { public void addOutputPort(final Port port) {
} }
@Override @Override
public void removeOutputPort(Port port) { public void removeOutputPort(final Port port) {
} }
@Override @Override
public Port getOutputPort(String id) { public Port getOutputPort(final String id) {
return null; return null;
} }
@ -229,12 +231,12 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void addProcessGroup(ProcessGroup group) { public void addProcessGroup(final ProcessGroup group) {
} }
@Override @Override
public ProcessGroup getProcessGroup(String id) { public ProcessGroup getProcessGroup(final String id) {
return null; return null;
} }
@ -244,28 +246,29 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void removeProcessGroup(ProcessGroup group) { public void removeProcessGroup(final ProcessGroup group) {
} }
@Override @Override
public void addProcessor(ProcessorNode processor) { public void addProcessor(final ProcessorNode processor) {
processor.setProcessGroup(this); processor.setProcessGroup(this);
processorMap.put(processor.getIdentifier(), processor);
} }
@Override @Override
public void removeProcessor(ProcessorNode processor) { public void removeProcessor(final ProcessorNode processor) {
processorMap.remove(processor.getIdentifier());
} }
@Override @Override
public Set<ProcessorNode> getProcessors() { public Set<ProcessorNode> getProcessors() {
return null; return new HashSet<>(processorMap.values());
} }
@Override @Override
public ProcessorNode getProcessor(String id) { public ProcessorNode getProcessor(final String id) {
return null; return processorMap.get(id);
} }
@Override @Override
@ -274,27 +277,27 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public Connectable getConnectable(String id) { public Connectable getConnectable(final String id) {
return null; return null;
} }
@Override @Override
public void addConnection(Connection connection) { public void addConnection(final Connection connection) {
} }
@Override @Override
public void removeConnection(Connection connection) { public void removeConnection(final Connection connection) {
} }
@Override @Override
public void inheritConnection(Connection connection) { public void inheritConnection(final Connection connection) {
} }
@Override @Override
public Connection getConnection(String id) { public Connection getConnection(final String id) {
return null; return null;
} }
@ -304,7 +307,7 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public Connection findConnection(String id) { public Connection findConnection(final String id) {
return null; return null;
} }
@ -314,12 +317,12 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public Funnel findFunnel(String id) { public Funnel findFunnel(final String id) {
return null; return null;
} }
@Override @Override
public ControllerServiceNode findControllerService(String id) { public ControllerServiceNode findControllerService(final String id) {
return serviceMap.get(id); return serviceMap.get(id);
} }
@ -329,17 +332,17 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void addRemoteProcessGroup(RemoteProcessGroup remoteGroup) { public void addRemoteProcessGroup(final RemoteProcessGroup remoteGroup) {
} }
@Override @Override
public void removeRemoteProcessGroup(RemoteProcessGroup remoteGroup) { public void removeRemoteProcessGroup(final RemoteProcessGroup remoteGroup) {
} }
@Override @Override
public RemoteProcessGroup getRemoteProcessGroup(String id) { public RemoteProcessGroup getRemoteProcessGroup(final String id) {
return null; return null;
} }
@ -349,12 +352,12 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void addLabel(Label label) { public void addLabel(final Label label) {
} }
@Override @Override
public void removeLabel(Label label) { public void removeLabel(final Label label) {
} }
@ -364,12 +367,12 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public Label getLabel(String id) { public Label getLabel(final String id) {
return null; return null;
} }
@Override @Override
public ProcessGroup findProcessGroup(String id) { public ProcessGroup findProcessGroup(final String id) {
return null; return null;
} }
@ -379,7 +382,7 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public RemoteProcessGroup findRemoteProcessGroup(String id) { public RemoteProcessGroup findRemoteProcessGroup(final String id) {
return null; return null;
} }
@ -389,17 +392,17 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public ProcessorNode findProcessor(String id) { public ProcessorNode findProcessor(final String id) {
return null; return processorMap.get(id);
} }
@Override @Override
public List<ProcessorNode> findAllProcessors() { public List<ProcessorNode> findAllProcessors() {
return null; return new ArrayList<>(processorMap.values());
} }
@Override @Override
public Label findLabel(String id) { public Label findLabel(final String id) {
return null; return null;
} }
@ -409,7 +412,7 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public Port findInputPort(String id) { public Port findInputPort(final String id) {
return null; return null;
} }
@ -419,12 +422,12 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public Port getInputPortByName(String name) { public Port getInputPortByName(final String name) {
return null; return null;
} }
@Override @Override
public Port findOutputPort(String id) { public Port findOutputPort(final String id) {
return null; return null;
} }
@ -434,17 +437,17 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public Port getOutputPortByName(String name) { public Port getOutputPortByName(final String name) {
return null; return null;
} }
@Override @Override
public void addFunnel(Funnel funnel) { public void addFunnel(final Funnel funnel) {
} }
@Override @Override
public void addFunnel(Funnel funnel, boolean autoStart) { public void addFunnel(final Funnel funnel, final boolean autoStart) {
} }
@ -454,33 +457,33 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public Funnel getFunnel(String id) { public Funnel getFunnel(final String id) {
return null; return null;
} }
@Override @Override
public void removeFunnel(Funnel funnel) { public void removeFunnel(final Funnel funnel) {
} }
@Override @Override
public void addControllerService(ControllerServiceNode service) { public void addControllerService(final ControllerServiceNode service) {
serviceMap.put(service.getIdentifier(), service); serviceMap.put(service.getIdentifier(), service);
service.setProcessGroup(this); service.setProcessGroup(this);
} }
@Override @Override
public ControllerServiceNode getControllerService(String id) { public ControllerServiceNode getControllerService(final String id) {
return serviceMap.get(id); return serviceMap.get(id);
} }
@Override @Override
public Set<ControllerServiceNode> getControllerServices(boolean recursive) { public Set<ControllerServiceNode> getControllerServices(final boolean recursive) {
return new HashSet<>(serviceMap.values()); return new HashSet<>(serviceMap.values());
} }
@Override @Override
public void removeControllerService(ControllerServiceNode service) { public void removeControllerService(final ControllerServiceNode service) {
serviceMap.remove(service.getIdentifier()); serviceMap.remove(service.getIdentifier());
} }
@ -490,17 +493,17 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void remove(Snippet snippet) { public void remove(final Snippet snippet) {
} }
@Override @Override
public Connectable findConnectable(String identifier) { public Connectable findConnectable(final String identifier) {
return null; return null;
} }
@Override @Override
public void move(Snippet snippet, ProcessGroup destination) { public void move(final Snippet snippet, final ProcessGroup destination) {
} }
@ -510,7 +513,7 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void verifyCanDelete(boolean ignorePortConnections) { public void verifyCanDelete(final boolean ignorePortConnections) {
} }
@ -525,31 +528,31 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void verifyCanDelete(Snippet snippet) { public void verifyCanDelete(final Snippet snippet) {
} }
@Override @Override
public void verifyCanMove(Snippet snippet, ProcessGroup newProcessGroup) { public void verifyCanMove(final Snippet snippet, final ProcessGroup newProcessGroup) {
} }
@Override @Override
public void addTemplate(Template template) { public void addTemplate(final Template template) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public void removeTemplate(Template template) { public void removeTemplate(final Template template) {
} }
@Override @Override
public Template getTemplate(String id) { public Template getTemplate(final String id) {
return null; return null;
} }
@Override @Override
public Template findTemplate(String id) { public Template findTemplate(final String id) {
return null; return null;
} }
@ -564,10 +567,10 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void verifyCanStart(Connectable connectable) { public void verifyCanStart(final Connectable connectable) {
} }
@Override @Override
public void verifyCanStop(Connectable connectable) { public void verifyCanStop(final Connectable connectable) {
} }
} }

View File

@ -148,27 +148,27 @@ public class TestStandardPropertyValue {
} }
@Override @Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) { public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return null; return null;
} }
@Override @Override
public boolean isControllerServiceEnabled(String serviceIdentifier) { public boolean isControllerServiceEnabled(final String serviceIdentifier) {
return true; return true;
} }
@Override @Override
public boolean isControllerServiceEnabled(ControllerService service) { public boolean isControllerServiceEnabled(final ControllerService service) {
return true; return true;
} }
@Override @Override
public String getControllerServiceName(String serviceIdentifier) { public String getControllerServiceName(final String serviceIdentifier) {
return null; return null;
} }
@Override @Override
public boolean isControllerServiceEnabling(String serviceIdentifier) { public boolean isControllerServiceEnabling(final String serviceIdentifier) {
return false; return false;
} }
} }

View File

@ -16,22 +16,7 @@
*/ */
package org.apache.nifi.web; package org.apache.nifi.web;
import java.io.Serializable; import com.sun.jersey.core.util.MultivaluedMapImpl;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action; import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component; import org.apache.nifi.action.Component;
@ -56,8 +41,8 @@ import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.reporting.ReportingTaskProvider; import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO; import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
@ -74,7 +59,20 @@ import org.slf4j.LoggerFactory;
import org.springframework.security.core.Authentication; import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.core.context.SecurityContextHolder;
import com.sun.jersey.core.util.MultivaluedMapImpl; import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
/** /**
* Implements the NiFiWebConfigurationContext interface to support a context in both standalone and clustered environments. * Implements the NiFiWebConfigurationContext interface to support a context in both standalone and clustered environments.
@ -88,7 +86,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
private NiFiServiceFacade serviceFacade; private NiFiServiceFacade serviceFacade;
private ClusterCoordinator clusterCoordinator; private ClusterCoordinator clusterCoordinator;
private RequestReplicator requestReplicator; private RequestReplicator requestReplicator;
private ControllerServiceLookup controllerServiceLookup; private ControllerServiceProvider controllerServiceProvider;
private ReportingTaskProvider reportingTaskProvider; private ReportingTaskProvider reportingTaskProvider;
private AuditService auditService; private AuditService auditService;
private Authorizer authorizer; private Authorizer authorizer;
@ -113,10 +111,10 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
} }
@Override @Override
public ControllerService getControllerService(String serviceIdentifier) { public ControllerService getControllerService(final String serviceIdentifier, final String componentId) {
final NiFiUser user = NiFiUserUtils.getNiFiUser(); final NiFiUser user = NiFiUserUtils.getNiFiUser();
authorizeFlowAccess(user); authorizeFlowAccess(user);
return controllerServiceLookup.getControllerService(serviceIdentifier); return controllerServiceProvider.getControllerServiceForComponent(serviceIdentifier, componentId);
} }
@Override @Override
@ -193,7 +191,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
try { try {
// record the operations // record the operations
auditService.addActions(actions); auditService.addActions(actions);
} catch (Throwable t) { } catch (final Throwable t) {
logger.warn("Unable to record actions: " + t.getMessage()); logger.warn("Unable to record actions: " + t.getMessage());
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.warn(StringUtils.EMPTY, t); logger.warn(StringUtils.EMPTY, t);
@ -329,21 +327,21 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
// create the request URL // create the request URL
URI requestUrl; URI requestUrl;
try { try {
String path = "/nifi-api/processors/" + URLEncoder.encode(id, "UTF-8"); final String path = "/nifi-api/processors/" + URLEncoder.encode(id, "UTF-8");
requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null); requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null);
} catch (final URISyntaxException | UnsupportedEncodingException use) { } catch (final URISyntaxException | UnsupportedEncodingException use) {
throw new ClusterRequestException(use); throw new ClusterRequestException(use);
} }
// set the request parameters // set the request parameters
MultivaluedMap<String, String> parameters = new MultivaluedMapImpl(); final MultivaluedMap<String, String> parameters = new MultivaluedMapImpl();
parameters.add(VERBOSE_PARAM, "true"); parameters.add(VERBOSE_PARAM, "true");
// replicate request // replicate request
NodeResponse nodeResponse; NodeResponse nodeResponse;
try { try {
nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse(); nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse();
} catch (InterruptedException e) { } catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node"); throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
} }
@ -381,28 +379,28 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
// create the request URL // create the request URL
URI requestUrl; URI requestUrl;
try { try {
String path = "/nifi-api/processors/" + URLEncoder.encode(id, "UTF-8"); final String path = "/nifi-api/processors/" + URLEncoder.encode(id, "UTF-8");
requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null); requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null);
} catch (final URISyntaxException | UnsupportedEncodingException use) { } catch (final URISyntaxException | UnsupportedEncodingException use) {
throw new ClusterRequestException(use); throw new ClusterRequestException(use);
} }
// create the revision // create the revision
RevisionDTO revisionDto = new RevisionDTO(); final RevisionDTO revisionDto = new RevisionDTO();
revisionDto.setClientId(revision.getClientId()); revisionDto.setClientId(revision.getClientId());
revisionDto.setVersion(revision.getVersion()); revisionDto.setVersion(revision.getVersion());
// create the processor entity // create the processor entity
ProcessorEntity processorEntity = new ProcessorEntity(); final ProcessorEntity processorEntity = new ProcessorEntity();
processorEntity.setRevision(revisionDto); processorEntity.setRevision(revisionDto);
// create the processor dto // create the processor dto
ProcessorDTO processorDto = new ProcessorDTO(); final ProcessorDTO processorDto = new ProcessorDTO();
processorEntity.setComponent(processorDto); processorEntity.setComponent(processorDto);
processorDto.setId(id); processorDto.setId(id);
// create the processor configuration with the given annotation data // create the processor configuration with the given annotation data
ProcessorConfigDTO configDto = new ProcessorConfigDTO(); final ProcessorConfigDTO configDto = new ProcessorConfigDTO();
processorDto.setConfig(configDto); processorDto.setConfig(configDto);
configDto.setAnnotationData(annotationData); configDto.setAnnotationData(annotationData);
@ -414,7 +412,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
NodeResponse nodeResponse; NodeResponse nodeResponse;
try { try {
nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, processorEntity, headers).awaitMergedResponse(); nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, processorEntity, headers).awaitMergedResponse();
} catch (InterruptedException e) { } catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node"); throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
} }
@ -474,7 +472,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
// if the lookup has the service that means we are either a node or // if the lookup has the service that means we are either a node or
// the ncm and the service is available there only // the ncm and the service is available there only
if (controllerServiceLookup.getControllerService(id) != null) { if (controllerServiceProvider.getControllerService(id) != null) {
controllerService = serviceFacade.getControllerService(id).getComponent(); controllerService = serviceFacade.getControllerService(id).getComponent();
} else { } else {
// if this is a standalone instance the service should have been found above... there should // if this is a standalone instance the service should have been found above... there should
@ -486,20 +484,20 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
// create the request URL // create the request URL
URI requestUrl; URI requestUrl;
try { try {
String path = "/nifi-api/controller-services/node/" + URLEncoder.encode(id, "UTF-8"); final String path = "/nifi-api/controller-services/node/" + URLEncoder.encode(id, "UTF-8");
requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null); requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null);
} catch (final URISyntaxException | UnsupportedEncodingException use) { } catch (final URISyntaxException | UnsupportedEncodingException use) {
throw new ClusterRequestException(use); throw new ClusterRequestException(use);
} }
// set the request parameters // set the request parameters
MultivaluedMap<String, String> parameters = new MultivaluedMapImpl(); final MultivaluedMap<String, String> parameters = new MultivaluedMapImpl();
// replicate request // replicate request
NodeResponse nodeResponse; NodeResponse nodeResponse;
try { try {
nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse(); nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse();
} catch (InterruptedException e) { } catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node"); throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
} }
@ -531,7 +529,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
}); });
final ControllerServiceDTO controllerService; final ControllerServiceDTO controllerService;
if (controllerServiceLookup.getControllerService(id) != null) { if (controllerServiceProvider.getControllerService(id) != null) {
final ControllerServiceDTO controllerServiceDto = new ControllerServiceDTO(); final ControllerServiceDTO controllerServiceDto = new ControllerServiceDTO();
controllerServiceDto.setId(id); controllerServiceDto.setId(id);
controllerServiceDto.setAnnotationData(annotationData); controllerServiceDto.setAnnotationData(annotationData);
@ -562,23 +560,23 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
// create the request URL // create the request URL
URI requestUrl; URI requestUrl;
try { try {
String path = "/nifi-api/controller-services/node/" + URLEncoder.encode(id, "UTF-8"); final String path = "/nifi-api/controller-services/node/" + URLEncoder.encode(id, "UTF-8");
requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null); requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null);
} catch (final URISyntaxException | UnsupportedEncodingException use) { } catch (final URISyntaxException | UnsupportedEncodingException use) {
throw new ClusterRequestException(use); throw new ClusterRequestException(use);
} }
// create the revision // create the revision
RevisionDTO revisionDto = new RevisionDTO(); final RevisionDTO revisionDto = new RevisionDTO();
revisionDto.setClientId(revision.getClientId()); revisionDto.setClientId(revision.getClientId());
revisionDto.setVersion(revision.getVersion()); revisionDto.setVersion(revision.getVersion());
// create the controller service entity // create the controller service entity
ControllerServiceEntity controllerServiceEntity = new ControllerServiceEntity(); final ControllerServiceEntity controllerServiceEntity = new ControllerServiceEntity();
controllerServiceEntity.setRevision(revisionDto); controllerServiceEntity.setRevision(revisionDto);
// create the controller service dto // create the controller service dto
ControllerServiceDTO controllerServiceDto = new ControllerServiceDTO(); final ControllerServiceDTO controllerServiceDto = new ControllerServiceDTO();
controllerServiceEntity.setComponent(controllerServiceDto); controllerServiceEntity.setComponent(controllerServiceDto);
controllerServiceDto.setId(id); controllerServiceDto.setId(id);
controllerServiceDto.setAnnotationData(annotationData); controllerServiceDto.setAnnotationData(annotationData);
@ -591,7 +589,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
NodeResponse nodeResponse; NodeResponse nodeResponse;
try { try {
nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, controllerServiceEntity, headers).awaitMergedResponse(); nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, controllerServiceEntity, headers).awaitMergedResponse();
} catch (InterruptedException e) { } catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node"); throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
} }
@ -652,20 +650,20 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
// create the request URL // create the request URL
URI requestUrl; URI requestUrl;
try { try {
String path = "/nifi-api/reporting-tasks/node/" + URLEncoder.encode(id, "UTF-8"); final String path = "/nifi-api/reporting-tasks/node/" + URLEncoder.encode(id, "UTF-8");
requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null); requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null);
} catch (final URISyntaxException | UnsupportedEncodingException use) { } catch (final URISyntaxException | UnsupportedEncodingException use) {
throw new ClusterRequestException(use); throw new ClusterRequestException(use);
} }
// set the request parameters // set the request parameters
MultivaluedMap<String, String> parameters = new MultivaluedMapImpl(); final MultivaluedMap<String, String> parameters = new MultivaluedMapImpl();
// replicate request // replicate request
NodeResponse nodeResponse; NodeResponse nodeResponse;
try { try {
nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse(); nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse();
} catch (InterruptedException e) { } catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node"); throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
} }
@ -727,23 +725,23 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
// create the request URL // create the request URL
URI requestUrl; URI requestUrl;
try { try {
String path = "/nifi-api/reporting-tasks/node/" + URLEncoder.encode(id, "UTF-8"); final String path = "/nifi-api/reporting-tasks/node/" + URLEncoder.encode(id, "UTF-8");
requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null); requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null);
} catch (final URISyntaxException | UnsupportedEncodingException use) { } catch (final URISyntaxException | UnsupportedEncodingException use) {
throw new ClusterRequestException(use); throw new ClusterRequestException(use);
} }
// create the revision // create the revision
RevisionDTO revisionDto = new RevisionDTO(); final RevisionDTO revisionDto = new RevisionDTO();
revisionDto.setClientId(revision.getClientId()); revisionDto.setClientId(revision.getClientId());
revisionDto.setVersion(revision.getVersion()); revisionDto.setVersion(revision.getVersion());
// create the reporting task entity // create the reporting task entity
ReportingTaskEntity reportingTaskEntity = new ReportingTaskEntity(); final ReportingTaskEntity reportingTaskEntity = new ReportingTaskEntity();
reportingTaskEntity.setRevision(revisionDto); reportingTaskEntity.setRevision(revisionDto);
// create the reporting task dto // create the reporting task dto
ReportingTaskDTO reportingTaskDto = new ReportingTaskDTO(); final ReportingTaskDTO reportingTaskDto = new ReportingTaskDTO();
reportingTaskEntity.setComponent(reportingTaskDto); reportingTaskEntity.setComponent(reportingTaskDto);
reportingTaskDto.setId(id); reportingTaskDto.setId(id);
reportingTaskDto.setAnnotationData(annotationData); reportingTaskDto.setAnnotationData(annotationData);
@ -756,7 +754,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
NodeResponse nodeResponse; NodeResponse nodeResponse;
try { try {
nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, reportingTaskEntity, headers).awaitMergedResponse(); nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, reportingTaskEntity, headers).awaitMergedResponse();
} catch (InterruptedException e) { } catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node"); throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
} }
@ -831,35 +829,35 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
} }
} }
public void setClusterCoordinator(ClusterCoordinator clusterCoordinator) { public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) {
this.clusterCoordinator = clusterCoordinator; this.clusterCoordinator = clusterCoordinator;
} }
public void setRequestReplicator(RequestReplicator requestReplicator) { public void setRequestReplicator(final RequestReplicator requestReplicator) {
this.requestReplicator = requestReplicator; this.requestReplicator = requestReplicator;
} }
public void setProperties(NiFiProperties properties) { public void setProperties(final NiFiProperties properties) {
this.properties = properties; this.properties = properties;
} }
public void setServiceFacade(NiFiServiceFacade serviceFacade) { public void setServiceFacade(final NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade; this.serviceFacade = serviceFacade;
} }
public void setAuditService(AuditService auditService) { public void setAuditService(final AuditService auditService) {
this.auditService = auditService; this.auditService = auditService;
} }
public void setControllerServiceLookup(ControllerServiceLookup controllerServiceLookup) { public void setControllerServiceProvider(final ControllerServiceProvider controllerServiceProvider) {
this.controllerServiceLookup = controllerServiceLookup; this.controllerServiceProvider = controllerServiceProvider;
} }
public void setReportingTaskProvider(ReportingTaskProvider reportingTaskProvider) { public void setReportingTaskProvider(final ReportingTaskProvider reportingTaskProvider) {
this.reportingTaskProvider = reportingTaskProvider; this.reportingTaskProvider = reportingTaskProvider;
} }
public void setAuthorizer(Authorizer authorizer) { public void setAuthorizer(final Authorizer authorizer) {
this.authorizer = authorizer; this.authorizer = authorizer;
} }
} }

View File

@ -169,7 +169,7 @@ public final class DtoFactory {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private final static Comparator<Class> CLASS_NAME_COMPARATOR = new Comparator<Class>() { private final static Comparator<Class> CLASS_NAME_COMPARATOR = new Comparator<Class>() {
@Override @Override
public int compare(Class class1, Class class2) { public int compare(final Class class1, final Class class2) {
return Collator.getInstance(Locale.US).compare(class1.getSimpleName(), class2.getSimpleName()); return Collator.getInstance(Locale.US).compare(class1.getSimpleName(), class2.getSimpleName());
} }
}; };
@ -297,8 +297,8 @@ public final class DtoFactory {
historyDto.setLastRefreshed(history.getLastRefreshed()); historyDto.setLastRefreshed(history.getLastRefreshed());
if (history.getActions() != null) { if (history.getActions() != null) {
List<ActionDTO> actionDtos = new ArrayList<>(); final List<ActionDTO> actionDtos = new ArrayList<>();
for (Action action : history.getActions()) { for (final Action action : history.getActions()) {
actionDtos.add(createActionDto(action)); actionDtos.add(createActionDto(action));
} }
historyDto.setActions(actionDtos); historyDto.setActions(actionDtos);
@ -998,7 +998,7 @@ public final class DtoFactory {
* @param originalSnippet snippet * @param originalSnippet snippet
* @return dto * @return dto
*/ */
public FlowSnippetDTO copySnippetContents(FlowSnippetDTO originalSnippet) { public FlowSnippetDTO copySnippetContents(final FlowSnippetDTO originalSnippet) {
final FlowSnippetDTO copySnippet = new FlowSnippetDTO(); final FlowSnippetDTO copySnippet = new FlowSnippetDTO();
if (originalSnippet.getConnections() != null) { if (originalSnippet.getConnections() != null) {
@ -1113,7 +1113,7 @@ public final class DtoFactory {
// sort a copy of the properties // sort a copy of the properties
final Map<PropertyDescriptor, String> sortedProperties = new TreeMap<>(new Comparator<PropertyDescriptor>() { final Map<PropertyDescriptor, String> sortedProperties = new TreeMap<>(new Comparator<PropertyDescriptor>() {
@Override @Override
public int compare(PropertyDescriptor o1, PropertyDescriptor o2) { public int compare(final PropertyDescriptor o1, final PropertyDescriptor o2) {
return Collator.getInstance(Locale.US).compare(o1.getName(), o2.getName()); return Collator.getInstance(Locale.US).compare(o1.getName(), o2.getName());
} }
}); });
@ -1124,7 +1124,7 @@ public final class DtoFactory {
final Map<PropertyDescriptor, String> orderedProperties = new LinkedHashMap<>(); final Map<PropertyDescriptor, String> orderedProperties = new LinkedHashMap<>();
final List<PropertyDescriptor> descriptors = reportingTask.getPropertyDescriptors(); final List<PropertyDescriptor> descriptors = reportingTask.getPropertyDescriptors();
if (descriptors != null && !descriptors.isEmpty()) { if (descriptors != null && !descriptors.isEmpty()) {
for (PropertyDescriptor descriptor : descriptors) { for (final PropertyDescriptor descriptor : descriptors) {
orderedProperties.put(descriptor, null); orderedProperties.put(descriptor, null);
} }
} }
@ -1137,7 +1137,7 @@ public final class DtoFactory {
final PropertyDescriptor descriptor = entry.getKey(); final PropertyDescriptor descriptor = entry.getKey();
// store the property descriptor // store the property descriptor
dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor, "root")); dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor, null));
// determine the property value - don't include sensitive properties // determine the property value - don't include sensitive properties
String propertyValue = entry.getValue(); String propertyValue = entry.getValue();
@ -1177,7 +1177,7 @@ public final class DtoFactory {
// sort a copy of the properties // sort a copy of the properties
final Map<PropertyDescriptor, String> sortedProperties = new TreeMap<>(new Comparator<PropertyDescriptor>() { final Map<PropertyDescriptor, String> sortedProperties = new TreeMap<>(new Comparator<PropertyDescriptor>() {
@Override @Override
public int compare(PropertyDescriptor o1, PropertyDescriptor o2) { public int compare(final PropertyDescriptor o1, final PropertyDescriptor o2) {
return Collator.getInstance(Locale.US).compare(o1.getName(), o2.getName()); return Collator.getInstance(Locale.US).compare(o1.getName(), o2.getName());
} }
}); });
@ -1188,7 +1188,7 @@ public final class DtoFactory {
final Map<PropertyDescriptor, String> orderedProperties = new LinkedHashMap<>(); final Map<PropertyDescriptor, String> orderedProperties = new LinkedHashMap<>();
final List<PropertyDescriptor> descriptors = controllerService.getPropertyDescriptors(); final List<PropertyDescriptor> descriptors = controllerService.getPropertyDescriptors();
if (descriptors != null && !descriptors.isEmpty()) { if (descriptors != null && !descriptors.isEmpty()) {
for (PropertyDescriptor descriptor : descriptors) { for (final PropertyDescriptor descriptor : descriptors) {
orderedProperties.put(descriptor, null); orderedProperties.put(descriptor, null);
} }
} }
@ -1201,7 +1201,8 @@ public final class DtoFactory {
final PropertyDescriptor descriptor = entry.getKey(); final PropertyDescriptor descriptor = entry.getKey();
// store the property descriptor // store the property descriptor
dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor, controllerServiceNode.getProcessGroup().getIdentifier())); final String groupId = controllerServiceNode.getProcessGroup() == null ? null : controllerServiceNode.getProcessGroup().getIdentifier();
dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor, groupId));
// determine the property value - don't include sensitive properties // determine the property value - don't include sensitive properties
String propertyValue = entry.getValue(); String propertyValue = entry.getValue();
@ -1254,7 +1255,7 @@ public final class DtoFactory {
propertyDescriptors = node.getControllerServiceImplementation().getPropertyDescriptors(); propertyDescriptors = node.getControllerServiceImplementation().getPropertyDescriptors();
validationErrors = node.getValidationErrors(); validationErrors = node.getValidationErrors();
processGroupId = node.getProcessGroup().getIdentifier(); processGroupId = node.getProcessGroup() == null ? null : node.getProcessGroup().getIdentifier();
} else if (component instanceof ReportingTaskNode) { } else if (component instanceof ReportingTaskNode) {
final ReportingTaskNode node = ((ReportingTaskNode) component); final ReportingTaskNode node = ((ReportingTaskNode) component);
dto.setState(node.getScheduledState().name()); dto.setState(node.getScheduledState().name());
@ -1264,20 +1265,20 @@ public final class DtoFactory {
propertyDescriptors = node.getReportingTask().getPropertyDescriptors(); propertyDescriptors = node.getReportingTask().getPropertyDescriptors();
validationErrors = node.getValidationErrors(); validationErrors = node.getValidationErrors();
processGroupId = "root"; processGroupId = null;
} }
if (propertyDescriptors != null && !propertyDescriptors.isEmpty()) { if (propertyDescriptors != null && !propertyDescriptors.isEmpty()) {
final Map<PropertyDescriptor, String> sortedProperties = new TreeMap<>(new Comparator<PropertyDescriptor>() { final Map<PropertyDescriptor, String> sortedProperties = new TreeMap<>(new Comparator<PropertyDescriptor>() {
@Override @Override
public int compare(PropertyDescriptor o1, PropertyDescriptor o2) { public int compare(final PropertyDescriptor o1, final PropertyDescriptor o2) {
return Collator.getInstance(Locale.US).compare(o1.getName(), o2.getName()); return Collator.getInstance(Locale.US).compare(o1.getName(), o2.getName());
} }
}); });
sortedProperties.putAll(component.getProperties()); sortedProperties.putAll(component.getProperties());
final Map<PropertyDescriptor, String> orderedProperties = new LinkedHashMap<>(); final Map<PropertyDescriptor, String> orderedProperties = new LinkedHashMap<>();
for (PropertyDescriptor descriptor : propertyDescriptors) { for (final PropertyDescriptor descriptor : propertyDescriptors) {
orderedProperties.put(descriptor, null); orderedProperties.put(descriptor, null);
} }
orderedProperties.putAll(sortedProperties); orderedProperties.putAll(sortedProperties);
@ -1717,7 +1718,7 @@ public final class DtoFactory {
dto.setComments(group.getComments()); dto.setComments(group.getComments());
dto.setName(group.getName()); dto.setName(group.getName());
ProcessGroup parentGroup = group.getParent(); final ProcessGroup parentGroup = group.getParent();
if (parentGroup != null) { if (parentGroup != null) {
dto.setParentGroupId(parentGroup.getIdentifier()); dto.setParentGroupId(parentGroup.getIdentifier());
} }
@ -1886,7 +1887,7 @@ public final class DtoFactory {
// sort the relationships // sort the relationships
Collections.sort(relationships, new Comparator<RelationshipDTO>() { Collections.sort(relationships, new Comparator<RelationshipDTO>() {
@Override @Override
public int compare(RelationshipDTO r1, RelationshipDTO r2) { public int compare(final RelationshipDTO r1, final RelationshipDTO r2) {
return Collator.getInstance(Locale.US).compare(r1.getName(), r2.getName()); return Collator.getInstance(Locale.US).compare(r1.getName(), r2.getName());
} }
}); });
@ -1923,7 +1924,7 @@ public final class DtoFactory {
// sort the bulletins // sort the bulletins
Collections.sort(bulletins, new Comparator<BulletinDTO>() { Collections.sort(bulletins, new Comparator<BulletinDTO>() {
@Override @Override
public int compare(BulletinDTO bulletin1, BulletinDTO bulletin2) { public int compare(final BulletinDTO bulletin1, final BulletinDTO bulletin2) {
if (bulletin1 == null && bulletin2 == null) { if (bulletin1 == null && bulletin2 == null) {
return 0; return 0;
} else if (bulletin1 == null) { } else if (bulletin1 == null) {
@ -2260,7 +2261,7 @@ public final class DtoFactory {
// sort a copy of the properties // sort a copy of the properties
final Map<PropertyDescriptor, String> sortedProperties = new TreeMap<>(new Comparator<PropertyDescriptor>() { final Map<PropertyDescriptor, String> sortedProperties = new TreeMap<>(new Comparator<PropertyDescriptor>() {
@Override @Override
public int compare(PropertyDescriptor o1, PropertyDescriptor o2) { public int compare(final PropertyDescriptor o1, final PropertyDescriptor o2) {
return Collator.getInstance(Locale.US).compare(o1.getName(), o2.getName()); return Collator.getInstance(Locale.US).compare(o1.getName(), o2.getName());
} }
}); });
@ -2271,7 +2272,7 @@ public final class DtoFactory {
final Map<PropertyDescriptor, String> orderedProperties = new LinkedHashMap<>(); final Map<PropertyDescriptor, String> orderedProperties = new LinkedHashMap<>();
final List<PropertyDescriptor> descriptors = processor.getPropertyDescriptors(); final List<PropertyDescriptor> descriptors = processor.getPropertyDescriptors();
if (descriptors != null && !descriptors.isEmpty()) { if (descriptors != null && !descriptors.isEmpty()) {
for (PropertyDescriptor descriptor : descriptors) { for (final PropertyDescriptor descriptor : descriptors) {
orderedProperties.put(descriptor, null); orderedProperties.put(descriptor, null);
} }
} }
@ -2798,7 +2799,7 @@ public final class DtoFactory {
* @param lastMod mod * @param lastMod mod
* @return dto * @return dto
*/ */
public RevisionDTO createRevisionDTO(FlowModification lastMod) { public RevisionDTO createRevisionDTO(final FlowModification lastMod) {
final Revision revision = lastMod.getRevision(); final Revision revision = lastMod.getRevision();
// create the dto // create the dto
@ -2817,7 +2818,7 @@ public final class DtoFactory {
return dto; return dto;
} }
public NodeDTO createNodeDTO(NodeIdentifier nodeId, NodeConnectionStatus status, NodeHeartbeat nodeHeartbeat, List<NodeEvent> events, boolean primary) { public NodeDTO createNodeDTO(final NodeIdentifier nodeId, final NodeConnectionStatus status, final NodeHeartbeat nodeHeartbeat, final List<NodeEvent> events, final boolean primary) {
final NodeDTO nodeDto = new NodeDTO(); final NodeDTO nodeDto = new NodeDTO();
// populate node dto // populate node dto
@ -2844,7 +2845,7 @@ public final class DtoFactory {
final List<NodeEvent> nodeEvents = new ArrayList<>(events); final List<NodeEvent> nodeEvents = new ArrayList<>(events);
Collections.sort(nodeEvents, new Comparator<NodeEvent>() { Collections.sort(nodeEvents, new Comparator<NodeEvent>() {
@Override @Override
public int compare(NodeEvent event1, NodeEvent event2) { public int compare(final NodeEvent event1, final NodeEvent event2) {
return new Date(event2.getTimestamp()).compareTo(new Date(event1.getTimestamp())); return new Date(event2.getTimestamp()).compareTo(new Date(event1.getTimestamp()));
} }
}); });
@ -2868,15 +2869,15 @@ public final class DtoFactory {
/* setters */ /* setters */
public void setControllerServiceProvider(ControllerServiceProvider controllerServiceProvider) { public void setControllerServiceProvider(final ControllerServiceProvider controllerServiceProvider) {
this.controllerServiceProvider = controllerServiceProvider; this.controllerServiceProvider = controllerServiceProvider;
} }
public void setAuthorizer(Authorizer authorizer) { public void setAuthorizer(final Authorizer authorizer) {
this.authorizer = authorizer; this.authorizer = authorizer;
} }
public void setEntityFactory(EntityFactory entityFactory) { public void setEntityFactory(final EntityFactory entityFactory) {
this.entityFactory = entityFactory; this.entityFactory = entityFactory;
} }

View File

@ -51,11 +51,12 @@ public interface ControllerServiceDAO {
ControllerServiceNode getControllerService(String controllerServiceId); ControllerServiceNode getControllerService(String controllerServiceId);
/** /**
* Gets all of the controller services. * Gets all of the controller services for the group with the given ID or all
* controller-level services if the group id is null
* *
* @return The controller services * @return The controller services
*/ */
Set<ControllerServiceNode> getControllerServices(); Set<ControllerServiceNode> getControllerServices(String groupId);
/** /**
* Updates the specified controller service. * Updates the specified controller service.

View File

@ -16,31 +16,34 @@
*/ */
package org.apache.nifi.web.dao.impl; package org.apache.nifi.web.dao.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.controller.exception.ValidationException; import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.web.NiFiCoreException; import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.dao.ComponentStateDAO; import org.apache.nifi.web.dao.ComponentStateDAO;
import org.apache.nifi.web.dao.ControllerServiceDAO; import org.apache.nifi.web.dao.ControllerServiceDAO;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class StandardControllerServiceDAO extends ComponentDAO implements ControllerServiceDAO { public class StandardControllerServiceDAO extends ComponentDAO implements ControllerServiceDAO {
private ControllerServiceProvider serviceProvider; private ControllerServiceProvider serviceProvider;
private ComponentStateDAO componentStateDAO; private ComponentStateDAO componentStateDAO;
private FlowController flowController;
private ControllerServiceNode locateControllerService(final String controllerServiceId) { private ControllerServiceNode locateControllerService(final String controllerServiceId) {
// get the controller service // get the controller service
@ -71,6 +74,24 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
// perform the update // perform the update
configureControllerService(controllerService, controllerServiceDTO); configureControllerService(controllerService, controllerServiceDTO);
final String groupId = controllerServiceDTO.getParentGroupId();
if (groupId == null) {
flowController.addRootControllerService(controllerService);
} else {
final ProcessGroup group;
if (groupId.equals(FlowController.ROOT_GROUP_ID_ALIAS)) {
group = flowController.getGroup(flowController.getRootGroupId());
} else {
group = flowController.getGroup(flowController.getRootGroupId()).findProcessGroup(groupId);
}
if (group == null) {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
}
group.addControllerService(controllerService);
}
return controllerService; return controllerService;
} catch (final ControllerServiceInstantiationException csie) { } catch (final ControllerServiceInstantiationException csie) {
throw new NiFiCoreException(csie.getMessage(), csie); throw new NiFiCoreException(csie.getMessage(), csie);
@ -88,8 +109,17 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
} }
@Override @Override
public Set<ControllerServiceNode> getControllerServices() { public Set<ControllerServiceNode> getControllerServices(final String groupId) {
return serviceProvider.getAllControllerServices(); if (groupId == null) {
return flowController.getRootControllerServices();
} else {
final ProcessGroup procGroup = flowController.getGroup(flowController.getRootGroupId()).findProcessGroup(groupId);
if (procGroup == null) {
throw new ResourceNotFoundException("Could not find Process Group with ID " + groupId);
}
return procGroup.getControllerServices(true);
}
} }
@Override @Override
@ -162,7 +192,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
} }
@Override @Override
public void verifyUpdateReferencingComponents(String controllerServiceId, ScheduledState scheduledState, ControllerServiceState controllerServiceState) { public void verifyUpdateReferencingComponents(final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) {
final ControllerServiceNode controllerService = locateControllerService(controllerServiceId); final ControllerServiceNode controllerService = locateControllerService(controllerServiceId);
if (controllerServiceState != null) { if (controllerServiceState != null) {
@ -200,7 +230,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
controllerService.verifyCanDisable(); controllerService.verifyCanDisable();
} }
} }
} catch (IllegalArgumentException iae) { } catch (final IllegalArgumentException iae) {
throw new IllegalArgumentException("Controller Service state: Value must be one of [ENABLED, DISABLED]"); throw new IllegalArgumentException("Controller Service state: Value must be one of [ENABLED, DISABLED]");
} }
} }
@ -255,35 +285,39 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
} }
@Override @Override
public void deleteControllerService(String controllerServiceId) { public void deleteControllerService(final String controllerServiceId) {
final ControllerServiceNode controllerService = locateControllerService(controllerServiceId); final ControllerServiceNode controllerService = locateControllerService(controllerServiceId);
serviceProvider.removeControllerService(controllerService); serviceProvider.removeControllerService(controllerService);
} }
@Override @Override
public StateMap getState(String controllerServiceId, Scope scope) { public StateMap getState(final String controllerServiceId, final Scope scope) {
final ControllerServiceNode controllerService = locateControllerService(controllerServiceId); final ControllerServiceNode controllerService = locateControllerService(controllerServiceId);
return componentStateDAO.getState(controllerService, scope); return componentStateDAO.getState(controllerService, scope);
} }
@Override @Override
public void verifyClearState(String controllerServiceId) { public void verifyClearState(final String controllerServiceId) {
final ControllerServiceNode controllerService = locateControllerService(controllerServiceId); final ControllerServiceNode controllerService = locateControllerService(controllerServiceId);
controllerService.verifyCanClearState(); controllerService.verifyCanClearState();
} }
@Override @Override
public void clearState(String controllerServiceId) { public void clearState(final String controllerServiceId) {
final ControllerServiceNode controllerService = locateControllerService(controllerServiceId); final ControllerServiceNode controllerService = locateControllerService(controllerServiceId);
componentStateDAO.clearState(controllerService); componentStateDAO.clearState(controllerService);
} }
/* setters */ /* setters */
public void setServiceProvider(ControllerServiceProvider serviceProvider) { public void setServiceProvider(final ControllerServiceProvider serviceProvider) {
this.serviceProvider = serviceProvider; this.serviceProvider = serviceProvider;
} }
public void setComponentStateDAO(ComponentStateDAO componentStateDAO) { public void setComponentStateDAO(final ComponentStateDAO componentStateDAO) {
this.componentStateDAO = componentStateDAO; this.componentStateDAO = componentStateDAO;
} }
public void setFlowController(final FlowController flowController) {
this.flowController = flowController;
}
} }

View File

@ -91,6 +91,7 @@
<bean id="controllerServiceDAO" class="org.apache.nifi.web.dao.impl.StandardControllerServiceDAO"> <bean id="controllerServiceDAO" class="org.apache.nifi.web.dao.impl.StandardControllerServiceDAO">
<property name="serviceProvider" ref="controllerServiceProvider"/> <property name="serviceProvider" ref="controllerServiceProvider"/>
<property name="componentStateDAO" ref="componentStateDAO"/> <property name="componentStateDAO" ref="componentStateDAO"/>
<property name="flowController" ref="flowController" />
</bean> </bean>
<bean id="reportingTaskDAO" class="org.apache.nifi.web.dao.impl.StandardReportingTaskDAO"> <bean id="reportingTaskDAO" class="org.apache.nifi.web.dao.impl.StandardReportingTaskDAO">
<property name="reportingTaskProvider" ref="reportingTaskProvider"/> <property name="reportingTaskProvider" ref="reportingTaskProvider"/>
@ -150,7 +151,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator" /> <property name="clusterCoordinator" ref="clusterCoordinator" />
<property name="requestReplicator" ref="requestReplicator" /> <property name="requestReplicator" ref="requestReplicator" />
<property name="auditService" ref="auditService"/> <property name="auditService" ref="auditService"/>
<property name="controllerServiceLookup" ref="controllerServiceProvider"/> <property name="controllerServiceProvider" ref="controllerServiceProvider"/>
<property name="reportingTaskProvider" ref="reportingTaskProvider"/> <property name="reportingTaskProvider" ref="reportingTaskProvider"/>
<property name="authorizer" ref="authorizer"/> <property name="authorizer" ref="authorizer"/>
</bean> </bean>