mirror of https://github.com/apache/nifi.git
NIFI-4436: Ensure that on save, we assign a Versioned Component Identifier to inner process groups that are tracking to remote flows, if they don't have one. This would occur, for instance, if a Process Group was imported into an existing group (or copied/moved into it) and then the existing group was saved.
NIFI-4436: Fixed a bug that caused a flow not to successfully change version if a connection is added to an existing component and that component is running at time of version change NIFI-4436: Fixed bug with ordering of controller services being enabled and disabled NIFI-4436: Fixed bug that prevented local input and output ports from being stopped and started as needed NIFI-4436: Fixed bugs around referencing controller services that are at a higher level than the versioned flow NIFI-4436: Ensure that we clear components from FlowController's cache when removed and that they are added to cache when created. NIFI-4436: Fixed error message coming back if component is invalid when trying to be restarted/re-enabled NIFI-4436: Addressed issue with children of a removed process group not being considered 'affected components' and as a result not being stopped/disabled/restarted/re-enabled This closes #2219. Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
parent
fa996cd418
commit
b6117743d4
|
@ -26,6 +26,8 @@ import java.util.Collection;
|
||||||
public class AffectedComponentDTO {
|
public class AffectedComponentDTO {
|
||||||
public static final String COMPONENT_TYPE_PROCESSOR = "PROCESSOR";
|
public static final String COMPONENT_TYPE_PROCESSOR = "PROCESSOR";
|
||||||
public static final String COMPONENT_TYPE_CONTROLLER_SERVICE = "CONTROLLER_SERVICE";
|
public static final String COMPONENT_TYPE_CONTROLLER_SERVICE = "CONTROLLER_SERVICE";
|
||||||
|
public static final String COMPONENT_TYPE_INPUT_PORT = "INPUT_PORT";
|
||||||
|
public static final String COMPONENT_TYPE_OUTPUT_PORT = "OUTPUT_PORT";
|
||||||
public static final String COMPONENT_TYPE_REMOTE_INPUT_PORT = "REMOTE_INPUT_PORT";
|
public static final String COMPONENT_TYPE_REMOTE_INPUT_PORT = "REMOTE_INPUT_PORT";
|
||||||
public static final String COMPONENT_TYPE_REMOTE_OUTPUT_PORT = "REMOTE_OUTPUT_PORT";
|
public static final String COMPONENT_TYPE_REMOTE_OUTPUT_PORT = "REMOTE_OUTPUT_PORT";
|
||||||
|
|
||||||
|
@ -58,6 +60,7 @@ public class AffectedComponentDTO {
|
||||||
|
|
||||||
@ApiModelProperty(value = "The type of this component",
|
@ApiModelProperty(value = "The type of this component",
|
||||||
allowableValues = COMPONENT_TYPE_PROCESSOR + "," + COMPONENT_TYPE_CONTROLLER_SERVICE + ", "
|
allowableValues = COMPONENT_TYPE_PROCESSOR + "," + COMPONENT_TYPE_CONTROLLER_SERVICE + ", "
|
||||||
|
+ COMPONENT_TYPE_INPUT_PORT + ", " + COMPONENT_TYPE_OUTPUT_PORT + ", "
|
||||||
+ COMPONENT_TYPE_REMOTE_INPUT_PORT + ", " + COMPONENT_TYPE_REMOTE_OUTPUT_PORT)
|
+ COMPONENT_TYPE_REMOTE_INPUT_PORT + ", " + COMPONENT_TYPE_REMOTE_OUTPUT_PORT)
|
||||||
public String getReferenceType() {
|
public String getReferenceType() {
|
||||||
return referenceType;
|
return referenceType;
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.net.URL;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnAdded;
|
import org.apache.nifi.annotation.lifecycle.OnAdded;
|
||||||
import org.apache.nifi.bundle.BundleCoordinate;
|
import org.apache.nifi.bundle.BundleCoordinate;
|
||||||
|
@ -83,6 +84,16 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
|
||||||
*/
|
*/
|
||||||
void enableControllerServices(Collection<ControllerServiceNode> serviceNodes);
|
void enableControllerServices(Collection<ControllerServiceNode> serviceNodes);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enables the collection of services in the background. If a service in this collection
|
||||||
|
* depends on another service, the service being depended on must either already be enabled
|
||||||
|
* or must be in the collection as well.
|
||||||
|
*
|
||||||
|
* @param serviceNodes the nodes
|
||||||
|
* @return a Future that can be used to cancel the task or wait until it is completed
|
||||||
|
*/
|
||||||
|
Future<Void> enableControllerServicesAsync(Collection<ControllerServiceNode> serviceNodes);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Disables the given controller service so that it cannot be used by other
|
* Disables the given controller service so that it cannot be used by other
|
||||||
* components. This allows configuration to be updated or allows service to
|
* components. This allows configuration to be updated or allows service to
|
||||||
|
@ -92,6 +103,15 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
|
||||||
*/
|
*/
|
||||||
CompletableFuture<Void> disableControllerService(ControllerServiceNode serviceNode);
|
CompletableFuture<Void> disableControllerService(ControllerServiceNode serviceNode);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Disables the collection of services in the background. If any of the services given is referenced
|
||||||
|
* by another service, then that other service must either be disabled or be in the given collection.
|
||||||
|
*
|
||||||
|
* @param serviceNodes the nodes the disable
|
||||||
|
* @return a Future that can be used to cancel the task or wait until it is completed
|
||||||
|
*/
|
||||||
|
Future<Void> disableControllerServicesAsync(Collection<ControllerServiceNode> serviceNodes);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return a Set of all Controller Services that exist for this service
|
* @return a Set of all Controller Services that exist for this service
|
||||||
* provider
|
* provider
|
||||||
|
|
|
@ -245,6 +245,7 @@ import java.util.UUID;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -3609,12 +3610,22 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
controllerServiceProvider.enableControllerServices(serviceNodes);
|
controllerServiceProvider.enableControllerServices(serviceNodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Void> enableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
|
||||||
|
return controllerServiceProvider.enableControllerServicesAsync(serviceNodes);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Void> disableControllerService(final ControllerServiceNode serviceNode) {
|
public CompletableFuture<Void> disableControllerService(final ControllerServiceNode serviceNode) {
|
||||||
serviceNode.verifyCanDisable();
|
serviceNode.verifyCanDisable();
|
||||||
return controllerServiceProvider.disableControllerService(serviceNode);
|
return controllerServiceProvider.disableControllerService(serviceNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Void> disableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
|
||||||
|
return controllerServiceProvider.disableControllerServicesAsync(serviceNodes);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
|
public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
|
||||||
controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
|
controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
|
||||||
|
|
|
@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.lang3.ClassUtils;
|
import org.apache.commons.lang3.ClassUtils;
|
||||||
|
@ -50,13 +51,13 @@ import org.apache.nifi.controller.ConfiguredComponent;
|
||||||
import org.apache.nifi.controller.ControllerService;
|
import org.apache.nifi.controller.ControllerService;
|
||||||
import org.apache.nifi.controller.FlowController;
|
import org.apache.nifi.controller.FlowController;
|
||||||
import org.apache.nifi.controller.LoggableComponent;
|
import org.apache.nifi.controller.LoggableComponent;
|
||||||
import org.apache.nifi.controller.ProcessScheduler;
|
|
||||||
import org.apache.nifi.controller.ProcessorNode;
|
import org.apache.nifi.controller.ProcessorNode;
|
||||||
import org.apache.nifi.controller.ReportingTaskNode;
|
import org.apache.nifi.controller.ReportingTaskNode;
|
||||||
import org.apache.nifi.controller.ScheduledState;
|
import org.apache.nifi.controller.ScheduledState;
|
||||||
import org.apache.nifi.controller.ValidationContextFactory;
|
import org.apache.nifi.controller.ValidationContextFactory;
|
||||||
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
|
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
|
||||||
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
|
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
|
||||||
|
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
|
||||||
import org.apache.nifi.events.BulletinFactory;
|
import org.apache.nifi.events.BulletinFactory;
|
||||||
import org.apache.nifi.groups.ProcessGroup;
|
import org.apache.nifi.groups.ProcessGroup;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
@ -78,7 +79,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
|
private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
|
||||||
|
|
||||||
private final ProcessScheduler processScheduler;
|
private final StandardProcessScheduler processScheduler;
|
||||||
private final BulletinRepository bulletinRepo;
|
private final BulletinRepository bulletinRepo;
|
||||||
private final StateManagerProvider stateManagerProvider;
|
private final StateManagerProvider stateManagerProvider;
|
||||||
private final VariableRegistry variableRegistry;
|
private final VariableRegistry variableRegistry;
|
||||||
|
@ -87,7 +88,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
|
|
||||||
private final ConcurrentMap<String, ControllerServiceNode> serviceCache = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, ControllerServiceNode> serviceCache = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo,
|
public StandardControllerServiceProvider(final FlowController flowController, final StandardProcessScheduler scheduler, final BulletinRepository bulletinRepo,
|
||||||
final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry, final NiFiProperties nifiProperties) {
|
final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry, final NiFiProperties nifiProperties) {
|
||||||
|
|
||||||
this.flowController = flowController;
|
this.flowController = flowController;
|
||||||
|
@ -384,6 +385,74 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Void> enableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
|
||||||
|
final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
processScheduler.submitFrameworkTask(() -> {
|
||||||
|
enableControllerServices(serviceNodes, future);
|
||||||
|
future.complete(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> completableFuture) {
|
||||||
|
// validate that we are able to start all of the services.
|
||||||
|
Iterator<ControllerServiceNode> serviceIter = serviceNodes.iterator();
|
||||||
|
while (serviceIter.hasNext()) {
|
||||||
|
ControllerServiceNode controllerServiceNode = serviceIter.next();
|
||||||
|
List<ControllerServiceNode> requiredServices = ((StandardControllerServiceNode) controllerServiceNode).getRequiredControllerServices();
|
||||||
|
for (ControllerServiceNode requiredService : requiredServices) {
|
||||||
|
if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
|
||||||
|
logger.error("Cannot enable {} because it has a dependency on {}, which is not enabled", controllerServiceNode, requiredService);
|
||||||
|
completableFuture.completeExceptionally(new IllegalStateException("Cannot enable " + controllerServiceNode
|
||||||
|
+ " because it has a dependency on " + requiredService + ", which is not enabled"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final ControllerServiceNode controllerServiceNode : serviceNodes) {
|
||||||
|
if (completableFuture.isCancelled()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (!controllerServiceNode.isActive()) {
|
||||||
|
final Future<Void> future = enableControllerServiceDependenciesFirst(controllerServiceNode);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
future.get(1, TimeUnit.SECONDS);
|
||||||
|
logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
|
||||||
|
break;
|
||||||
|
} catch (final TimeoutException e) {
|
||||||
|
if (completableFuture.isCancelled()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} catch (final Exception e) {
|
||||||
|
logger.warn("Failed to enable service {}", controllerServiceNode, e);
|
||||||
|
completableFuture.completeExceptionally(e);
|
||||||
|
|
||||||
|
if (this.bulletinRepo != null) {
|
||||||
|
this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
|
||||||
|
Severity.ERROR.name(), "Could not enable " + controllerServiceNode + " due to " + e));
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Failed to enable " + controllerServiceNode, e);
|
||||||
|
if (this.bulletinRepo != null) {
|
||||||
|
this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
|
||||||
|
Severity.ERROR.name(), "Could not start " + controllerServiceNode + " due to " + e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private Future<Void> enableControllerServiceDependenciesFirst(ControllerServiceNode serviceNode) {
|
private Future<Void> enableControllerServiceDependenciesFirst(ControllerServiceNode serviceNode) {
|
||||||
final Map<ControllerServiceNode, Future<Void>> futures = new HashMap<>();
|
final Map<ControllerServiceNode, Future<Void>> futures = new HashMap<>();
|
||||||
|
|
||||||
|
@ -460,6 +529,58 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
return processScheduler.disableControllerService(serviceNode);
|
return processScheduler.disableControllerService(serviceNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Void> disableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
|
||||||
|
final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
processScheduler.submitFrameworkTask(() -> {
|
||||||
|
disableControllerServices(serviceNodes, future);
|
||||||
|
future.complete(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void disableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> future) {
|
||||||
|
final Set<ControllerServiceNode> serviceNodeSet = new HashSet<>(serviceNodes);
|
||||||
|
|
||||||
|
// Verify that for each Controller Service given, any service that references it is either disabled or is also in the given collection
|
||||||
|
for (final ControllerServiceNode serviceNode : serviceNodes) {
|
||||||
|
final List<ControllerServiceNode> references = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
|
||||||
|
for (final ControllerServiceNode reference : references) {
|
||||||
|
if (reference.isActive()) {
|
||||||
|
try {
|
||||||
|
reference.verifyCanDisable(serviceNodeSet);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
future.completeExceptionally(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final ControllerServiceNode serviceNode : serviceNodes) {
|
||||||
|
if (serviceNode.isActive()) {
|
||||||
|
disableReferencingServices(serviceNode);
|
||||||
|
final CompletableFuture<?> serviceFuture = disableControllerService(serviceNode);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
serviceFuture.get(1, TimeUnit.SECONDS);
|
||||||
|
break;
|
||||||
|
} catch (final TimeoutException e) {
|
||||||
|
if (future.isCancelled()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
} catch (final Exception e) {
|
||||||
|
logger.error("Failed to disable {}", serviceNode, e);
|
||||||
|
future.completeExceptionally(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ControllerService getControllerService(final String serviceIdentifier) {
|
public ControllerService getControllerService(final String serviceIdentifier) {
|
||||||
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
|
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
|
||||||
|
|
|
@ -738,7 +738,8 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final ControllerServiceNode cs : group.getControllerServices(false)) {
|
for (final ControllerServiceNode cs : group.getControllerServices(false)) {
|
||||||
group.removeControllerService(cs);
|
// Must go through Controller Service here because we need to ensure that it is removed from the cache
|
||||||
|
flowController.removeControllerService(cs);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final ProcessGroup childGroup : new ArrayList<>(group.getProcessGroups())) {
|
for (final ProcessGroup childGroup : new ArrayList<>(group.getProcessGroups())) {
|
||||||
|
@ -3158,9 +3159,13 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
.forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
|
.forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
|
||||||
});
|
});
|
||||||
|
|
||||||
processGroup.getProcessGroups().stream()
|
for (final ProcessGroup childGroup : processGroup.getProcessGroups()) {
|
||||||
.filter(childGroup -> childGroup.getVersionControlInformation() == null)
|
if (childGroup.getVersionControlInformation() == null) {
|
||||||
.forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup));
|
applyVersionedComponentIds(childGroup, lookup);
|
||||||
|
} else if (!childGroup.getVersionedComponentId().isPresent()) {
|
||||||
|
childGroup.setVersionedComponentId(lookup.apply(childGroup.getIdentifier()));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -3242,7 +3247,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
|
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
|
||||||
final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents());
|
final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents());
|
||||||
|
|
||||||
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, getAncestorGroupServiceIds(), new StaticDifferenceDescriptor());
|
final FlowComparator flowComparator = new StandardFlowComparator(remoteFlow, localFlow, getAncestorGroupServiceIds(), new StaticDifferenceDescriptor());
|
||||||
final FlowComparison flowComparison = flowComparator.compare();
|
final FlowComparison flowComparison = flowComparator.compare();
|
||||||
|
|
||||||
final Set<String> updatedVersionedComponentIds = new HashSet<>();
|
final Set<String> updatedVersionedComponentIds = new HashSet<>();
|
||||||
|
@ -3387,7 +3392,6 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
.map(VariableDescriptor::getName)
|
.map(VariableDescriptor::getName)
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
|
|
||||||
final Map<String, String> updatedVariableMap = new HashMap<>();
|
final Map<String, String> updatedVariableMap = new HashMap<>();
|
||||||
|
|
||||||
// If any new variables exist in the proposed flow, add those to the variable registry.
|
// If any new variables exist in the proposed flow, add those to the variable registry.
|
||||||
|
@ -3477,6 +3481,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
|
|
||||||
if (childGroup == null) {
|
if (childGroup == null) {
|
||||||
final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip);
|
final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip);
|
||||||
|
flowController.onProcessGroupAdded(added);
|
||||||
added.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
|
added.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
|
||||||
LOG.info("Added {} to {}", added, this);
|
LOG.info("Added {} to {}", added, this);
|
||||||
} else if (childCoordinates == null || updateDescendantVersionedGroups) {
|
} else if (childCoordinates == null || updateDescendantVersionedGroups) {
|
||||||
|
@ -3496,6 +3501,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier());
|
final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier());
|
||||||
if (funnel == null) {
|
if (funnel == null) {
|
||||||
final Funnel added = addFunnel(group, proposedFunnel, componentIdSeed);
|
final Funnel added = addFunnel(group, proposedFunnel, componentIdSeed);
|
||||||
|
flowController.onFunnelAdded(added);
|
||||||
LOG.info("Added {} to {}", added, this);
|
LOG.info("Added {} to {}", added, this);
|
||||||
} else if (updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) {
|
} else if (updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) {
|
||||||
updateFunnel(funnel, proposedFunnel);
|
updateFunnel(funnel, proposedFunnel);
|
||||||
|
@ -3517,6 +3523,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier());
|
final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier());
|
||||||
if (port == null) {
|
if (port == null) {
|
||||||
final Port added = addInputPort(group, proposedPort, componentIdSeed);
|
final Port added = addInputPort(group, proposedPort, componentIdSeed);
|
||||||
|
flowController.onInputPortAdded(added);
|
||||||
LOG.info("Added {} to {}", added, this);
|
LOG.info("Added {} to {}", added, this);
|
||||||
} else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
|
} else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
|
||||||
updatePort(port, proposedPort);
|
updatePort(port, proposedPort);
|
||||||
|
@ -3537,6 +3544,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier());
|
final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier());
|
||||||
if (port == null) {
|
if (port == null) {
|
||||||
final Port added = addOutputPort(group, proposedPort, componentIdSeed);
|
final Port added = addOutputPort(group, proposedPort, componentIdSeed);
|
||||||
|
flowController.onOutputPortAdded(added);
|
||||||
LOG.info("Added {} to {}", added, this);
|
LOG.info("Added {} to {}", added, this);
|
||||||
} else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
|
} else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
|
||||||
updatePort(port, proposedPort);
|
updatePort(port, proposedPort);
|
||||||
|
@ -3580,6 +3588,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
|
final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
|
||||||
if (processor == null) {
|
if (processor == null) {
|
||||||
final ProcessorNode added = addProcessor(group, proposedProcessor, componentIdSeed);
|
final ProcessorNode added = addProcessor(group, proposedProcessor, componentIdSeed);
|
||||||
|
flowController.onProcessorAdded(added);
|
||||||
|
|
||||||
final Set<Relationship> proposedAutoTerminated =
|
final Set<Relationship> proposedAutoTerminated =
|
||||||
proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream()
|
proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream()
|
||||||
|
@ -3638,6 +3647,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier());
|
final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier());
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
final Connection added = addConnection(group, proposedConnection, componentIdSeed);
|
final Connection added = addConnection(group, proposedConnection, componentIdSeed);
|
||||||
|
flowController.onConnectionAdded(added);
|
||||||
LOG.info("Added {} to {}", added, this);
|
LOG.info("Added {} to {}", added, this);
|
||||||
} else if (isUpdateable(connection)) {
|
} else if (isUpdateable(connection)) {
|
||||||
// If the connection needs to be updated, then the source and destination will already have
|
// If the connection needs to be updated, then the source and destination will already have
|
||||||
|
@ -3658,6 +3668,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
final Connection connection = connectionsByVersionedId.get(removedVersionedId);
|
final Connection connection = connectionsByVersionedId.get(removedVersionedId);
|
||||||
LOG.info("Removing {} from {}", connection, group);
|
LOG.info("Removing {} from {}", connection, group);
|
||||||
group.removeConnection(connection);
|
group.removeConnection(connection);
|
||||||
|
flowController.onConnectionRemoved(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
|
// Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
|
||||||
|
@ -3670,7 +3681,8 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
for (final String removedVersionedId : controllerServicesRemoved) {
|
for (final String removedVersionedId : controllerServicesRemoved) {
|
||||||
final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId);
|
final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId);
|
||||||
LOG.info("Removing {} from {}", service, group);
|
LOG.info("Removing {} from {}", service, group);
|
||||||
group.removeControllerService(service);
|
// Must remove Controller Service through Flow Controller in order to remove from cache
|
||||||
|
flowController.removeControllerService(service);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final String removedVersionedId : funnelsRemoved) {
|
for (final String removedVersionedId : funnelsRemoved) {
|
||||||
|
@ -4065,13 +4077,6 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
// to the instance ID of the Controller Service.
|
// to the instance ID of the Controller Service.
|
||||||
final String serviceVersionedComponentId = entry.getValue();
|
final String serviceVersionedComponentId = entry.getValue();
|
||||||
String instanceId = getServiceInstanceId(serviceVersionedComponentId, group);
|
String instanceId = getServiceInstanceId(serviceVersionedComponentId, group);
|
||||||
if (instanceId == null) {
|
|
||||||
// We didn't find the instance ID based on the Versioned Component ID. So we want to just
|
|
||||||
// leave the value set to whatever it currently is, if it's currently set.
|
|
||||||
final PropertyDescriptor propertyDescriptor = new PropertyDescriptor.Builder().name(entry.getKey()).build();
|
|
||||||
instanceId = currentProperties.get(propertyDescriptor);
|
|
||||||
}
|
|
||||||
|
|
||||||
value = instanceId == null ? serviceVersionedComponentId : instanceId;
|
value = instanceId == null ? serviceVersionedComponentId : instanceId;
|
||||||
} else {
|
} else {
|
||||||
value = entry.getValue();
|
value = entry.getValue();
|
||||||
|
@ -4085,13 +4090,9 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getServiceInstanceId(final String serviceVersionedComponentId, final ProcessGroup group) {
|
private String getServiceInstanceId(final String serviceVersionedComponentId, final ProcessGroup group) {
|
||||||
for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) {
|
for (final ControllerServiceNode serviceNode : group.getControllerServices(true)) {
|
||||||
final Optional<String> optionalVersionedId = serviceNode.getVersionedComponentId();
|
final Optional<String> optionalVersionedId = serviceNode.getVersionedComponentId();
|
||||||
if (!optionalVersionedId.isPresent()) {
|
final String versionedId = optionalVersionedId.orElseGet(() -> UUID.nameUUIDFromBytes(serviceNode.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString());
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
final String versionedId = optionalVersionedId.get();
|
|
||||||
if (versionedId.equals(serviceVersionedComponentId)) {
|
if (versionedId.equals(serviceVersionedComponentId)) {
|
||||||
return serviceNode.getIdentifier();
|
return serviceNode.getIdentifier();
|
||||||
}
|
}
|
||||||
|
@ -4319,7 +4320,6 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Ensure that all Processors are instantiate-able.
|
// Ensure that all Processors are instantiate-able.
|
||||||
final Map<String, VersionedProcessor> proposedProcessors = new HashMap<>();
|
final Map<String, VersionedProcessor> proposedProcessors = new HashMap<>();
|
||||||
findAllProcessors(updatedFlow.getFlowContents(), proposedProcessors);
|
findAllProcessors(updatedFlow.getFlowContents(), proposedProcessors);
|
||||||
|
|
|
@ -47,7 +47,6 @@ import org.apache.nifi.controller.AbstractControllerService;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
import org.apache.nifi.controller.FlowController;
|
import org.apache.nifi.controller.FlowController;
|
||||||
import org.apache.nifi.controller.LoggableComponent;
|
import org.apache.nifi.controller.LoggableComponent;
|
||||||
import org.apache.nifi.controller.ProcessScheduler;
|
|
||||||
import org.apache.nifi.controller.ProcessorNode;
|
import org.apache.nifi.controller.ProcessorNode;
|
||||||
import org.apache.nifi.controller.ReloadComponent;
|
import org.apache.nifi.controller.ReloadComponent;
|
||||||
import org.apache.nifi.controller.ReportingTaskNode;
|
import org.apache.nifi.controller.ReportingTaskNode;
|
||||||
|
@ -269,7 +268,7 @@ public class TestStandardProcessScheduler {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception {
|
public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception {
|
||||||
final ProcessScheduler scheduler = createScheduler();
|
final StandardProcessScheduler scheduler = createScheduler();
|
||||||
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
||||||
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
|
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
|
||||||
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
|
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
|
||||||
|
@ -308,7 +307,7 @@ public class TestStandardProcessScheduler {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void validateDisabledServiceCantBeDisabled() throws Exception {
|
public void validateDisabledServiceCantBeDisabled() throws Exception {
|
||||||
final ProcessScheduler scheduler = createScheduler();
|
final StandardProcessScheduler scheduler = createScheduler();
|
||||||
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
||||||
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
|
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
|
||||||
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
|
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
|
||||||
|
@ -346,7 +345,7 @@ public class TestStandardProcessScheduler {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception {
|
public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception {
|
||||||
final ProcessScheduler scheduler = createScheduler();
|
final StandardProcessScheduler scheduler = createScheduler();
|
||||||
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
||||||
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
|
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
|
||||||
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
|
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
|
||||||
|
@ -380,7 +379,7 @@ public class TestStandardProcessScheduler {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void validateDisablingOfTheFailedService() throws Exception {
|
public void validateDisablingOfTheFailedService() throws Exception {
|
||||||
final ProcessScheduler scheduler = createScheduler();
|
final StandardProcessScheduler scheduler = createScheduler();
|
||||||
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
||||||
final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(),
|
final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(),
|
||||||
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
|
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
|
||||||
|
@ -412,7 +411,7 @@ public class TestStandardProcessScheduler {
|
||||||
@Test
|
@Test
|
||||||
@Ignore
|
@Ignore
|
||||||
public void validateEnabledDisableMultiThread() throws Exception {
|
public void validateEnabledDisableMultiThread() throws Exception {
|
||||||
final ProcessScheduler scheduler = createScheduler();
|
final StandardProcessScheduler scheduler = createScheduler();
|
||||||
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
||||||
final ExecutorService executor = Executors.newCachedThreadPool();
|
final ExecutorService executor = Executors.newCachedThreadPool();
|
||||||
for (int i = 0; i < 200; i++) {
|
for (int i = 0; i < 200; i++) {
|
||||||
|
@ -455,7 +454,7 @@ public class TestStandardProcessScheduler {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception {
|
public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception {
|
||||||
final ProcessScheduler scheduler = createScheduler();
|
final StandardProcessScheduler scheduler = createScheduler();
|
||||||
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
||||||
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
|
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
|
||||||
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
|
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
|
||||||
|
@ -481,7 +480,7 @@ public class TestStandardProcessScheduler {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void validateLongEnablingServiceCanStillBeDisabled() throws Exception {
|
public void validateLongEnablingServiceCanStillBeDisabled() throws Exception {
|
||||||
final ProcessScheduler scheduler = createScheduler();
|
final StandardProcessScheduler scheduler = createScheduler();
|
||||||
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
|
||||||
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
|
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
|
||||||
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
|
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
|
||||||
|
@ -581,7 +580,7 @@ public class TestStandardProcessScheduler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ProcessScheduler createScheduler() {
|
private StandardProcessScheduler createScheduler() {
|
||||||
return new StandardProcessScheduler(null, null, stateMgrProvider, nifiProperties);
|
return new StandardProcessScheduler(null, null, stateMgrProvider, nifiProperties);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.nifi.components.state.StateManager;
|
||||||
import org.apache.nifi.components.state.StateManagerProvider;
|
import org.apache.nifi.components.state.StateManagerProvider;
|
||||||
import org.apache.nifi.controller.FlowController;
|
import org.apache.nifi.controller.FlowController;
|
||||||
import org.apache.nifi.controller.LoggableComponent;
|
import org.apache.nifi.controller.LoggableComponent;
|
||||||
import org.apache.nifi.controller.ProcessScheduler;
|
|
||||||
import org.apache.nifi.controller.ProcessorNode;
|
import org.apache.nifi.controller.ProcessorNode;
|
||||||
import org.apache.nifi.controller.ReloadComponent;
|
import org.apache.nifi.controller.ReloadComponent;
|
||||||
import org.apache.nifi.controller.ScheduledState;
|
import org.apache.nifi.controller.ScheduledState;
|
||||||
|
@ -146,7 +145,7 @@ public class TestStandardControllerServiceProvider {
|
||||||
final FlowController controller = Mockito.mock(FlowController.class);
|
final FlowController controller = Mockito.mock(FlowController.class);
|
||||||
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
|
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
|
||||||
|
|
||||||
final ProcessScheduler scheduler = createScheduler();
|
final StandardProcessScheduler scheduler = createScheduler();
|
||||||
final StandardControllerServiceProvider provider =
|
final StandardControllerServiceProvider provider =
|
||||||
new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties);
|
new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties);
|
||||||
|
|
||||||
|
@ -162,7 +161,7 @@ public class TestStandardControllerServiceProvider {
|
||||||
final FlowController controller = Mockito.mock(FlowController.class);
|
final FlowController controller = Mockito.mock(FlowController.class);
|
||||||
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(group);
|
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(group);
|
||||||
|
|
||||||
final ProcessScheduler scheduler = createScheduler();
|
final StandardProcessScheduler scheduler = createScheduler();
|
||||||
final StandardControllerServiceProvider provider =
|
final StandardControllerServiceProvider provider =
|
||||||
new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties);
|
new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties);
|
||||||
|
|
||||||
|
@ -214,13 +213,13 @@ public class TestStandardControllerServiceProvider {
|
||||||
*/
|
*/
|
||||||
@Test(timeout = 120000)
|
@Test(timeout = 120000)
|
||||||
public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException {
|
public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException {
|
||||||
final ProcessScheduler scheduler = createScheduler();
|
final StandardProcessScheduler scheduler = createScheduler();
|
||||||
for (int i = 0; i < 5000; i++) {
|
for (int i = 0; i < 5000; i++) {
|
||||||
testEnableReferencingServicesGraph(scheduler);
|
testEnableReferencingServicesGraph(scheduler);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testEnableReferencingServicesGraph(final ProcessScheduler scheduler) {
|
public void testEnableReferencingServicesGraph(final StandardProcessScheduler scheduler) {
|
||||||
final ProcessGroup procGroup = new MockProcessGroup(controller);
|
final ProcessGroup procGroup = new MockProcessGroup(controller);
|
||||||
final FlowController controller = Mockito.mock(FlowController.class);
|
final FlowController controller = Mockito.mock(FlowController.class);
|
||||||
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
|
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
|
||||||
|
|
|
@ -116,6 +116,7 @@ import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
|
||||||
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
|
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
|
||||||
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
|
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -777,6 +778,15 @@ public interface NiFiServiceFacade {
|
||||||
*/
|
*/
|
||||||
PortEntity getInputPort(String inputPortId);
|
PortEntity getInputPort(String inputPortId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets an input port as it is available to the given user
|
||||||
|
*
|
||||||
|
* @param inputPortId The input port id
|
||||||
|
* @param user the user
|
||||||
|
* @return port
|
||||||
|
*/
|
||||||
|
PortEntity getInputPort(String inputPortId, NiFiUser user);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets all input ports in a given group.
|
* Gets all input ports in a given group.
|
||||||
*
|
*
|
||||||
|
@ -846,6 +856,15 @@ public interface NiFiServiceFacade {
|
||||||
*/
|
*/
|
||||||
PortEntity getOutputPort(String outputPortId);
|
PortEntity getOutputPort(String outputPortId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets an output port as it is available to the given user
|
||||||
|
*
|
||||||
|
* @param outputPortId The output port id
|
||||||
|
* @param user the user
|
||||||
|
* @return port
|
||||||
|
*/
|
||||||
|
PortEntity getOutputPort(String outputPortId, NiFiUser user);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets all output ports in a given group.
|
* Gets all output ports in a given group.
|
||||||
*
|
*
|
||||||
|
@ -1008,7 +1027,7 @@ public interface NiFiServiceFacade {
|
||||||
* @param state the state
|
* @param state the state
|
||||||
* @param serviceIds the id's of the services
|
* @param serviceIds the id's of the services
|
||||||
*/
|
*/
|
||||||
void verifyActivateControllerServices(String processGroupId, ControllerServiceState state, Set<String> serviceIds);
|
void verifyActivateControllerServices(String processGroupId, ControllerServiceState state, Collection<String> serviceIds);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enables or disables the controller services with the given IDs & Revisions on behalf of the currently logged in user
|
* Enables or disables the controller services with the given IDs & Revisions on behalf of the currently logged in user
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.web;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown whenever a flow cannot be resumed due to validation error, etc.
|
||||||
|
*/
|
||||||
|
public class ResumeFlowException extends Exception {
|
||||||
|
public ResumeFlowException(final String message, final Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ResumeFlowException(final String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,7 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.web;
|
package org.apache.nifi.web;
|
||||||
|
|
||||||
import com.google.common.collect.Sets;
|
import javax.ws.rs.WebApplicationException;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
|
||||||
import org.apache.commons.collections4.CollectionUtils;
|
import org.apache.commons.collections4.CollectionUtils;
|
||||||
import org.apache.nifi.action.Action;
|
import org.apache.nifi.action.Action;
|
||||||
import org.apache.nifi.action.Component;
|
import org.apache.nifi.action.Component;
|
||||||
|
@ -271,8 +273,8 @@ import org.apache.nifi.web.util.SnippetUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.ws.rs.WebApplicationException;
|
import com.google.common.collect.Sets;
|
||||||
import javax.ws.rs.core.Response;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -484,8 +486,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) {
|
public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Collection<String> serviceIds) {
|
||||||
processGroupDAO.verifyActivateControllerServices(groupId, state, serviceIds);
|
processGroupDAO.verifyActivateControllerServices(state, serviceIds);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1016,7 +1018,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
@Override
|
@Override
|
||||||
public RevisionUpdate<ActivateControllerServicesEntity> update() {
|
public RevisionUpdate<ActivateControllerServicesEntity> update() {
|
||||||
// schedule the components
|
// schedule the components
|
||||||
processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet());
|
processGroupDAO.activateControllerServices(state, serviceRevisions.keySet());
|
||||||
|
|
||||||
// update the revisions
|
// update the revisions
|
||||||
final Map<String, Revision> updatedRevisions = new HashMap<>();
|
final Map<String, Revision> updatedRevisions = new HashMap<>();
|
||||||
|
@ -3289,8 +3291,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
}
|
}
|
||||||
|
|
||||||
private PortEntity createInputPortEntity(final Port port) {
|
private PortEntity createInputPortEntity(final Port port) {
|
||||||
|
return createInputPortEntity(port, NiFiUserUtils.getNiFiUser());
|
||||||
|
}
|
||||||
|
|
||||||
|
private PortEntity createInputPortEntity(final Port port, final NiFiUser user) {
|
||||||
final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier()));
|
final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier()));
|
||||||
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
|
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, user);
|
||||||
final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier()));
|
final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier()));
|
||||||
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
|
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
|
||||||
final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
|
final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
|
||||||
|
@ -3298,8 +3304,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
}
|
}
|
||||||
|
|
||||||
private PortEntity createOutputPortEntity(final Port port) {
|
private PortEntity createOutputPortEntity(final Port port) {
|
||||||
|
return createOutputPortEntity(port, NiFiUserUtils.getNiFiUser());
|
||||||
|
}
|
||||||
|
|
||||||
|
private PortEntity createOutputPortEntity(final Port port, final NiFiUser user) {
|
||||||
final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier()));
|
final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier()));
|
||||||
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
|
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, user);
|
||||||
final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier()));
|
final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier()));
|
||||||
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
|
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
|
||||||
final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
|
final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
|
||||||
|
@ -3408,6 +3418,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
return createInputPortEntity(port);
|
return createInputPortEntity(port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PortEntity getInputPort(final String inputPortId, final NiFiUser user) {
|
||||||
|
final Port port = inputPortDAO.getPort(inputPortId);
|
||||||
|
return createInputPortEntity(port, user);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PortStatusEntity getInputPortStatus(final String inputPortId) {
|
public PortStatusEntity getInputPortStatus(final String inputPortId) {
|
||||||
final Port inputPort = inputPortDAO.getPort(inputPortId);
|
final Port inputPort = inputPortDAO.getPort(inputPortId);
|
||||||
|
@ -3422,6 +3438,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
return createOutputPortEntity(port);
|
return createOutputPortEntity(port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PortEntity getOutputPort(final String outputPortId, final NiFiUser user) {
|
||||||
|
final Port port = outputPortDAO.getPort(outputPortId);
|
||||||
|
return createOutputPortEntity(port, user);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PortStatusEntity getOutputPortStatus(final String outputPortId) {
|
public PortStatusEntity getOutputPortStatus(final String outputPortId) {
|
||||||
final Port outputPort = outputPortDAO.getPort(outputPortId);
|
final Port outputPort = outputPortDAO.getPort(outputPortId);
|
||||||
|
@ -3974,28 +3996,95 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
})
|
})
|
||||||
.collect(Collectors.toCollection(HashSet::new));
|
.collect(Collectors.toCollection(HashSet::new));
|
||||||
|
|
||||||
final Map<String, List<Connection>> connectionsByVersionedId = group.findAllConnections().stream()
|
for (final FlowDifference difference : comparison.getDifferences()) {
|
||||||
.filter(conn -> conn.getVersionedComponentId().isPresent())
|
final VersionedComponent localComponent = difference.getComponentA();
|
||||||
.collect(Collectors.groupingBy(conn -> conn.getVersionedComponentId().get()));
|
if (localComponent == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If any Process Group is removed, consider all components below that Process Group as an affected component
|
||||||
|
if (difference.getDifferenceType() == DifferenceType.COMPONENT_REMOVED && localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP) {
|
||||||
|
final String localGroupId = ((InstantiatedVersionedProcessGroup) localComponent).getInstanceId();
|
||||||
|
final ProcessGroup localGroup = processGroupDAO.getProcessGroup(localGroupId);
|
||||||
|
|
||||||
|
localGroup.findAllProcessors().stream()
|
||||||
|
.map(comp -> createAffectedComponentEntity(comp, user))
|
||||||
|
.forEach(affectedComponents::add);
|
||||||
|
localGroup.findAllFunnels().stream()
|
||||||
|
.map(comp -> createAffectedComponentEntity(comp, user))
|
||||||
|
.forEach(affectedComponents::add);
|
||||||
|
localGroup.findAllInputPorts().stream()
|
||||||
|
.map(comp -> createAffectedComponentEntity(comp, user))
|
||||||
|
.forEach(affectedComponents::add);
|
||||||
|
localGroup.findAllOutputPorts().stream()
|
||||||
|
.map(comp -> createAffectedComponentEntity(comp, user))
|
||||||
|
.forEach(affectedComponents::add);
|
||||||
|
localGroup.findAllRemoteProcessGroups().stream()
|
||||||
|
.flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream()))
|
||||||
|
.map(comp -> createAffectedComponentEntity(comp, user))
|
||||||
|
.forEach(affectedComponents::add);
|
||||||
|
localGroup.findAllControllerServices().stream()
|
||||||
|
.map(comp -> createAffectedComponentEntity(comp, user))
|
||||||
|
.forEach(affectedComponents::add);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE) {
|
||||||
|
final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId();
|
||||||
|
final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);
|
||||||
|
|
||||||
|
final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
|
||||||
|
for (final ControllerServiceNode referencingService : referencingServices) {
|
||||||
|
affectedComponents.add(createAffectedComponentEntity(referencingService, user));
|
||||||
|
}
|
||||||
|
|
||||||
|
final List<ProcessorNode> referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
|
||||||
|
for (final ProcessorNode referencingProcessor : referencingProcessors) {
|
||||||
|
affectedComponents.add(createAffectedComponentEntity(referencingProcessor, user));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a map of all connectable components by versioned component ID to the connectable component itself
|
||||||
|
final Map<String, List<Connectable>> connectablesByVersionId = new HashMap<>();
|
||||||
|
mapToConnectableId(group.findAllFunnels(), connectablesByVersionId);
|
||||||
|
mapToConnectableId(group.findAllInputPorts(), connectablesByVersionId);
|
||||||
|
mapToConnectableId(group.findAllOutputPorts(), connectablesByVersionId);
|
||||||
|
mapToConnectableId(group.findAllProcessors(), connectablesByVersionId);
|
||||||
|
|
||||||
|
final List<RemoteGroupPort> remotePorts = new ArrayList<>();
|
||||||
|
for (final RemoteProcessGroup rpg : group.findAllRemoteProcessGroups()) {
|
||||||
|
remotePorts.addAll(rpg.getInputPorts());
|
||||||
|
remotePorts.addAll(rpg.getOutputPorts());
|
||||||
|
}
|
||||||
|
mapToConnectableId(remotePorts, connectablesByVersionId);
|
||||||
|
|
||||||
|
// If any connection is added or modified, we need to stop both the source (if it exists in the flow currently)
|
||||||
|
// and the destination (if it exists in the flow currently).
|
||||||
for (final FlowDifference difference : comparison.getDifferences()) {
|
for (final FlowDifference difference : comparison.getDifferences()) {
|
||||||
VersionedComponent component = difference.getComponentA();
|
VersionedComponent component = difference.getComponentA();
|
||||||
if (component == null) {
|
if (component == null) {
|
||||||
component = difference.getComponentB();
|
component = difference.getComponentB();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (component.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONNECTION) {
|
if (component.getComponentType() != org.apache.nifi.registry.flow.ComponentType.CONNECTION) {
|
||||||
final VersionedConnection connection = (VersionedConnection) component;
|
|
||||||
|
|
||||||
final String versionedConnectionId = connection.getIdentifier();
|
|
||||||
final List<Connection> instances = connectionsByVersionedId.get(versionedConnectionId);
|
|
||||||
if (instances == null) {
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final Connection instance : instances) {
|
final VersionedConnection connection = (VersionedConnection) component;
|
||||||
affectedComponents.add(createAffectedComponentEntity(instance.getSource(), user));
|
|
||||||
affectedComponents.add(createAffectedComponentEntity(instance.getDestination(), user));
|
final String sourceVersionedId = connection.getSource().getId();
|
||||||
|
final List<Connectable> sources = connectablesByVersionId.get(sourceVersionedId);
|
||||||
|
if (sources != null) {
|
||||||
|
for (final Connectable source : sources) {
|
||||||
|
affectedComponents.add(createAffectedComponentEntity(source, user));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final String destinationVersionId = connection.getDestination().getId();
|
||||||
|
final List<Connectable> destinations = connectablesByVersionId.get(destinationVersionId);
|
||||||
|
if (destinations != null) {
|
||||||
|
for (final Connectable destination : destinations) {
|
||||||
|
affectedComponents.add(createAffectedComponentEntity(destination, user));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4003,6 +4092,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
return affectedComponents;
|
return affectedComponents;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void mapToConnectableId(final Collection<? extends Connectable> connectables, final Map<String, List<Connectable>> destination) {
|
||||||
|
for (final Connectable connectable : connectables) {
|
||||||
|
final Optional<String> versionedId = connectable.getVersionedComponentId();
|
||||||
|
if (!versionedId.isPresent()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
final List<Connectable> byVersionedId = destination.computeIfAbsent(versionedId.get(), key -> new ArrayList<>());
|
||||||
|
byVersionedId.add(connectable);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable, final NiFiUser user) {
|
private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable, final NiFiUser user) {
|
||||||
final AffectedComponentEntity entity = new AffectedComponentEntity();
|
final AffectedComponentEntity entity = new AffectedComponentEntity();
|
||||||
|
@ -4023,6 +4124,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
return entity;
|
return entity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceNode serviceNode, final NiFiUser user) {
|
||||||
|
final AffectedComponentEntity entity = new AffectedComponentEntity();
|
||||||
|
entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier())));
|
||||||
|
entity.setId(serviceNode.getIdentifier());
|
||||||
|
|
||||||
|
final Authorizable authorizable = authorizableLookup.getControllerService(serviceNode.getIdentifier()).getAuthorizable();
|
||||||
|
final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable, user);
|
||||||
|
entity.setPermissions(permissionsDto);
|
||||||
|
|
||||||
|
final AffectedComponentDTO dto = new AffectedComponentDTO();
|
||||||
|
dto.setId(serviceNode.getIdentifier());
|
||||||
|
dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
|
||||||
|
dto.setProcessGroupId(serviceNode.getProcessGroupIdentifier());
|
||||||
|
dto.setState(serviceNode.getState().name());
|
||||||
|
|
||||||
|
entity.setComponent(dto);
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
|
||||||
private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState, final NiFiUser user) {
|
private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState, final NiFiUser user) {
|
||||||
final AffectedComponentEntity entity = new AffectedComponentEntity();
|
final AffectedComponentEntity entity = new AffectedComponentEntity();
|
||||||
entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId())));
|
entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId())));
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.nifi.registry.flow.VersionedProcessGroup;
|
||||||
import org.apache.nifi.util.BundleUtils;
|
import org.apache.nifi.util.BundleUtils;
|
||||||
import org.apache.nifi.web.NiFiServiceFacade;
|
import org.apache.nifi.web.NiFiServiceFacade;
|
||||||
import org.apache.nifi.web.ResourceNotFoundException;
|
import org.apache.nifi.web.ResourceNotFoundException;
|
||||||
|
import org.apache.nifi.web.ResumeFlowException;
|
||||||
import org.apache.nifi.web.Revision;
|
import org.apache.nifi.web.Revision;
|
||||||
import org.apache.nifi.web.api.concurrent.AsyncRequestManager;
|
import org.apache.nifi.web.api.concurrent.AsyncRequestManager;
|
||||||
import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest;
|
import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest;
|
||||||
|
@ -1128,6 +1129,11 @@ public class VersionsResource extends ApplicationResource {
|
||||||
idGenerationSeed, true, true);
|
idGenerationSeed, true, true);
|
||||||
|
|
||||||
vcur.markComplete(updatedVersionControlEntity);
|
vcur.markComplete(updatedVersionControlEntity);
|
||||||
|
} catch (final ResumeFlowException rfe) {
|
||||||
|
// Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow
|
||||||
|
// since in this case the flow was successfully updated - we just couldn't re-enable the components.
|
||||||
|
logger.error(rfe.getMessage(), rfe);
|
||||||
|
vcur.setFailureReason(rfe.getMessage());
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
logger.error("Failed to update flow to new version", e);
|
logger.error("Failed to update flow to new version", e);
|
||||||
vcur.setFailureReason("Failed to update flow to new version due to " + e);
|
vcur.setFailureReason("Failed to update flow to new version due to " + e);
|
||||||
|
@ -1301,6 +1307,11 @@ public class VersionsResource extends ApplicationResource {
|
||||||
idGenerationSeed, false, true);
|
idGenerationSeed, false, true);
|
||||||
|
|
||||||
vcur.markComplete(updatedVersionControlEntity);
|
vcur.markComplete(updatedVersionControlEntity);
|
||||||
|
} catch (final ResumeFlowException rfe) {
|
||||||
|
// Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow
|
||||||
|
// since in this case the flow was successfully updated - we just couldn't re-enable the components.
|
||||||
|
logger.error(rfe.getMessage(), rfe);
|
||||||
|
vcur.setFailureReason(rfe.getMessage());
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
logger.error("Failed to update flow to new version", e);
|
logger.error("Failed to update flow to new version", e);
|
||||||
vcur.setFailureReason("Failed to update flow to new version due to " + e.getMessage());
|
vcur.setFailureReason("Failed to update flow to new version due to " + e.getMessage());
|
||||||
|
@ -1333,13 +1344,15 @@ public class VersionsResource extends ApplicationResource {
|
||||||
private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri,
|
private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri,
|
||||||
final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user, final boolean replicateRequest, final Revision revision, final VersionControlInformationEntity requestEntity,
|
final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user, final boolean replicateRequest, final Revision revision, final VersionControlInformationEntity requestEntity,
|
||||||
final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest, final String idGenerationSeed,
|
final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest, final String idGenerationSeed,
|
||||||
final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException {
|
final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException, ResumeFlowException {
|
||||||
|
|
||||||
// Steps 6-7: Determine which components must be stopped and stop them.
|
// Steps 6-7: Determine which components must be stopped and stop them.
|
||||||
final Set<String> stoppableReferenceTypes = new HashSet<>();
|
final Set<String> stoppableReferenceTypes = new HashSet<>();
|
||||||
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
|
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
|
||||||
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT);
|
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT);
|
||||||
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT);
|
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT);
|
||||||
|
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT);
|
||||||
|
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT);
|
||||||
|
|
||||||
final Set<AffectedComponentEntity> runningComponents = affectedComponents.stream()
|
final Set<AffectedComponentEntity> runningComponents = affectedComponents.stream()
|
||||||
.filter(dto -> stoppableReferenceTypes.contains(dto.getComponent().getReferenceType()))
|
.filter(dto -> stoppableReferenceTypes.contains(dto.getComponent().getReferenceType()))
|
||||||
|
@ -1459,7 +1472,14 @@ public class VersionsResource extends ApplicationResource {
|
||||||
asyncRequest.setCancelCallback(enableServicesPause::cancel);
|
asyncRequest.setCancelCallback(enableServicesPause::cancel);
|
||||||
final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user);
|
final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user);
|
||||||
logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
|
logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
|
||||||
|
|
||||||
|
try {
|
||||||
componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause);
|
componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause);
|
||||||
|
} catch (final IllegalStateException ise) {
|
||||||
|
// Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide
|
||||||
|
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
|
||||||
|
throw new ResumeFlowException("Failed to re-enable Controller Services because " + ise.getMessage(), ise);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!asyncRequest.isCancelled()) {
|
if (!asyncRequest.isCancelled()) {
|
||||||
|
@ -1474,7 +1494,14 @@ public class VersionsResource extends ApplicationResource {
|
||||||
final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
||||||
asyncRequest.setCancelCallback(startComponentsPause::cancel);
|
asyncRequest.setCancelCallback(startComponentsPause::cancel);
|
||||||
logger.info("Restarting {} Processors", componentsToStart.size());
|
logger.info("Restarting {} Processors", componentsToStart.size());
|
||||||
|
|
||||||
|
try {
|
||||||
componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause);
|
componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause);
|
||||||
|
} catch (final IllegalStateException ise) {
|
||||||
|
// Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide
|
||||||
|
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
|
||||||
|
throw new ResumeFlowException("Failed to restart components because " + ise.getMessage(), ise);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -179,6 +179,7 @@ import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
|
||||||
import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity;
|
import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity;
|
||||||
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
||||||
import org.apache.nifi.web.api.entity.FlowBreadcrumbEntity;
|
import org.apache.nifi.web.api.entity.FlowBreadcrumbEntity;
|
||||||
|
import org.apache.nifi.web.api.entity.PortEntity;
|
||||||
import org.apache.nifi.web.api.entity.PortStatusSnapshotEntity;
|
import org.apache.nifi.web.api.entity.PortStatusSnapshotEntity;
|
||||||
import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
|
import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
|
||||||
import org.apache.nifi.web.api.entity.ProcessorEntity;
|
import org.apache.nifi.web.api.entity.ProcessorEntity;
|
||||||
|
@ -1799,6 +1800,32 @@ public final class DtoFactory {
|
||||||
return component;
|
return component;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public AffectedComponentEntity createAffectedComponentEntity(final PortEntity portEntity, final String referenceType) {
|
||||||
|
if (portEntity == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final AffectedComponentEntity component = new AffectedComponentEntity();
|
||||||
|
component.setBulletins(portEntity.getBulletins());
|
||||||
|
component.setId(portEntity.getId());
|
||||||
|
component.setPermissions(portEntity.getPermissions());
|
||||||
|
component.setPosition(portEntity.getPosition());
|
||||||
|
component.setRevision(portEntity.getRevision());
|
||||||
|
component.setUri(portEntity.getUri());
|
||||||
|
|
||||||
|
final PortDTO portDto = portEntity.getComponent();
|
||||||
|
final AffectedComponentDTO componentDto = new AffectedComponentDTO();
|
||||||
|
componentDto.setId(portDto.getId());
|
||||||
|
componentDto.setName(portDto.getName());
|
||||||
|
componentDto.setProcessGroupId(portDto.getParentGroupId());
|
||||||
|
componentDto.setReferenceType(referenceType);
|
||||||
|
componentDto.setState(portDto.getState());
|
||||||
|
componentDto.setValidationErrors(portDto.getValidationErrors());
|
||||||
|
component.setComponent(componentDto);
|
||||||
|
|
||||||
|
return component;
|
||||||
|
}
|
||||||
|
|
||||||
public AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceEntity serviceEntity) {
|
public AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceEntity serviceEntity) {
|
||||||
if (serviceEntity == null) {
|
if (serviceEntity == null) {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
|
||||||
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
|
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
|
||||||
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
|
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
@ -74,11 +75,10 @@ public interface ProcessGroupDAO {
|
||||||
/**
|
/**
|
||||||
* Verifies the specified controller services can be modified
|
* Verifies the specified controller services can be modified
|
||||||
*
|
*
|
||||||
* @param groupId the ID of the process group
|
|
||||||
* @param state the desired state
|
* @param state the desired state
|
||||||
* @param serviceIds the ID's of the controller services
|
* @param serviceIds the ID's of the controller services
|
||||||
*/
|
*/
|
||||||
void verifyActivateControllerServices(String groupId, ControllerServiceState state, Set<String> serviceIds);
|
void verifyActivateControllerServices(ControllerServiceState state, Collection<String> serviceIds);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Schedules the components in the specified process group.
|
* Schedules the components in the specified process group.
|
||||||
|
@ -93,11 +93,10 @@ public interface ProcessGroupDAO {
|
||||||
/**
|
/**
|
||||||
* Enables or disables the controller services in the specified process group
|
* Enables or disables the controller services in the specified process group
|
||||||
*
|
*
|
||||||
* @param groupId the id of the group
|
|
||||||
* @param state the desired state
|
* @param state the desired state
|
||||||
* @param serviceIds the ID's of the services to enable or disable
|
* @param serviceIds the ID's of the services to enable or disable
|
||||||
*/
|
*/
|
||||||
Future<Void> activateControllerServices(String groupId, ControllerServiceState state, Set<String> serviceIds);
|
Future<Void> activateControllerServices(ControllerServiceState state, Collection<String> serviceIds);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates the specified process group.
|
* Updates the specified process group.
|
||||||
|
|
|
@ -41,8 +41,10 @@ import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
|
||||||
import org.apache.nifi.web.api.entity.VariableEntity;
|
import org.apache.nifi.web.api.entity.VariableEntity;
|
||||||
import org.apache.nifi.web.dao.ProcessGroupDAO;
|
import org.apache.nifi.web.dao.ProcessGroupDAO;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -130,18 +132,18 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) {
|
public void verifyActivateControllerServices(final ControllerServiceState state, final Collection<String> serviceIds) {
|
||||||
final ProcessGroup group = locateProcessGroup(flowController, groupId);
|
final Set<ControllerServiceNode> serviceNodes = serviceIds.stream()
|
||||||
|
.map(flowController::getControllerServiceNode)
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
group.findAllControllerServices().stream()
|
for (final ControllerServiceNode serviceNode : serviceNodes) {
|
||||||
.filter(service -> serviceIds.contains(service.getIdentifier()))
|
|
||||||
.forEach(service -> {
|
|
||||||
if (state == ControllerServiceState.ENABLED) {
|
if (state == ControllerServiceState.ENABLED) {
|
||||||
service.verifyCanEnable();
|
serviceNode.verifyCanEnable(serviceNodes);
|
||||||
} else {
|
} else {
|
||||||
service.verifyCanDisable();
|
serviceNode.verifyCanDisable(serviceNodes);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -201,28 +203,18 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Future<Void> activateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) {
|
public Future<Void> activateControllerServices(final ControllerServiceState state, final Collection<String> serviceIds) {
|
||||||
final ProcessGroup group = locateProcessGroup(flowController, groupId);
|
final List<ControllerServiceNode> serviceNodes = serviceIds.stream()
|
||||||
|
.map(flowController::getControllerServiceNode)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
|
if (state == ControllerServiceState.ENABLED) {
|
||||||
for (final String serviceId : serviceIds) {
|
return flowController.enableControllerServicesAsync(serviceNodes);
|
||||||
final ControllerServiceNode serviceNode = group.findControllerService(serviceId, true, true);
|
|
||||||
if (serviceNode == null) {
|
|
||||||
throw new ResourceNotFoundException("Could not find Controller Service with identifier " + serviceId);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ControllerServiceState.ENABLED.equals(state)) {
|
|
||||||
final CompletableFuture<Void> serviceFuture = flowController.enableControllerService(serviceNode);
|
|
||||||
future = CompletableFuture.allOf(future, serviceFuture);
|
|
||||||
} else {
|
} else {
|
||||||
final CompletableFuture<Void> serviceFuture = flowController.disableControllerService(serviceNode);
|
return flowController.disableControllerServicesAsync(serviceNodes);
|
||||||
future = CompletableFuture.allOf(future, serviceFuture);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return future;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ProcessGroup updateProcessGroup(ProcessGroupDTO processGroupDTO) {
|
public ProcessGroup updateProcessGroup(ProcessGroupDTO processGroupDTO) {
|
||||||
final ProcessGroup group = locateProcessGroup(flowController, processGroupDTO.getId());
|
final ProcessGroup group = locateProcessGroup(flowController, processGroupDTO.getId());
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
|
||||||
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
|
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
|
||||||
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
|
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
|
||||||
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
||||||
|
import org.apache.nifi.web.api.entity.PortEntity;
|
||||||
import org.apache.nifi.web.api.entity.ProcessorEntity;
|
import org.apache.nifi.web.api.entity.ProcessorEntity;
|
||||||
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
|
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
|
||||||
|
|
||||||
|
@ -39,6 +40,14 @@ public class AffectedComponentUtils {
|
||||||
case AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR:
|
case AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR:
|
||||||
final ProcessorEntity procEntity = serviceFacade.getProcessor(componentEntity.getId(), user);
|
final ProcessorEntity procEntity = serviceFacade.getProcessor(componentEntity.getId(), user);
|
||||||
return dtoFactory.createAffectedComponentEntity(procEntity);
|
return dtoFactory.createAffectedComponentEntity(procEntity);
|
||||||
|
case AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT: {
|
||||||
|
final PortEntity portEntity = serviceFacade.getInputPort(componentEntity.getId(), user);
|
||||||
|
return dtoFactory.createAffectedComponentEntity(portEntity, AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT);
|
||||||
|
}
|
||||||
|
case AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT: {
|
||||||
|
final PortEntity portEntity = serviceFacade.getOutputPort(componentEntity.getId(), user);
|
||||||
|
return dtoFactory.createAffectedComponentEntity(portEntity, AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT);
|
||||||
|
}
|
||||||
case AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE:
|
case AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE:
|
||||||
final ControllerServiceEntity serviceEntity = serviceFacade.getControllerService(componentEntity.getId(), user);
|
final ControllerServiceEntity serviceEntity = serviceFacade.getControllerService(componentEntity.getId(), user);
|
||||||
return dtoFactory.createAffectedComponentEntity(serviceEntity);
|
return dtoFactory.createAffectedComponentEntity(serviceEntity);
|
||||||
|
|
|
@ -18,7 +18,9 @@
|
||||||
package org.apache.nifi.web.util;
|
package org.apache.nifi.web.util;
|
||||||
|
|
||||||
import org.apache.nifi.authorization.user.NiFiUser;
|
import org.apache.nifi.authorization.user.NiFiUser;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.controller.ScheduledState;
|
import org.apache.nifi.controller.ScheduledState;
|
||||||
|
import org.apache.nifi.controller.service.ControllerServiceNode;
|
||||||
import org.apache.nifi.controller.service.ControllerServiceState;
|
import org.apache.nifi.controller.service.ControllerServiceState;
|
||||||
import org.apache.nifi.web.NiFiServiceFacade;
|
import org.apache.nifi.web.NiFiServiceFacade;
|
||||||
import org.apache.nifi.web.Revision;
|
import org.apache.nifi.web.Revision;
|
||||||
|
@ -33,6 +35,9 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
@ -228,6 +233,46 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
|
||||||
waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.DISABLED, pause, user);
|
waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.DISABLED, pause, user);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
|
||||||
|
final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>();
|
||||||
|
|
||||||
|
for (final ControllerServiceNode node : serviceNodeMap.values()) {
|
||||||
|
final List<ControllerServiceNode> branch = new ArrayList<>();
|
||||||
|
determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<ControllerServiceNode>());
|
||||||
|
orderedNodeLists.add(branch);
|
||||||
|
}
|
||||||
|
|
||||||
|
return orderedNodeLists;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void determineEnablingOrder(
|
||||||
|
final Map<String, ControllerServiceNode> serviceNodeMap,
|
||||||
|
final ControllerServiceNode contextNode,
|
||||||
|
final List<ControllerServiceNode> orderedNodes,
|
||||||
|
final Set<ControllerServiceNode> visited) {
|
||||||
|
if (visited.contains(contextNode)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet()) {
|
||||||
|
if (entry.getKey().getControllerServiceDefinition() != null) {
|
||||||
|
final String referencedServiceId = entry.getValue();
|
||||||
|
if (referencedServiceId != null) {
|
||||||
|
final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId);
|
||||||
|
if (!orderedNodes.contains(referencedNode)) {
|
||||||
|
visited.add(contextNode);
|
||||||
|
determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!orderedNodes.contains(contextNode)) {
|
||||||
|
orderedNodes.add(contextNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
|
* Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in New Issue