NIFI-1800: Tie Controller Services to Process Groups. This closes #431

This commit is contained in:
Mark Payne 2016-05-06 21:24:50 -04:00 committed by Matt Gilman
parent b7aa381ab4
commit 25e7f314b1
60 changed files with 1197 additions and 224 deletions

View File

@ -138,7 +138,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
// if the property descriptor identifies a Controller Service, validate that the ControllerService exists, is of the correct type, and is valid
if (controllerServiceDefinition != null) {
final Set<String> validIdentifiers = context.getControllerServiceLookup().getControllerServiceIdentifiers(controllerServiceDefinition);
final Set<String> validIdentifiers = context.getControllerServiceLookup().getControllerServiceIdentifiers(controllerServiceDefinition, context.getProcessGroupIdentifier());
if (validIdentifiers != null && validIdentifiers.contains(input)) {
final ControllerService controllerService = context.getControllerServiceLookup().getControllerService(input);
if (!context.isValidationRequired(controllerService)) {

View File

@ -92,4 +92,11 @@ public interface ValidationContext {
* support the Expression Language or is not a valid property name
*/
boolean isExpressionLanguageSupported(String propertyName);
/**
* Returns the identifier of the ProcessGroup that the component being validated lives in
*
* @return the identifier of the ProcessGroup that the component being validated lives in
*/
String getProcessGroupIdentifier();
}

View File

@ -54,12 +54,15 @@ public interface ControllerServiceLookup {
boolean isControllerServiceEnabled(ControllerService service);
/**
*
* @param serviceType type of service to get identifiers for
* @param groupId the ID of the Process Group to look in for Controller Services
*
* @return the set of all Controller Service Identifiers whose Controller
* Service is of the given type.
* @throws IllegalArgumentException if the given class is not an interface
*/
Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType) throws IllegalArgumentException;
Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType, String groupId) throws IllegalArgumentException;
/**
* @param serviceIdentifier identifier to look up

View File

@ -102,4 +102,8 @@ public class NotificationValidationContext implements ValidationContext {
return Boolean.TRUE.equals(supported);
}
@Override
public String getProcessGroupIdentifier() {
return null;
}
}

View File

@ -82,7 +82,7 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) {
final Set<String> ids = new HashSet<>();
for (final Map.Entry<String, ControllerServiceConfiguration> entry : controllerServiceMap.entrySet()) {
if (serviceType.isAssignableFrom(entry.getValue().getService().getClass())) {

View File

@ -47,8 +47,8 @@ public class MockProcessorInitializationContext implements ProcessorInitializati
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return context.getControllerServiceIdentifiers(serviceType);
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) {
return context.getControllerServiceIdentifiers(serviceType, groupId);
}
@Override

View File

@ -86,8 +86,8 @@ public class MockValidationContext implements ValidationContext, ControllerServi
}
@Override
public Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType) {
return context.getControllerServiceIdentifiers(serviceType);
public Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType, String groupId) {
return context.getControllerServiceIdentifiers(serviceType, groupId);
}
@Override
@ -136,4 +136,9 @@ public class MockValidationContext implements ValidationContext, ControllerServi
final Boolean supported = expressionLanguageSupported.get(propertyName);
return Boolean.TRUE.equals(supported);
}
@Override
public String getProcessGroupIdentifier() {
return "unit test";
}
}

View File

@ -47,7 +47,7 @@ public class MockControllerServiceLookup implements ControllerServiceLookup {
}
@Override
public Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType)
public Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType, String groupId)
throws IllegalArgumentException {
return Collections.emptySet();
}

View File

@ -138,7 +138,10 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
return null;
}
final Set<NodeResponse> nodeResponses = responseMap.values().stream().map(p -> p.getResponse()).collect(Collectors.toSet());
final Set<NodeResponse> nodeResponses = responseMap.values().stream()
.map(p -> p.getResponse())
.filter(response -> response != null)
.collect(Collectors.toSet());
mergedResponse = responseMerger.mergeResponses(uri, method, nodeResponses);
logger.debug("Notifying all that merged response is complete for {}", id);

View File

@ -115,7 +115,6 @@ import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.StandardFlowSerializer;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
@ -126,6 +125,7 @@ import org.apache.nifi.controller.reporting.StandardReportingInitializationConte
import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
import org.apache.nifi.controller.serialization.StandardFlowSerializer;
import org.apache.nifi.controller.service.ControllerServiceLoader;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
@ -406,7 +406,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final byte[] serializedServices = clusterDataFlow.getControllerServices();
if (serializedServices != null && serializedServices.length > 0) {
ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, properties.getAutoResumeState());
ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), null, encryptor, bulletinRepository, properties.getAutoResumeState());
}
// start multicast broadcasting service, if configured
@ -1271,21 +1271,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return baos.toByteArray();
}
private byte[] serializeControllerServices() throws ParserConfigurationException, TransformerException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
docFactory.setNamespaceAware(true);
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document document = docBuilder.newDocument();
final Element rootElement = document.createElement("controllerServices");
document.appendChild(rootElement);
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor);
}
return serialize(document);
}
private byte[] serializeReportingTasks() throws ParserConfigurationException, TransformerException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
@ -1303,19 +1288,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return serialize(document);
}
public void saveControllerServices() {
try {
dataFlowManagementService.updateControllerServices(serializeControllerServices());
} catch (final Exception e) {
logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(),
"Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details."));
}
}
public void saveReportingTasks() {
try {
@ -2299,8 +2271,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) {
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId);
}
public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String message) {

View File

@ -48,5 +48,4 @@ public class ClusteredReportingTaskNode extends AbstractReportingTaskNode {
public ReportingContext getReportingContext() {
return new ClusteredReportingContext(eventAccess, bulletinRepository, getProperties(), serviceProvider, stateManager);
}
}

View File

@ -256,7 +256,9 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
@Override
public boolean isValid() {
final Collection<ValidationResult> validationResults = validate(validationContextFactory.newValidationContext(getProperties(), getAnnotationData()));
final Collection<ValidationResult> validationResults = validate(validationContextFactory.newValidationContext(
getProperties(), getAnnotationData(), getProcessGroupIdentifier()));
for (final ValidationResult result : validationResults) {
if (!result.isValid()) {
return false;
@ -275,7 +277,8 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
final List<ValidationResult> results = new ArrayList<>();
lock.lock();
try {
final ValidationContext validationContext = validationContextFactory.newValidationContext(serviceIdentifiersNotToValidate, getProperties(), getAnnotationData());
final ValidationContext validationContext = validationContextFactory.newValidationContext(
serviceIdentifiersNotToValidate, getProperties(), getAnnotationData(), getProcessGroupIdentifier());
final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
@ -297,6 +300,8 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
public abstract void verifyModifiable() throws IllegalStateException;
protected abstract String getProcessGroupIdentifier();
/**
*
*/

View File

@ -24,8 +24,8 @@ import org.apache.nifi.components.ValidationContext;
public interface ValidationContextFactory {
ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData);
ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData, String groupId);
ValidationContext newValidationContext(Set<String> serviceIdentifiersToNotValidate, Map<PropertyDescriptor, String> properties, String annotationData);
ValidationContext newValidationContext(Set<String> serviceIdentifiersToNotValidate, Map<PropertyDescriptor, String> properties, String annotationData, String groupId);
}

View File

@ -21,9 +21,23 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.groups.ProcessGroup;
public interface ControllerServiceNode extends ConfiguredComponent {
/**
* @return the Process Group that this Controller Service belongs to, or <code>null</code> if the Controller Service
* does not belong to any Process Group
*/
ProcessGroup getProcessGroup();
/**
* Sets the Process Group for this Controller Service
*
* @param group the group that the service belongs to
*/
void setProcessGroup(ProcessGroup group);
/**
* <p>
* Returns a proxy implementation of the Controller Service that this ControllerServiceNode

View File

@ -19,6 +19,7 @@ package org.apache.nifi.events;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.ComponentType;
@ -45,7 +46,9 @@ public final class BulletinFactory {
break;
}
return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), type, connectable.getName(), category, severity, message);
final ProcessGroup group = connectable.getProcessGroup();
final String groupId = group == null ? null : group.getIdentifier();
return BulletinFactory.createBulletin(groupId, connectable.getIdentifier(), type, connectable.getName(), category, severity, message);
}
public static Bulletin createBulletin(final String groupId, final String sourceId, final ComponentType sourceType, final String sourceName,

View File

@ -29,6 +29,7 @@ import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Processor;
@ -433,6 +434,19 @@ public interface ProcessGroup extends Authorizable {
*/
Funnel findFunnel(String id);
/**
* @param id of the Controller Service
* @return the Controller Service with the given ID, if it exists as a child or
* descendant of this ProcessGroup. This performs a recursive search of all
* descendant ProcessGroups
*/
ControllerServiceNode findControllerService(String id);
/**
* @return a List of all Controller Services contained within this ProcessGroup and any child Process Groups
*/
Set<ControllerServiceNode> findAllControllerServices();
/**
* Adds the given RemoteProcessGroup to this ProcessGroup
*
@ -645,6 +659,36 @@ public interface ProcessGroup extends Authorizable {
*/
void removeFunnel(Funnel funnel);
/**
* Adds the given Controller Service to this group
*
* @param service the service to add
*/
void addControllerService(ControllerServiceNode service);
/**
* Returns the controller service with the given id
*
* @param id the id of the controller service
* @return the controller service with the given id, or <code>null</code> if no service exists with that id
*/
ControllerServiceNode getControllerService(String id);
/**
* Returns a Set of all Controller Services that are available in this Process Group
*
* @param recursive if <code>true</code>, returns the Controller Services available to the parent Process Group, its parents, etc.
* @return a Set of all Controller Services that are available in this Process Group
*/
Set<ControllerServiceNode> getControllerServices(boolean recursive);
/**
* Removes the given Controller Service from this group
*
* @param service the service to remove
*/
void removeControllerService(ControllerServiceNode service);
/**
* @return <code>true</code> if this ProcessGroup has no Processors, Labels,
* Connections, ProcessGroups, RemoteProcessGroupReferences, or Ports.

View File

@ -126,6 +126,10 @@ import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSerializer;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
@ -262,7 +266,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final RemoteSiteListener externalSiteListener;
private final AtomicReference<CounterRepository> counterRepositoryRef;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final ControllerServiceProvider controllerServiceProvider;
private final StandardControllerServiceProvider controllerServiceProvider;
private final KeyService keyService;
private final AuditService auditService;
private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
@ -436,7 +440,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider);
eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider);
final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent(
@ -487,6 +490,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
rootGroup.setName(DEFAULT_ROOT_GROUP_NAME);
instanceId = UUID.randomUUID().toString();
controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider);
controllerServiceProvider.setRootProcessGroup(rootGroup);
if (remoteInputSocketPort == null) {
LOG.info("Not enabling Site-to-Site functionality because nifi.remote.input.socket.port is not set");
externalSiteListener = null;
@ -1426,6 +1432,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
externalSiteListener.setRootGroup(group);
}
controllerServiceProvider.setRootProcessGroup(rootGroup);
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
} finally {
@ -3836,8 +3844,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) {
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId);
}
@Override

View File

@ -68,6 +68,8 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.cluster.Heartbeater;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;

View File

@ -55,6 +55,12 @@ import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
import org.apache.nifi.controller.serialization.FlowEncodingVersion;
import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.serialization.StandardFlowSerializer;
import org.apache.nifi.controller.service.ControllerServiceLoader;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
@ -123,19 +129,18 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final Element rootElement = document.getDocumentElement();
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor);
final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootGroupElement);
final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion);
return isEmpty(rootGroupDto);
}
@Override
public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor)
throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException {
// get the controller's root group
final ProcessGroup rootGroup = controller.getGroup(controller.getRootGroupId());
// handle corner cases involving no proposed flow
if (proposedFlow == null) {
if (rootGroup.isEmpty()) {
if (controller.getGroup(controller.getRootGroupId()).isEmpty()) {
return; // no sync to perform
} else {
throw new UninheritableFlowException("Proposed configuration is empty, but the controller contains a data flow.");
@ -160,6 +165,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
} else {
final Document document = parseFlowBytes(existingFlow);
final Element rootElement = document.getDocumentElement();
final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootElement);
logger.trace("Setting controller thread counts");
final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount");
@ -180,17 +186,17 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices");
final List<Element> controllerServiceElements;
final List<Element> unrootedControllerServiceElements;
if (controllerServicesElement == null) {
controllerServiceElements = Collections.emptyList();
unrootedControllerServiceElements = Collections.emptyList();
} else {
controllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
unrootedControllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
}
logger.trace("Parsing process group from DOM");
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor);
existingFlowEmpty = taskElements.isEmpty() && controllerServiceElements.isEmpty() && isEmpty(rootGroupDto);
final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion);
existingFlowEmpty = taskElements.isEmpty() && unrootedControllerServiceElements.isEmpty() && isEmpty(rootGroupDto);
logger.debug("Existing Flow Empty = {}", existingFlowEmpty);
}
}
@ -237,6 +243,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
synchronized (configuration) {
// get the root element
final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0);
final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootElement);
// set controller config
logger.trace("Updating flow config");
@ -252,12 +259,27 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
// get the root group XML element
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
// if this controller isn't initialized or its empty, add the root group, otherwise update
final ProcessGroup rootGroup;
if (!initialized || existingFlowEmpty) {
logger.trace("Adding root process group");
rootGroup = addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion);
} else {
logger.trace("Updating root process group");
rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion);
}
final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices");
if (controllerServicesElement != null) {
final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
if (!initialized || existingFlowEmpty) {
ControllerServiceLoader.loadControllerServices(serviceElements, controller, encryptor, controller.getBulletinRepository(), autoResumeState);
// If the encoding version is null, we are loading a flow from NiFi 0.x, where Controller
// Services could not be scoped by Process Group. As a result, we want to move the Process Groups
// to the root Group. Otherwise, we want to use a null group, which indicates a Controller-level
// Controller Service.
final ProcessGroup group = (encodingVersion == null) ? rootGroup : null;
ControllerServiceLoader.loadControllerServices(serviceElements, controller, group, encryptor, controller.getBulletinRepository(), autoResumeState);
} else {
for (final Element serviceElement : serviceElements) {
updateControllerService(controller, serviceElement, encryptor);
@ -265,15 +287,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
}
// if this controller isn't initialized or its emtpy, add the root group, otherwise update
if (!initialized || existingFlowEmpty) {
logger.trace("Adding root process group");
addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor);
} else {
logger.trace("Updating root process group");
updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor);
}
final Element reportingTasksElement = DomUtils.getChild(rootElement, "reportingTasks");
if (reportingTasksElement != null) {
final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
@ -488,14 +501,14 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
}
private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor)
throws ProcessorInstantiationException {
private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement,
final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException {
// get the parent group ID
final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier();
// get the process group
final ProcessGroupDTO processGroupDto = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor);
final ProcessGroupDTO processGroupDto = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor, encodingVersion);
// update the process group
if (parentId == null) {
@ -636,7 +649,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
// update nested process groups (recursively)
final List<Element> nestedProcessGroupNodeList = getChildrenByTagName(processGroupElement, "processGroup");
for (final Element nestedProcessGroupElement : nestedProcessGroupNodeList) {
updateProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor);
updateProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor, encodingVersion);
}
// update connections
@ -692,6 +705,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
}
// Update Controller Services
final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService");
for (final Element serviceNodeElement : serviceNodeList) {
updateControllerService(controller, serviceNodeElement, encryptor);
}
return processGroup;
}
@ -749,13 +768,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
}
private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor)
throws ProcessorInstantiationException {
private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement,
final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException {
// get the parent group ID
final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier();
// add the process group
final ProcessGroupDTO processGroupDTO = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor);
final ProcessGroupDTO processGroupDTO = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor, encodingVersion);
final ProcessGroup processGroup = controller.createProcessGroup(processGroupDTO.getId());
processGroup.setComments(processGroupDTO.getComments());
processGroup.setPosition(toPosition(processGroupDTO.getPosition()));
@ -892,7 +911,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
// add nested process groups (recursively)
final List<Element> nestedProcessGroupNodeList = getChildrenByTagName(processGroupElement, "processGroup");
for (final Element nestedProcessGroupElement : nestedProcessGroupNodeList) {
addProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor);
addProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor, encodingVersion);
}
// add remote process group
@ -1027,6 +1046,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
processGroup.addConnection(connection);
}
// Add Controller Services
final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService");
if (!serviceNodeList.isEmpty()) {
ControllerServiceLoader.loadControllerServices(serviceNodeList, controller, processGroup, encryptor, controller.getBulletinRepository(), autoResumeState);
}
return processGroup;
}

View File

@ -918,8 +918,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public boolean isValid() {
try {
final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(),
getAnnotationData());
final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier());
final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
@ -966,7 +965,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final List<ValidationResult> results = new ArrayList<>();
try {
final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(),
getAnnotationData());
getAnnotationData(), getProcessGroup().getIdentifier());
final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
@ -1421,4 +1420,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
callback.postMonitor();
}
}
@Override
protected String getProcessGroupIdentifier() {
final ProcessGroup group = getProcessGroup();
return group == null ? null : group.getIdentifier();
}
}

View File

@ -254,4 +254,9 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
public String toString() {
return "ReportingTask[id=" + getIdentifier() + ", name=" + getName() + "]";
}
@Override
protected String getProcessGroupIdentifier() {
return null;
}
}

View File

@ -115,8 +115,8 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return serviceProvider.getControllerServiceIdentifiers(serviceType);
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) {
return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId);
}
@Override

View File

@ -69,13 +69,14 @@ public class StandardReportingInitializationContext implements ReportingInitiali
return schedulingPeriod;
}
@Override
public SchedulingStrategy getSchedulingStrategy() {
return schedulingStrategy;
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return serviceProvider.getControllerServiceIdentifiers(serviceType);
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) {
return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId);
}
@Override

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.serialization;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.w3c.dom.Element;
/**
* Provides a mechanism for interpreting the version of the encoding scheme that was used to serialize
* a NiFi Flow. The versioning scheme is made up of a major version and a minor version, both being
* positive integers.
*/
public class FlowEncodingVersion {
public static final String ENCODING_VERSION_ATTRIBUTE = "encoding-version";
private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)\\.(\\d)+");
private final int majorVersion;
private final int minorVersion;
public FlowEncodingVersion(final int majorVersion, final int minorVersion) {
if (majorVersion < 0) {
throw new IllegalArgumentException("Invalid version: Major version cannot be less than 0 but was " + majorVersion);
}
if (minorVersion < 0) {
throw new IllegalArgumentException("Invalid version: Minor version cannot be less than 0 but was " + minorVersion);
}
this.majorVersion = majorVersion;
this.minorVersion = minorVersion;
}
/**
* Parses the 'encoding-version' attribute of the given XML Element as FlowEncodingVersion.
* The attribute value is expected to be in the format &lt;major version&gt;.&lt;minor version&lt;
*
* @param xmlElement the XML Element that contains an 'encoding-version' attribute
* @return a FlowEncodingVersion that has the major and minor versions specified in the String, or <code>null</code> if the input is null or the input
* does not have an 'encoding-version' attribute
*
* @throws IllegalArgumentException if the value is not in the format &lt;major version&gt;.&lt;minor version&gt;, if either major version or minor
* version is not an integer, or if either the major or minor version is less than 0.
*/
public static FlowEncodingVersion parse(final Element xmlElement) {
if (xmlElement == null) {
return null;
}
final String version = xmlElement.getAttribute(ENCODING_VERSION_ATTRIBUTE);
if (version == null) {
return null;
}
return parse(version);
}
/**
* Parses the given String as FlowEncodingVersion. The String is expected to be in the format &lt;major version&gt;.&lt;minor version&lt;
*
* @param version the String representation of the encoding version
* @return a FlowEncodingVersion that has the major and minor versions specified in the String, or <code>null</code> if the input is null
*
* @throws IllegalArgumentException if the value is not in the format &lt;major version&gt;.&lt;minor version&gt;, if either major version or minor
* version is not an integer, or if either the major or minor version is less than 0.
*/
public static FlowEncodingVersion parse(final String version) {
if (version == null || version.trim().isEmpty()) {
return null;
}
final Matcher matcher = VERSION_PATTERN.matcher(version.trim());
if (!matcher.matches()) {
throw new IllegalArgumentException(version + " is not a valid version for Flow serialization. Should be in format <number>.<number>");
}
final int majorVersion = Integer.parseInt(matcher.group(1));
final int minorVersion = Integer.parseInt(matcher.group(2));
return new FlowEncodingVersion(majorVersion, minorVersion);
}
public int getMajorVersion() {
return majorVersion;
}
public int getMinorVersion() {
return minorVersion;
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
package org.apache.nifi.controller.serialization;
import java.util.ArrayList;
import java.util.HashMap;
@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
@ -114,7 +115,7 @@ public class FlowFromDOMFactory {
return dto;
}
public static ProcessGroupDTO getProcessGroup(final String parentId, final Element element, final StringEncryptor encryptor) {
public static ProcessGroupDTO getProcessGroup(final String parentId, final Element element, final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) {
final ProcessGroupDTO dto = new ProcessGroupDTO();
final String groupId = getString(element, "id");
dto.setId(groupId);
@ -169,7 +170,7 @@ public class FlowFromDOMFactory {
nodeList = DomUtils.getChildNodesByTagName(element, "processGroup");
for (int i = 0; i < nodeList.getLength(); i++) {
processGroups.add(getProcessGroup(groupId, (Element) nodeList.item(i), encryptor));
processGroups.add(getProcessGroup(groupId, (Element) nodeList.item(i), encryptor, encodingVersion));
}
nodeList = DomUtils.getChildNodesByTagName(element, "remoteProcessGroup");

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
package org.apache.nifi.controller.serialization;
/**
* Represents the exceptional case when flow configuration is malformed and therefore, cannot be serialized or deserialized.

View File

@ -14,10 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
package org.apache.nifi.controller.serialization;
import java.io.OutputStream;
import org.apache.nifi.controller.FlowController;
/**
* Serializes the flow configuration of a controller instance to an output stream.
*

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
package org.apache.nifi.controller.serialization;
import org.apache.nifi.cluster.ConnectionException;

View File

@ -14,9 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
package org.apache.nifi.controller.serialization;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.encrypt.StringEncryptor;
/**

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
package org.apache.nifi.controller.serialization;
import java.io.BufferedOutputStream;
import java.io.OutputStream;
@ -39,6 +39,9 @@ import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
@ -59,6 +62,7 @@ import org.w3c.dom.Element;
* NOT THREAD-SAFE.
*/
public class StandardFlowSerializer implements FlowSerializer {
private static final String MAX_ENCODING_VERSION = "1.0";
private final StringEncryptor encryptor;
@ -78,6 +82,7 @@ public class StandardFlowSerializer implements FlowSerializer {
// populate document with controller state
final Element rootNode = doc.createElement("flowController");
rootNode.setAttribute("encoding-version", MAX_ENCODING_VERSION);
doc.appendChild(rootNode);
addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
@ -85,9 +90,6 @@ public class StandardFlowSerializer implements FlowSerializer {
final Element controllerServicesNode = doc.createElement("controllerServices");
rootNode.appendChild(controllerServicesNode);
for (final ControllerServiceNode serviceNode : controller.getAllControllerServices()) {
addControllerService(controllerServicesNode, serviceNode, encryptor);
}
final Element reportingTasksNode = doc.createElement("reportingTasks");
rootNode.appendChild(reportingTasksNode);
@ -180,6 +182,10 @@ public class StandardFlowSerializer implements FlowSerializer {
for (final Connection connection : group.getConnections()) {
addConnection(element, connection);
}
for (final ControllerServiceNode service : group.getControllerServices(false)) {
addControllerService(element, service);
}
}
private void addStyle(final Element parentElement, final Map<String, String> style) {
@ -408,7 +414,7 @@ public class StandardFlowSerializer implements FlowSerializer {
parentElement.appendChild(element);
}
public static void addControllerService(final Element element, final ControllerServiceNode serviceNode, final StringEncryptor encryptor) {
public void addControllerService(final Element element, final ControllerServiceNode serviceNode) {
final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
addTextElement(serviceElement, "id", serviceNode.getIdentifier());
addTextElement(serviceElement, "name", serviceNode.getName());

View File

@ -31,8 +31,9 @@ import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.nifi.controller.FlowFromDOMFactory;
import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
@ -47,12 +48,8 @@ public class ControllerServiceLoader {
private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class);
public static List<ControllerServiceNode> loadControllerServices(
final ControllerServiceProvider provider,
final InputStream serializedStream,
final StringEncryptor encryptor,
final BulletinRepository bulletinRepo,
final boolean autoResumeState) throws IOException {
public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream, final ProcessGroup parentGroup,
final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) throws IOException {
final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
documentBuilderFactory.setNamespaceAware(true);
@ -93,21 +90,22 @@ public class ControllerServiceLoader {
final Document document = builder.parse(in);
final Element controllerServices = document.getDocumentElement();
final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
return new ArrayList<>(loadControllerServices(serviceElements, provider, encryptor, bulletinRepo, autoResumeState));
return new ArrayList<>(loadControllerServices(serviceElements, provider, parentGroup, encryptor, bulletinRepo, autoResumeState));
} catch (SAXException | ParserConfigurationException sxe) {
throw new IOException(sxe);
}
}
public static Collection<ControllerServiceNode> loadControllerServices(
final List<Element> serviceElements,
final ControllerServiceProvider provider,
final StringEncryptor encryptor,
final BulletinRepository bulletinRepo,
final boolean autoResumeState) {
public static Collection<ControllerServiceNode> loadControllerServices(final List<Element> serviceElements, final ControllerServiceProvider provider, final ProcessGroup parentGroup,
final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) {
final Map<ControllerServiceNode, Element> nodeMap = new HashMap<>();
for (final Element serviceElement : serviceElements) {
final ControllerServiceNode serviceNode = createControllerService(provider, serviceElement, encryptor);
if (parentGroup != null) {
parentGroup.addControllerService(serviceNode);
}
// We need to clone the node because it will be used in a separate thread below, and
// Element is not thread-safe.
nodeMap.put(serviceNode, (Element) serviceElement.cloneNode(true));

View File

@ -49,8 +49,8 @@ public class StandardControllerServiceInitializationContext implements Controlle
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return serviceProvider.getControllerServiceIdentifiers(serviceType);
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) {
return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId);
}
@Override

View File

@ -39,6 +39,7 @@ import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
@ -62,6 +63,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
private final Set<ConfiguredComponent> referencingComponents = new HashSet<>();
private String comment;
private ProcessGroup processGroup;
private final AtomicBoolean active;
@ -84,6 +86,26 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
return implementation;
}
@Override
public ProcessGroup getProcessGroup() {
readLock.lock();
try {
return processGroup;
} finally {
readLock.unlock();
}
}
@Override
public void setProcessGroup(final ProcessGroup group) {
writeLock.lock();
try {
this.processGroup = group;
} finally {
writeLock.unlock();
}
}
@Override
public ControllerServiceReference getReferences() {
readLock.lock();
@ -367,4 +389,10 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
}
}
@Override
protected String getProcessGroupIdentifier() {
final ProcessGroup procGroup = getProcessGroup();
return procGroup == null ? null : procGroup.getIdentifier();
}
}

View File

@ -30,19 +30,16 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
@ -51,6 +48,7 @@ import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
@ -68,10 +66,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
private final ProcessScheduler processScheduler;
private final ConcurrentMap<String, ControllerServiceNode> controllerServices;
private static final Set<Method> validDisabledMethods;
private final BulletinRepository bulletinRepo;
private final StateManagerProvider stateManagerProvider;
private volatile ProcessGroup rootGroup;
static {
// methods that are okay to be called when the service is disabled.
@ -88,12 +86,15 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
public StandardControllerServiceProvider(final ProcessScheduler scheduler, final BulletinRepository bulletinRepo, final StateManagerProvider stateManagerProvider) {
// the following 2 maps must be updated atomically, but we do not lock around them because they are modified
// only in the createControllerService method, and both are modified before the method returns
this.controllerServices = new ConcurrentHashMap<>();
this.processScheduler = scheduler;
this.bulletinRepo = bulletinRepo;
this.stateManagerProvider = stateManagerProvider;
}
public void setRootProcessGroup(ProcessGroup rootGroup) {
this.rootGroup = rootGroup;
}
private Class<?>[] getInterfaces(final Class<?> cls) {
final List<Class<?>> allIfcs = new ArrayList<>();
populateInterfaces(cls, allIfcs);
@ -195,7 +196,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
}
this.controllerServices.put(id, serviceNode);
return serviceNode;
} catch (final Throwable t) {
throw new ControllerServiceInstantiationException(t);
@ -444,7 +444,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public ControllerService getControllerService(final String serviceIdentifier) {
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
return node == null ? null : node.getProxiedControllerService();
}
@ -455,27 +455,43 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
return node == null ? false : ControllerServiceState.ENABLED == node.getState();
}
@Override
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
return node == null ? false : ControllerServiceState.ENABLING == node.getState();
}
@Override
public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) {
return controllerServices.get(serviceIdentifier);
final ProcessGroup group = rootGroup;
return group == null ? null : group.findControllerService(serviceIdentifier);
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) {
ProcessGroup group = rootGroup;
if (group == null) {
return Collections.emptySet();
}
if (!FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) {
group = group.findProcessGroup(groupId);
}
if (group == null) {
return Collections.emptySet();
}
final Set<ControllerServiceNode> serviceNodes = group.getControllerServices(true);
final Set<String> identifiers = new HashSet<>();
for (final Map.Entry<String, ControllerServiceNode> entry : controllerServices.entrySet()) {
if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getProxiedControllerService().getClass())) {
identifiers.add(entry.getKey());
for (final ControllerServiceNode serviceNode : serviceNodes) {
if (requireNonNull(serviceType).isAssignableFrom(serviceNode.getProxiedControllerService().getClass())) {
identifiers.add(serviceNode.getIdentifier());
}
}
@ -490,39 +506,22 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public void removeControllerService(final ControllerServiceNode serviceNode) {
final ControllerServiceNode existing = controllerServices.get(serviceNode.getIdentifier());
if (existing == null || existing != serviceNode) {
throw new IllegalStateException("Controller Service " + serviceNode + " does not exist in this Flow");
final ProcessGroup group = requireNonNull(serviceNode).getProcessGroup();
if (group == null) {
throw new IllegalArgumentException("Cannot remote Controller Service " + serviceNode + " because it does not belong to any Process Group");
}
serviceNode.verifyCanDelete();
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this, null);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext);
}
for (final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.getControllerServiceDefinition() != null) {
final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
if (value != null) {
final ControllerServiceNode referencedNode = getControllerServiceNode(value);
if (referencedNode != null) {
referencedNode.removeReference(serviceNode);
}
}
}
}
controllerServices.remove(serviceNode.getIdentifier());
stateManagerProvider.onComponentRemoved(serviceNode.getIdentifier());
group.removeControllerService(serviceNode);
}
@Override
public Set<ControllerServiceNode> getAllControllerServices() {
return new HashSet<>(controllerServices.values());
final ProcessGroup group = rootGroup;
if (group == null) {
return Collections.emptySet();
}
return group.findAllControllerServices();
}
/**
@ -560,6 +559,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public Set<ConfiguredComponent> enableReferencingServices(final ControllerServiceNode serviceNode) {
final List<ControllerServiceNode> recursiveReferences = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
logger.debug("Enabling the following Referencing Services for {}: {}", serviceNode, recursiveReferences);
return enableReferencingServices(serviceNode, recursiveReferences);
}
@ -580,6 +580,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
for (final ControllerServiceNode nodeToEnable : recursiveReferences) {
if (!nodeToEnable.isActive()) {
logger.debug("Enabling {} because it references {}", nodeToEnable, serviceNode);
enableControllerService(nodeToEnable);
updated.add(nodeToEnable);
}

View File

@ -186,7 +186,7 @@ public class StandardStateManagerProvider implements StateManagerProvider {
provider.initialize(initContext);
}
final ValidationContext validationContext = new StandardValidationContext(null, propertyStringMap, null);
final ValidationContext validationContext = new StandardValidationContext(null, propertyStringMap, null, null);
final Collection<ValidationResult> results = provider.validate(validationContext);
final StringBuilder validationFailures = new StringBuilder();

View File

@ -41,9 +41,9 @@ import javax.xml.validation.SchemaFactory;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.FlowFromDOMFactory;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.util.DomUtils;
@ -434,6 +434,18 @@ public final class FingerprintFactory {
}
}
final Set<ControllerServiceDTO> services = snippet.getControllerServices();
if (services == null || services.isEmpty()) {
builder.append("NO_CONTROLLER_SERVICES");
} else {
final List<ControllerServiceDTO> sortedServices = new ArrayList<>(services);
Collections.sort(sortedServices, componentComparator);
for (final ControllerServiceDTO service : sortedServices) {
addControllerServiceFingerprint(builder, service);
}
}
return builder;
}

View File

@ -36,6 +36,7 @@ import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.LocalPort;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
@ -45,6 +46,7 @@ import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.NarCloseable;
@ -92,6 +94,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>();
private final Map<String, ProcessorNode> processors = new HashMap<>();
private final Map<String, Funnel> funnels = new HashMap<>();
private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>();
private final StringEncryptor encryptor;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@ -1720,6 +1723,41 @@ public final class StandardProcessGroup implements ProcessGroup {
return null;
}
@Override
public ControllerServiceNode findControllerService(final String id) {
return findControllerService(id, this);
}
private ControllerServiceNode findControllerService(final String id, final ProcessGroup start) {
ControllerServiceNode service = start.getControllerService(id);
if (service != null) {
return service;
}
for (final ProcessGroup group : start.getProcessGroups()) {
service = findControllerService(id, group);
if (service != null) {
return service;
}
}
return null;
}
@Override
public Set<ControllerServiceNode> findAllControllerServices() {
return findAllControllerServices(this);
}
public Set<ControllerServiceNode> findAllControllerServices(ProcessGroup start) {
final Set<ControllerServiceNode> services = start.getControllerServices(false);
for (final ProcessGroup group : start.getProcessGroups()) {
services.addAll(findAllControllerServices(group));
}
return services;
}
@Override
public void removeFunnel(final Funnel funnel) {
writeLock.lock();
@ -1759,6 +1797,91 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
@Override
public void addControllerService(final ControllerServiceNode service) {
writeLock.lock();
try {
final String id = requireNonNull(service).getIdentifier();
final ControllerServiceNode existingService = controllerServices.get(id);
if (existingService != null) {
throw new IllegalStateException("A Controller Service is already registered to this ProcessGroup with ID " + id);
}
service.setProcessGroup(this);
this.controllerServices.put(service.getIdentifier(), service);
LOG.info("{} added to {}", service, this);
} finally {
writeLock.unlock();
}
}
@Override
public ControllerServiceNode getControllerService(final String id) {
readLock.lock();
try {
return controllerServices.get(requireNonNull(id));
} finally {
readLock.unlock();
}
}
@Override
public Set<ControllerServiceNode> getControllerServices(final boolean recursive) {
readLock.lock();
try {
final Set<ControllerServiceNode> services = new HashSet<>();
services.addAll(controllerServices.values());
if (recursive && parent.get() != null) {
services.addAll(parent.get().getControllerServices(true));
}
return services;
} finally {
readLock.unlock();
}
}
@Override
public void removeControllerService(final ControllerServiceNode service) {
writeLock.lock();
try {
final ControllerServiceNode existing = controllerServices.get(requireNonNull(service).getIdentifier());
if (existing == null) {
throw new IllegalStateException(service + " is not a member of this Process Group");
}
service.verifyCanDelete();
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext);
}
for (final Map.Entry<PropertyDescriptor, String> entry : service.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.getControllerServiceDefinition() != null) {
final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
if (value != null) {
final ControllerServiceNode referencedNode = getControllerService(value);
if (referencedNode != null) {
referencedNode.removeReference(service);
}
}
}
}
controllerServices.remove(service.getIdentifier());
flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier());
LOG.info("{} removed from {}", service, this);
} finally {
writeLock.unlock();
}
}
@Override
public void remove(final Snippet snippet) {
writeLock.lock();

View File

@ -23,9 +23,9 @@ import java.io.OutputStream;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.FlowSynchronizationException;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
/**
* Interface to define service methods for FlowController configuration.

View File

@ -24,8 +24,8 @@ import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.transform.stream.StreamSource;
import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.StandardSnippet;
import org.apache.nifi.controller.serialization.FlowSerializationException;
public class StandardSnippetDeserializer {

View File

@ -23,8 +23,8 @@ import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.StandardSnippet;
import org.apache.nifi.controller.serialization.FlowSerializationException;
public final class StandardSnippetSerializer {

View File

@ -29,12 +29,12 @@ import java.util.zip.GZIPOutputStream;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.FlowSynchronizationException;
import org.apache.nifi.controller.FlowSynchronizer;
import org.apache.nifi.controller.StandardFlowSerializer;
import org.apache.nifi.controller.StandardFlowSynchronizer;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.serialization.StandardFlowSerializer;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;

View File

@ -24,7 +24,7 @@ import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.transform.stream.StreamSource;
import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.web.api.dto.TemplateDTO;
public class TemplateDeserializer {

View File

@ -23,7 +23,7 @@ import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.web.api.dto.TemplateDTO;
public final class TemplateSerializer {

View File

@ -130,11 +130,11 @@ public class StandardProcessContext implements ProcessContext, ControllerService
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) {
if (!serviceType.isInterface()) {
throw new IllegalArgumentException("ControllerServices may be referenced only via their interfaces; " + serviceType + " is not an interface");
}
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId);
}
@Override

View File

@ -44,20 +44,23 @@ public class StandardValidationContext implements ValidationContext {
private final Map<String, Boolean> expressionLanguageSupported;
private final String annotationData;
private final Set<String> serviceIdentifiersToNotValidate;
private final String groupId;
public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map<PropertyDescriptor, String> properties, final String annotationData) {
this(controllerServiceProvider, Collections.<String>emptySet(), properties, annotationData);
public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map<PropertyDescriptor, String> properties, final String annotationData, final String groupId) {
this(controllerServiceProvider, Collections.<String> emptySet(), properties, annotationData, groupId);
}
public StandardValidationContext(
final ControllerServiceProvider controllerServiceProvider,
final Set<String> serviceIdentifiersToNotValidate,
final Map<PropertyDescriptor, String> properties,
final String annotationData) {
final String annotationData,
final String groupId) {
this.controllerServiceProvider = controllerServiceProvider;
this.properties = new HashMap<>(properties);
this.annotationData = annotationData;
this.serviceIdentifiersToNotValidate = serviceIdentifiersToNotValidate;
this.groupId = groupId;
preparedQueries = new HashMap<>(properties.size());
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
@ -90,7 +93,7 @@ public class StandardValidationContext implements ValidationContext {
@Override
public ValidationContext getControllerServiceValidationContext(final ControllerService controllerService) {
final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(controllerService.getIdentifier());
return new StandardValidationContext(controllerServiceProvider, serviceNode.getProperties(), serviceNode.getAnnotationData());
return new StandardValidationContext(controllerServiceProvider, serviceNode.getProperties(), serviceNode.getAnnotationData(), serviceNode.getProcessGroup().getIdentifier());
}
@Override
@ -134,4 +137,9 @@ public class StandardValidationContext implements ValidationContext {
final Boolean supported = expressionLanguageSupported.get(propertyName);
return Boolean.TRUE.equals(supported);
}
@Override
public String getProcessGroupIdentifier() {
return groupId;
}
}

View File

@ -33,12 +33,13 @@ public class StandardValidationContextFactory implements ValidationContextFactor
}
@Override
public ValidationContext newValidationContext(final Map<PropertyDescriptor, String> properties, final String annotationData) {
return new StandardValidationContext(serviceProvider, properties, annotationData);
public ValidationContext newValidationContext(final Map<PropertyDescriptor, String> properties, final String annotationData, final String groupId) {
return new StandardValidationContext(serviceProvider, properties, annotationData, groupId);
}
@Override
public ValidationContext newValidationContext(final Set<String> serviceIdentifiersToNotValidate, final Map<PropertyDescriptor, String> properties, final String annotationData) {
return new StandardValidationContext(serviceProvider, serviceIdentifiersToNotValidate, properties, annotationData);
public ValidationContext newValidationContext(final Set<String> serviceIdentifiersToNotValidate,
final Map<PropertyDescriptor, String> properties, final String annotationData, final String groupId) {
return new StandardValidationContext(serviceProvider, serviceIdentifiersToNotValidate, properties, annotationData, groupId);
}
}

View File

@ -23,9 +23,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.FlowSynchronizationException;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.lifecycle.LifeCycle;
/**

View File

@ -29,6 +29,8 @@
<!-- Groupings of Processors/Ports -->
<xs:element name="rootGroup" type="RootProcessGroupType" />
<!-- This exists for backward compatibility between NiFi 1.x and NiFi 0.x. Any Controller Service that is listed
here is assigned to the root group -->
<xs:element name="controllerServices" type="ControllerServicesType" minOccurs="0" maxOccurs="1" />
<xs:element name="reportingTasks" type="ReportingTasksType" minOccurs="0" maxOccurs="1" />
@ -139,6 +141,7 @@
<xs:element name="processGroup" type="ProcessGroupType" minOccurs="0" maxOccurs="unbounded" />
<xs:element name="remoteProcessGroup" type="RemoteProcessGroupType" minOccurs="0" maxOccurs="unbounded" />
<xs:element name="connection" type="ConnectionType" minOccurs="0" maxOccurs="unbounded" />
<xs:element name="controllerService" type="ControllerServiceType" minOccurs="0" maxOccurs="unbounded" />
</xs:sequence>
</xs:complexType>
@ -162,6 +165,7 @@
<xs:element name="processGroup" type="ProcessGroupType" minOccurs="0" maxOccurs="unbounded" />
<xs:element name="remoteProcessGroup" type="RemoteProcessGroupType" minOccurs="0" maxOccurs="unbounded" />
<xs:element name="connection" type="ConnectionType" minOccurs="0" maxOccurs="unbounded" />
<xs:element name="controllerService" type="ControllerServiceType" minOccurs="0" maxOccurs="unbounded" />
</xs:sequence>
</xs:complexType>

View File

@ -28,6 +28,9 @@ import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.KeyService;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSerializer;
import org.apache.nifi.controller.serialization.StandardFlowSerializer;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;

View File

@ -290,7 +290,7 @@ public class TestProcessorLifecycle {
}
});
}
assertTrue(countDownCounter.await(10000, TimeUnit.MILLISECONDS));
assertTrue(countDownCounter.await(1000000, TimeUnit.MILLISECONDS));
String previousOperation = null;
for (String operationName : testProcessor.operationNames) {
if (previousOperation == null || previousOperation.equals("@OnStopped")) {
@ -559,8 +559,10 @@ public class TestProcessorLifecycle {
this.setControllerRootGroup(fc, testGroup);
ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "foo", true);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testGroup.addControllerService(testServiceNode);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testGroup.addProcessor(testProcNode);
testProcNode.setProperty("P", "hello");
testProcNode.setProperty("S", testServiceNode.getIdentifier());

View File

@ -50,6 +50,8 @@ import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardControllerServiceNode;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.service.mock.MockProcessGroup;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -116,11 +118,17 @@ public class TestStandardProcessScheduler {
@Test(timeout = 6000)
public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException {
final Processor proc = new ServiceReferencingProcessor();
final ProcessGroup group = new MockProcessGroup();
final StandardControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(scheduler, null, Mockito.mock(StateManagerProvider.class));
serviceProvider.setRootProcessGroup(group);
final ControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(scheduler, null, Mockito.mock(StateManagerProvider.class));
final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", true);
group.addControllerService(service);
final ProcessorNode procNode = new StandardProcessorNode(proc, UUID.randomUUID().toString(),
new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider);
group.addProcessor(procNode);
procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier());

View File

@ -35,6 +35,7 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.service.mock.DummyProcessor;
import org.apache.nifi.controller.service.mock.MockProcessGroup;
import org.apache.nifi.controller.service.mock.ServiceA;
import org.apache.nifi.controller.service.mock.ServiceB;
import org.apache.nifi.groups.ProcessGroup;
@ -88,13 +89,17 @@ public class TestStandardControllerServiceProvider {
provider.disableControllerService(serviceNode);
}
@Test(timeout=10000)
@Test(timeout = 1000000)
public void testEnableDisableWithReference() {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider);
final ProcessGroup group = new MockProcessGroup();
provider.setRootProcessGroup(group);
final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B", false);
final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A", false);
group.addControllerService(serviceNodeA);
group.addControllerService(serviceNodeB);
serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B");
@ -145,6 +150,8 @@ public class TestStandardControllerServiceProvider {
public void testEnableReferencingServicesGraph(ProcessScheduler scheduler) {
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider);
final ProcessGroup procGroup = new MockProcessGroup();
provider.setRootProcessGroup(procGroup);
// build a graph of controller services with dependencies as such:
//
@ -163,6 +170,11 @@ public class TestStandardControllerServiceProvider {
final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
procGroup.addControllerService(serviceNode1);
procGroup.addControllerService(serviceNode2);
procGroup.addControllerService(serviceNode3);
procGroup.addControllerService(serviceNode4);
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");

View File

@ -0,0 +1,530 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.service.mock;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
public class MockProcessGroup implements ProcessGroup {
private Map<String, ControllerServiceNode> serviceMap = new HashMap<>();
@Override
public Authorizable getParentAuthorizable() {
return null;
}
@Override
public Resource getResource() {
return null;
}
@Override
public ProcessGroup getParent() {
return null;
}
@Override
public void setParent(ProcessGroup group) {
}
@Override
public String getIdentifier() {
return "unit test group id";
}
@Override
public String getName() {
return "unit test group";
}
@Override
public void setName(String name) {
}
@Override
public void setPosition(Position position) {
}
@Override
public Position getPosition() {
return null;
}
@Override
public String getComments() {
return null;
}
@Override
public void setComments(String comments) {
}
@Override
public ProcessGroupCounts getCounts() {
return null;
}
@Override
public void startProcessing() {
}
@Override
public void stopProcessing() {
}
@Override
public void enableProcessor(ProcessorNode processor) {
}
@Override
public void enableInputPort(Port port) {
}
@Override
public void enableOutputPort(Port port) {
}
@Override
public void startProcessor(ProcessorNode processor) {
}
@Override
public void startInputPort(Port port) {
}
@Override
public void startOutputPort(Port port) {
}
@Override
public void startFunnel(Funnel funnel) {
}
@Override
public void stopProcessor(ProcessorNode processor) {
}
@Override
public void stopInputPort(Port port) {
}
@Override
public void stopOutputPort(Port port) {
}
@Override
public void disableProcessor(ProcessorNode processor) {
}
@Override
public void disableInputPort(Port port) {
}
@Override
public void disableOutputPort(Port port) {
}
@Override
public void shutdown() {
}
@Override
public boolean isRootGroup() {
return false;
}
@Override
public void addInputPort(Port port) {
}
@Override
public void removeInputPort(Port port) {
}
@Override
public Set<Port> getInputPorts() {
return null;
}
@Override
public Port getInputPort(String id) {
return null;
}
@Override
public void addOutputPort(Port port) {
}
@Override
public void removeOutputPort(Port port) {
}
@Override
public Port getOutputPort(String id) {
return null;
}
@Override
public Set<Port> getOutputPorts() {
return null;
}
@Override
public void addProcessGroup(ProcessGroup group) {
}
@Override
public ProcessGroup getProcessGroup(String id) {
return null;
}
@Override
public Set<ProcessGroup> getProcessGroups() {
return null;
}
@Override
public void removeProcessGroup(ProcessGroup group) {
}
@Override
public void addProcessor(ProcessorNode processor) {
processor.setProcessGroup(this);
}
@Override
public void removeProcessor(ProcessorNode processor) {
}
@Override
public Set<ProcessorNode> getProcessors() {
return null;
}
@Override
public ProcessorNode getProcessor(String id) {
return null;
}
@Override
public Connectable getConnectable(String id) {
return null;
}
@Override
public void addConnection(Connection connection) {
}
@Override
public void removeConnection(Connection connection) {
}
@Override
public void inheritConnection(Connection connection) {
}
@Override
public Connection getConnection(String id) {
return null;
}
@Override
public Set<Connection> getConnections() {
return null;
}
@Override
public Connection findConnection(String id) {
return null;
}
@Override
public List<Connection> findAllConnections() {
return null;
}
@Override
public Funnel findFunnel(String id) {
return null;
}
@Override
public ControllerServiceNode findControllerService(String id) {
return serviceMap.get(id);
}
@Override
public Set<ControllerServiceNode> findAllControllerServices() {
return new HashSet<>(serviceMap.values());
}
@Override
public void addRemoteProcessGroup(RemoteProcessGroup remoteGroup) {
}
@Override
public void removeRemoteProcessGroup(RemoteProcessGroup remoteGroup) {
}
@Override
public RemoteProcessGroup getRemoteProcessGroup(String id) {
return null;
}
@Override
public Set<RemoteProcessGroup> getRemoteProcessGroups() {
return null;
}
@Override
public void addLabel(Label label) {
}
@Override
public void removeLabel(Label label) {
}
@Override
public Set<Label> getLabels() {
return null;
}
@Override
public Label getLabel(String id) {
return null;
}
@Override
public ProcessGroup findProcessGroup(String id) {
return null;
}
@Override
public List<ProcessGroup> findAllProcessGroups() {
return null;
}
@Override
public RemoteProcessGroup findRemoteProcessGroup(String id) {
return null;
}
@Override
public List<RemoteProcessGroup> findAllRemoteProcessGroups() {
return null;
}
@Override
public ProcessorNode findProcessor(String id) {
return null;
}
@Override
public List<ProcessorNode> findAllProcessors() {
return null;
}
@Override
public Label findLabel(String id) {
return null;
}
@Override
public List<Label> findAllLabels() {
return null;
}
@Override
public Port findInputPort(String id) {
return null;
}
@Override
public List<Port> findAllInputPorts() {
return null;
}
@Override
public Port getInputPortByName(String name) {
return null;
}
@Override
public Port findOutputPort(String id) {
return null;
}
@Override
public List<Port> findAllOutputPorts() {
return null;
}
@Override
public Port getOutputPortByName(String name) {
return null;
}
@Override
public void addFunnel(Funnel funnel) {
}
@Override
public void addFunnel(Funnel funnel, boolean autoStart) {
}
@Override
public Set<Funnel> getFunnels() {
return null;
}
@Override
public Funnel getFunnel(String id) {
return null;
}
@Override
public void removeFunnel(Funnel funnel) {
}
@Override
public void addControllerService(ControllerServiceNode service) {
serviceMap.put(service.getIdentifier(), service);
service.setProcessGroup(this);
}
@Override
public ControllerServiceNode getControllerService(String id) {
return serviceMap.get(id);
}
@Override
public Set<ControllerServiceNode> getControllerServices(boolean recursive) {
return new HashSet<>(serviceMap.values());
}
@Override
public void removeControllerService(ControllerServiceNode service) {
serviceMap.remove(service.getIdentifier());
}
@Override
public boolean isEmpty() {
return false;
}
@Override
public void remove(Snippet snippet) {
}
@Override
public Connectable findConnectable(String identifier) {
return null;
}
@Override
public void move(Snippet snippet, ProcessGroup destination) {
}
@Override
public void verifyCanDelete() {
}
@Override
public void verifyCanDelete(boolean ignorePortConnections) {
}
@Override
public void verifyCanStart() {
}
@Override
public void verifyCanStop() {
}
@Override
public void verifyCanDelete(Snippet snippet) {
}
@Override
public void verifyCanMove(Snippet snippet, ProcessGroup newProcessGroup) {
}
}

View File

@ -117,7 +117,7 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
@Test
public void testStateTooLargeExceptionThrown() {
public void testStateTooLargeExceptionThrownOnSetState() {
final Map<String, String> state = new HashMap<>();
final StringBuilder sb = new StringBuilder();
@ -137,8 +137,30 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
} catch (final StateTooLargeException stle) {
// expected behavior.
} catch (final Exception e) {
e.printStackTrace();
Assert.fail("Expected StateTooLargeException but " + e.getClass() + " was thrown", e);
}
}
@Test
public void testStateTooLargeExceptionThrownOnReplace() throws IOException {
final Map<String, String> state = new HashMap<>();
final StringBuilder sb = new StringBuilder();
// Build a string that is a little less than 64 KB, because that's
// the largest value available for DataOutputStream.writeUTF
for (int i = 0; i < 6500; i++) {
sb.append("0123456789");
}
for (int i = 0; i < 20; i++) {
state.put("numbers." + i, sb.toString());
}
final Map<String, String> smallState = new HashMap<>();
smallState.put("abc", "xyz");
getProvider().setState(smallState, componentId);
try {
getProvider().replace(getProvider().getState(componentId), state, componentId);
@ -146,6 +168,7 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
} catch (final StateTooLargeException stle) {
// expected behavior.
} catch (final Exception e) {
e.printStackTrace();
Assert.fail("Expected StateTooLargeException", e);
}

View File

@ -148,7 +148,7 @@ public class TestStandardPropertyValue {
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) {
return null;
}

View File

@ -20,9 +20,9 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.NiFiServer;
import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.FlowSynchronizationException;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.lifecycle.LifeCycleStartException;
import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.nar.NarClassLoaders;

View File

@ -1120,17 +1120,19 @@ public interface NiFiServiceFacade {
* Creates a controller service.
*
* @param revision Revision to compare with current base revision
* @param groupId the ID of the Process Group to add the Controller Service to
* @param controllerServiceDTO The controller service DTO
* @return The controller service DTO
*/
ControllerServiceEntity createControllerService(Revision revision, ControllerServiceDTO controllerServiceDTO);
ControllerServiceEntity createControllerService(Revision revision, String groupId, ControllerServiceDTO controllerServiceDTO);
/**
* Gets all controller services.
* Gets all controller services that belong to the given group and its parent/ancestor groups
*
* @param groupId the id of the process group of interest
* @return services
*/
Set<ControllerServiceDTO> getControllerServices();
Set<ControllerServiceDTO> getControllerServices(String groupId);
/**
* Gets the specified controller service.

View File

@ -1452,7 +1452,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public ControllerServiceEntity createControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
public ControllerServiceEntity createControllerService(final Revision revision, final String groupId, final ControllerServiceDTO controllerServiceDTO) {
// TODO: Instead of "root" we need the ID of the Process Group that the Controller Service is being created in.
// Right now, though, they are not scoped to a particular group.
return revisionManager.get("root", new ReadOnlyRevisionCallback<RevisionUpdate<ControllerServiceEntity>>() {
@ -1466,12 +1466,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// create the controller service
final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
group.authorize(authorizer, RequestAction.WRITE);
group.addControllerService(controllerService);
// save the update
if (properties.isClusterManager()) {
clusterManager.saveControllerServices();
} else {
controllerFacade.save();
}
final Revision updatedRevision = new Revision(0L, revision.getClientId(), controllerService.getIdentifier());
final FlowModification lastMod = new FlowModification(updatedRevision, NiFiUserUtils.getNiFiUserName());
@ -1491,7 +1491,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
public UpdateResult<ControllerServiceEntity> updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
// if controller service does not exist, then create new controller service
if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId()) == false) {
return new UpdateResult<>(createControllerService(revision, controllerServiceDTO), true);
return new UpdateResult<>(createControllerService(revision, controllerServiceDTO.getParentGroupId(), controllerServiceDTO), true);
}
final String modifier = NiFiUserUtils.getNiFiUserName();
@ -1502,11 +1502,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final ControllerServiceNode controllerService = controllerServiceDAO.updateControllerService(controllerServiceDTO);
// save the update
if (properties.isClusterManager()) {
clusterManager.saveControllerServices();
} else {
controllerFacade.save();
}
final Revision updatedRevision = incrementRevision(revision);
final ControllerServiceDTO controllerServiceDto = dtoFactory.createControllerServiceDto(controllerService);
@ -1614,11 +1610,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerServiceDAO.deleteControllerService(controllerServiceId);
// save the update
if (properties.isClusterManager()) {
clusterManager.saveControllerServices();
} else {
controllerFacade.save();
}
final ControllerServiceEntity entity = entityFactory.createControllerServiceEntity(dto, null, null);
return new StandardRevisionUpdate<>(entity, null);
@ -2032,7 +2024,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
}
return dtoFactory.createPropertyDescriptorDto(descriptor);
return dtoFactory.createPropertyDescriptorDto(descriptor, processor.getProcessGroup().getIdentifier());
}
@Override
@ -2407,13 +2399,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public Set<ControllerServiceDTO> getControllerServices() {
public Set<ControllerServiceDTO> getControllerServices(String groupId) {
final Set<ControllerServiceDTO> controllerServiceDtos = new LinkedHashSet<>();
final Set<ControllerServiceNode> serviceNodes = controllerServiceDAO.getControllerServices();
final Set<ControllerServiceNode> serviceNodes = getGroup(groupId).getControllerServices(true);
final Set<String> serviceIds = serviceNodes.stream().map(service -> service.getIdentifier()).collect(Collectors.toSet());
revisionManager.get(serviceIds, () -> {
for (ControllerServiceNode controllerService : controllerServiceDAO.getControllerServices()) {
for (ControllerServiceNode controllerService : serviceNodes) {
controllerServiceDtos.add(dtoFactory.createControllerServiceDto(controllerService));
}
return null;
@ -2438,7 +2430,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
}
return dtoFactory.createPropertyDescriptorDto(descriptor);
return dtoFactory.createPropertyDescriptorDto(descriptor, controllerService.getProcessGroup().getIdentifier());
});
}
@ -2479,7 +2471,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
}
return dtoFactory.createPropertyDescriptorDto(descriptor);
return dtoFactory.createPropertyDescriptorDto(descriptor, "root");
}
@Override

View File

@ -2514,7 +2514,7 @@ public class ProcessGroupResource extends ApplicationResource {
// create the controller service and generate the json
final ControllerServiceEntity entity = serviceFacade.createControllerService(
new Revision(revision.getVersion(), revision.getClientId()), controllerServiceEntity.getControllerService());
new Revision(revision.getVersion(), revision.getClientId()), groupId, controllerServiceEntity.getControllerService());
// build the response entity
controllerServiceResource.populateRemainingControllerServiceContent(availability, entity.getControllerService());
@ -2574,7 +2574,7 @@ public class ProcessGroupResource extends ApplicationResource {
}
// get all the controller services
final Set<ControllerServiceDTO> controllerServices = controllerServiceResource.populateRemainingControllerServicesContent(availability, serviceFacade.getControllerServices());
final Set<ControllerServiceDTO> controllerServices = controllerServiceResource.populateRemainingControllerServicesContent(availability, serviceFacade.getControllerServices(groupId));
// create the revision
final RevisionDTO revision = new RevisionDTO();

View File

@ -1113,7 +1113,7 @@ public final class DtoFactory {
final PropertyDescriptor descriptor = entry.getKey();
// store the property descriptor
dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor));
dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor, "root"));
// determine the property value - don't include sensitive properties
String propertyValue = entry.getValue();
@ -1176,7 +1176,7 @@ public final class DtoFactory {
final PropertyDescriptor descriptor = entry.getKey();
// store the property descriptor
dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor));
dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor, controllerServiceNode.getProcessGroup().getIdentifier()));
// determine the property value - don't include sensitive properties
String propertyValue = entry.getValue();
@ -1267,6 +1267,7 @@ public final class DtoFactory {
dto.setId(component.getIdentifier());
dto.setName(component.getName());
String processGroupId = null;
List<PropertyDescriptor> propertyDescriptors = null;
Collection<ValidationResult> validationErrors = null;
if (component instanceof ProcessorNode) {
@ -1279,6 +1280,7 @@ public final class DtoFactory {
propertyDescriptors = node.getProcessor().getPropertyDescriptors();
validationErrors = node.getValidationErrors();
processGroupId = node.getProcessGroup().getIdentifier();
} else if (component instanceof ControllerServiceNode) {
final ControllerServiceNode node = ((ControllerServiceNode) component);
dto.setState(node.getState().name());
@ -1293,6 +1295,7 @@ public final class DtoFactory {
propertyDescriptors = node.getControllerServiceImplementation().getPropertyDescriptors();
validationErrors = node.getValidationErrors();
processGroupId = node.getProcessGroup().getIdentifier();
} else if (component instanceof ReportingTaskNode) {
final ReportingTaskNode node = ((ReportingTaskNode) component);
dto.setState(node.getScheduledState().name());
@ -1302,6 +1305,7 @@ public final class DtoFactory {
propertyDescriptors = node.getReportingTask().getPropertyDescriptors();
validationErrors = node.getValidationErrors();
processGroupId = "root";
}
if (propertyDescriptors != null && !propertyDescriptors.isEmpty()) {
@ -1326,7 +1330,7 @@ public final class DtoFactory {
final PropertyDescriptor descriptor = entry.getKey();
// store the property descriptor
dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor));
dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor, processGroupId));
// determine the property value - don't include sensitive properties
String propertyValue = entry.getValue();
@ -2216,7 +2220,7 @@ public final class DtoFactory {
final PropertyDescriptor descriptor = entry.getKey();
// store the property descriptor
dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor));
dto.getDescriptors().put(descriptor.getName(), createPropertyDescriptorDto(descriptor, procNode.getProcessGroup().getIdentifier()));
// determine the property value - don't include sensitive properties
String propertyValue = entry.getValue();
@ -2258,9 +2262,10 @@ public final class DtoFactory {
* Creates a PropertyDesriptorDTO from the specified PropertyDesriptor.
*
* @param propertyDescriptor descriptor
* @param groupId the Identifier of the Process Group that the component belongs to
* @return dto
*/
public PropertyDescriptorDTO createPropertyDescriptorDto(final PropertyDescriptor propertyDescriptor) {
public PropertyDescriptorDTO createPropertyDescriptorDto(final PropertyDescriptor propertyDescriptor, final String groupId) {
if (propertyDescriptor == null) {
return null;
}
@ -2287,7 +2292,7 @@ public final class DtoFactory {
dto.setAllowableValues(null);
} else {
final List<AllowableValueDTO> allowableValues = new ArrayList<>();
for (final String serviceIdentifier : controllerServiceLookup.getControllerServiceIdentifiers(serviceDefinition)) {
for (final String serviceIdentifier : controllerServiceLookup.getControllerServiceIdentifiers(serviceDefinition, groupId)) {
final String displayName = controllerServiceLookup.getControllerServiceName(serviceIdentifier);
final AllowableValueDTO allowableValue = new AllowableValueDTO();