NIFI-4436: Bug fixes; ensure correct Exception types are thrown

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Mark Payne 2017-12-11 15:36:56 -05:00 committed by Bryan Bende
parent 1266235c00
commit 181d6809c1
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
16 changed files with 246 additions and 78 deletions

View File

@ -48,6 +48,9 @@ public class ProcessGroupEntityMerger implements ComponentEntityMerger<ProcessGr
private void mergeVersionControlInformation(ProcessGroupEntity targetGroup, ProcessGroupEntity toMerge) {
final ProcessGroupDTO targetGroupDto = targetGroup.getComponent();
final ProcessGroupDTO toMergeGroupDto = toMerge.getComponent();
if (targetGroupDto == null || toMergeGroupDto == null) {
return;
}
final VersionControlInformationDTO targetVersionControl = targetGroupDto.getVersionControlInformation();
final VersionControlInformationDTO toMergeVersionControl = toMergeGroupDto.getVersionControlInformation();

View File

@ -965,7 +965,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
/**
* Disconnects this Process Group from version control. If not currently under version control, this method does nothing.
*/
void disconnectVersionControl();
void disconnectVersionControl(boolean removeVersionedComponentIds);
/**
* Synchronizes the Process Group with the given Flow Registry, determining whether or not the local flow

View File

@ -212,6 +212,5 @@ public interface FlowRegistry {
* @throws IOException if unable to communicate with the Flow Registry
* @throws NiFiRegistryException if unable to find a flow with the given bucket ID and flow ID
*/
// TODO: Do we still need this?
VersionedFlow getVersionedFlow(String bucketId, String flowId) throws IOException, NiFiRegistryException;
}

View File

@ -234,6 +234,7 @@ import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.SnippetUtils;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BatchSettingsDTO;
import org.apache.nifi.web.api.dto.BundleDTO;
@ -2372,26 +2373,29 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* </p>
*
* @param group group
* @param templateContents contents
* @param snippetContents contents
*/
private void validateSnippetContents(final ProcessGroup group, final FlowSnippetDTO templateContents) {
private void validateSnippetContents(final ProcessGroup group, final FlowSnippetDTO snippetContents) {
// validate the names of Input Ports
for (final PortDTO port : templateContents.getInputPorts()) {
for (final PortDTO port : snippetContents.getInputPorts()) {
if (group.getInputPortByName(port.getName()) != null) {
throw new IllegalStateException("One or more of the proposed Port names is not available in the process group");
}
}
// validate the names of Output Ports
for (final PortDTO port : templateContents.getOutputPorts()) {
for (final PortDTO port : snippetContents.getOutputPorts()) {
if (group.getOutputPortByName(port.getName()) != null) {
throw new IllegalStateException("One or more of the proposed Port names is not available in the process group");
}
}
verifyComponentTypesInSnippet(templateContents);
verifyComponentTypesInSnippet(snippetContents);
SnippetUtils.verifyNoVersionControlConflicts(snippetContents, group);
}
/**
* Recursively finds all ConnectionDTO's
*

View File

@ -112,6 +112,7 @@ import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.SnippetUtils;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.slf4j.Logger;
@ -2294,6 +2295,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try {
verifyContents(snippet);
verifyDestinationNotInSnippet(snippet, destination);
SnippetUtils.verifyNoVersionControlConflicts(snippet, this, destination);
if (!isDisconnected(snippet)) {
throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved.");
@ -3087,13 +3089,15 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
public void disconnectVersionControl() {
public void disconnectVersionControl(final boolean removeVersionedComponentIds) {
writeLock.lock();
try {
this.versionControlInfo.set(null);
// remove version component ids from each component (until another versioned PG is encountered)
applyVersionedComponentIds(this, id -> null);
if (removeVersionedComponentIds) {
// remove version component ids from each component (until another versioned PG is encountered)
applyVersionedComponentIds(this, id -> null);
}
} finally {
writeLock.unlock();
}
@ -3278,7 +3282,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final Set<String> knownVariables = getKnownVariableNames();
updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, updateDescendantVersionedFlows, knownVariables);
} catch (final ProcessorInstantiationException pie) {
throw new RuntimeException(pie);
throw new IllegalStateException("Failed to update flow", pie);
} finally {
writeLock.unlock();
}
@ -3366,7 +3370,9 @@ public final class StandardProcessGroup implements ProcessGroup {
group.setVariables(updatedVariableMap);
final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
if (remoteCoordinates != null) {
if (remoteCoordinates == null) {
group.disconnectVersionControl(false);
} else {
final String registryId = flowController.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl());
final String bucketId = remoteCoordinates.getBucketId();
final String flowId = remoteCoordinates.getFlowId();
@ -3681,8 +3687,6 @@ public final class StandardProcessGroup implements ProcessGroup {
}
private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) {
// TODO: I think we can get rid of all of those LinkedHashSet's now in the VersionedProcessGroup because
/// the UUID is properly keyed off of the ID of the component in the VersionedProcessGroup.
long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits();
UUID uuid;
@ -3733,7 +3737,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try {
return flowController.createPrioritizer(prioritizerName);
} catch (final Exception e) {
throw new RuntimeException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier());
throw new IllegalStateException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier());
}
})
.collect(Collectors.toList());
@ -4016,7 +4020,14 @@ public final class StandardProcessGroup implements ProcessGroup {
// The value given is instead the Versioned Component ID of the Controller Service. We want to resolve this
// to the instance ID of the Controller Service.
final String serviceVersionedComponentId = entry.getValue();
final String instanceId = getServiceInstanceId(serviceVersionedComponentId, group);
String instanceId = getServiceInstanceId(serviceVersionedComponentId, group);
if (instanceId == null) {
// We didn't find the instance ID based on the Versioned Component ID. So we want to just
// leave the value set to whatever it currently is, if it's currently set.
final PropertyDescriptor propertyDescriptor = new PropertyDescriptor.Builder().name(entry.getKey()).build();
instanceId = currentProperties.get(propertyDescriptor);
}
value = instanceId == null ? serviceVersionedComponentId : instanceId;
} else {
value = entry.getValue();
@ -4169,15 +4180,22 @@ public final class StandardProcessGroup implements ProcessGroup {
+ " reverted to its original form before changing the version.");
}
}
verifyNoDescendantsWithLocalModifications("be updated");
}
final VersionedProcessGroup flowContents = updatedFlow.getFlowContents();
if (verifyConnectionRemoval) {
// Determine which Connections have been removed.
final Map<String, Connection> removedConnectionByVersionedId = new HashMap<>();
// Populate the 'removedConnectionByVersionId' map with all Connections. We key off of the connection's VersionedComponentID
// if it is populated. Otherwise, we key off of its actual ID. We do this because it allows us to then remove from this Map
// any connection that does exist in the proposed flow. This results in us having a Map whose values are those Connections
// that were removed. We can then check for any connections that have data in them. If any Connection is to be removed but
// has data, then we should throw an IllegalStateException.
findAllConnections().stream()
.filter(conn -> conn.getVersionedComponentId().isPresent())
.forEach(conn -> removedConnectionByVersionedId.put(conn.getVersionedComponentId().get(), conn));
.forEach(conn -> removedConnectionByVersionedId.put(conn.getVersionedComponentId().orElse(conn.getIdentifier()), conn));
final Set<String> proposedFlowConnectionIds = new HashSet<>();
findAllConnectionIds(flowContents, proposedFlowConnectionIds);
@ -4252,8 +4270,8 @@ public final class StandardProcessGroup implements ProcessGroup {
if (!proposedProcessGroups.containsKey(versionedId)) {
// Process Group was removed.
throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the child " + childGroup
+ " that exists locally has one or more Templates, and the proposed flow does not contain this Process Group. "
+ "A Process Group cannot be deleted while it contains Templates. Please remove the Templates before attempting to chnage the version of the flow.");
+ " that exists locally has one or more Templates, and the proposed flow does not contain these templates. "
+ "A Process Group cannot be deleted while it contains Templates. Please remove the Templates before attempting to change the version of the flow.");
}
}
@ -4430,6 +4448,12 @@ public final class StandardProcessGroup implements ProcessGroup {
+ "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before "
+ "this action can be performed on the parent Process Group.");
}
if (flowState == VersionedFlowState.SYNC_FAILURE) {
throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and "
+ "is not synchronized with the Flow Registry. Each descendant Process Group must first be synchronized with the Flow Registry before this action can be "
+ "performed on the parent Process Group. NiFi will continue to attempt to communicate with the Flow Registry periodically in the background.");
}
}
}
}

View File

@ -43,6 +43,13 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
@Override
public void addFlowRegistry(final FlowRegistry registry) {
final boolean duplicateName = registryById.values().stream()
.anyMatch(reg -> reg.getName().equals(registry.getName()));
if (duplicateName) {
throw new IllegalStateException("Cannot add Flow Registry because a Flow Registry already exists with the name " + registry.getName());
}
final FlowRegistry existing = registryById.putIfAbsent(registry.getIdentifier(), registry);
if (existing != null) {
throw new IllegalStateException("Cannot add Flow Registry " + registry + " because a Flow Registry already exists with the ID " + registry.getIdentifier());
@ -58,7 +65,7 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) {
final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false);
if (sslContext == null && uriScheme.equalsIgnoreCase("https")) {
throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl
throw new IllegalStateException("Failed to create Flow Registry for URI " + registryUrl
+ " because this NiFi is not configured with a Keystore/Truststore, so it is not capable of communicating with a secure Registry. "
+ "Please populate NiFi's Keystore/Truststore properties or connect to a NiFi Registry over http instead of https.");
}
@ -68,7 +75,7 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
} else if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) {
final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false);
if (sslContext == null && uriScheme.equalsIgnoreCase("https")) {
throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl
throw new IllegalStateException("Failed to create Flow Registry for URI " + registryUrl
+ " because this NiFi is not configured with a Keystore/Truststore, so it is not capable of communicating with a secure Registry. "
+ "Please populate NiFi's Keystore/Truststore properties or connect to a NiFi Registry over http instead of https.");
}

View File

@ -16,11 +16,15 @@
*/
package org.apache.nifi.util;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.web.api.dto.ComponentDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import java.util.ArrayList;
import java.util.Collection;
@ -307,4 +311,99 @@ public final class SnippetUtils {
connection.setBends(bends);
}
}
public static void verifyNoVersionControlConflicts(final Snippet snippet, final ProcessGroup parentGroup, final ProcessGroup destination) {
if (snippet == null) {
return;
}
if (snippet.getProcessGroups() == null) {
return;
}
final List<VersionControlInformation> vcis = new ArrayList<>();
for (final String groupId : snippet.getProcessGroups().keySet()) {
final ProcessGroup group = parentGroup.getProcessGroup(groupId);
if (group != null) {
findAllVersionControlInfo(group, vcis);
}
}
verifyNoDuplicateVersionControlInfo(destination, vcis);
}
public static void verifyNoVersionControlConflicts(final FlowSnippetDTO snippetContents, final ProcessGroup destination) {
final List<VersionControlInformationDTO> vcis = new ArrayList<>();
for (final ProcessGroupDTO childGroup : snippetContents.getProcessGroups()) {
findAllVersionControlInfo(childGroup, vcis);
}
verifyNoDuplicateVersionControlInfoDtos(destination, vcis);
}
private static void verifyNoDuplicateVersionControlInfoDtos(final ProcessGroup group, final Collection<VersionControlInformationDTO> snippetVcis) {
final VersionControlInformation vci = group.getVersionControlInformation();
if (vci != null) {
for (final VersionControlInformationDTO snippetVci : snippetVcis) {
if (vci.getBucketIdentifier().equals(snippetVci.getBucketId()) && vci.getFlowIdentifier().equals(snippetVci.getFlowId())) {
throw new IllegalArgumentException("Cannot place the given Process Group into the desired destination because the destination group or one of its ancestor groups is "
+ "under Version Control and one of the selected Process Groups is also under Version Control with the same Flow. A Process Group that is under Version Control "
+ "cannot contain a child Process Group that points to the same Versioned Flow.");
}
}
}
final ProcessGroup parent = group.getParent();
if (parent != null) {
verifyNoDuplicateVersionControlInfoDtos(parent, snippetVcis);
}
}
private static void verifyNoDuplicateVersionControlInfo(final ProcessGroup group, final Collection<VersionControlInformation> snippetVcis) {
final VersionControlInformation vci = group.getVersionControlInformation();
if (vci != null) {
for (final VersionControlInformation snippetVci : snippetVcis) {
if (vci.getBucketIdentifier().equals(snippetVci.getBucketIdentifier()) && vci.getFlowIdentifier().equals(snippetVci.getFlowIdentifier())) {
throw new IllegalArgumentException("Cannot place the given Process Group into the desired destination because the destination group or one of its ancestor groups is "
+ "under Version Control and one of the selected Process Groups is also under Version Control with the same Flow. A Process Group that is under Version Control "
+ "cannot contain a child Process Group that points to the same Versioned Flow.");
}
}
}
final ProcessGroup parent = group.getParent();
if (parent != null) {
verifyNoDuplicateVersionControlInfo(parent, snippetVcis);
}
}
private static void findAllVersionControlInfo(final ProcessGroupDTO dto, final List<VersionControlInformationDTO> found) {
final VersionControlInformationDTO vci = dto.getVersionControlInformation();
if (vci != null) {
found.add(vci);
}
final FlowSnippetDTO contents = dto.getContents();
if (contents != null) {
for (final ProcessGroupDTO child : contents.getProcessGroups()) {
findAllVersionControlInfo(child, found);
}
}
}
private static void findAllVersionControlInfo(final ProcessGroup group, final List<VersionControlInformation> found) {
if (group == null) {
return;
}
final VersionControlInformation vci = group.getVersionControlInformation();
if (vci != null) {
found.add(vci);
}
for (final ProcessGroup childGroup : group.findAllProcessGroups()) {
findAllVersionControlInfo(childGroup, found);
}
}
}

View File

@ -127,6 +127,8 @@ class StandardFlowSynchronizerSpec extends Specification {
}
}
}
_ * processGroup.findAllRemoteProcessGroups() >> []
positionableMocksById.put(pgId, processGroup)
return processGroup
}

View File

@ -667,7 +667,7 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
public void disconnectVersionControl() {
public void disconnectVersionControl(final boolean removeVersionedComponentIds) {
this.versionControlInfo = null;
}

View File

@ -23,7 +23,6 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
@ -117,7 +116,6 @@ import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Map;
@ -423,11 +421,12 @@ public interface NiFiServiceFacade {
* with the given id
*
* @param versionControlInfo the information about the versioned flow
* @param versionedProcessGroup the contents to be imported
* @param groupId the ID of the Process Group where the flow should be instantiated
*
* @throws IllegalStateException if the flow cannot be imported into the specified group
*/
void verifyImportProcessGroup(VersionControlInformationDTO versionControlInfo, String groupId);
void verifyImportProcessGroup(VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup versionedProcessGroup, String groupId);
/**
* Creates a new Template based off the specified snippet.
@ -1295,7 +1294,7 @@ public interface NiFiServiceFacade {
* was last synchronized with the Flow Registry
* @throws IllegalStateException if the Process Group with the given ID is not under version control
*/
FlowComparisonEntity getLocalModifications(String processGroupId) throws IOException, NiFiRegistryException;
FlowComparisonEntity getLocalModifications(String processGroupId);
/**
* Returns the Version Control information for the Process Group with the given ID
@ -1314,9 +1313,9 @@ public interface NiFiServiceFacade {
* @param flow the flow to add to the registry
* @return a VersionedFlow that is fully populated, including identifiers
*
* @throws IOException if unable to communicate with the Flow Registry
* @throws NiFiCoreException if unable to register flow
*/
VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow) throws IOException, NiFiRegistryException;
VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow);
/**
* Creates a snapshot of the Process Group with the given identifier, then creates a new Flow entity in the NiFi Registry
@ -1337,7 +1336,7 @@ public interface NiFiServiceFacade {
* @param flowId the ID of the flow
* @return the VersionedFlow that was deleted
*/
VersionedFlow deleteVersionedFlow(String registryId, String bucketId, String flowId) throws IOException, NiFiRegistryException;
VersionedFlow deleteVersionedFlow(String registryId, String bucketId, String flowId);
/**
* Adds the given snapshot to the already existing Versioned Flow, which resides in the given Flow Registry with the given id
@ -1349,10 +1348,9 @@ public interface NiFiServiceFacade {
* @param expectedVersion the version to save the flow as
* @return the snapshot that represents what was stored in the registry
*
* @throws IOException if unable to communicate with the Flow Registry
* @throws NiFiCoreException if unable to register the snapshot with the flow registry
*/
VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion)
throws IOException, NiFiRegistryException;
VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion);
/**
* Updates the Version Control Information on the Process Group with the given ID
@ -1386,7 +1384,7 @@ public interface NiFiServiceFacade {
*
* @throws ResourceNotFoundException if the Versioned Flow Snapshot could not be found
*/
VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo, boolean fetchRemoteFlows) throws IOException;
VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo, boolean fetchRemoteFlows);
/**
* Returns the name of the Flow Registry that is registered with the given ID. If no Flow Registry exists with the given ID, will return
@ -1406,7 +1404,7 @@ public interface NiFiServiceFacade {
* @param user the user making the request
* @return the set of all components that would be affected by updating the Process Group
*/
Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(String processGroupId, VersionedFlowSnapshot updatedSnapshot, NiFiUser user) throws IOException;
Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(String processGroupId, VersionedFlowSnapshot updatedSnapshot, NiFiUser user);
/**
* Verifies that the Process Group with the given identifier can be updated to the proposed flow

View File

@ -95,6 +95,7 @@ import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedFlowState;
@ -1866,20 +1867,21 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final String groupId) {
public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup contents, final String groupId) {
final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
verifyImportProcessGroup(versionControlInfo, group);
verifyImportProcessGroup(versionControlInfo, contents, group);
}
private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final ProcessGroup group) {
private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final VersionedProcessGroup contents, final ProcessGroup group) {
if (group == null) {
return;
}
final VersionControlInformation vci = group.getVersionControlInformation();
if (vci != null) {
if (Objects.equals(vciDto.getRegistryId(), vci.getRegistryIdentifier())
&& Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier())
// Note that we do not compare the Registry ID here because there could be two registry clients
// that point to the same server (one could point to localhost while another points to 127.0.0.1, for instance)..
if (Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier())
&& Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) {
throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. "
@ -1887,7 +1889,20 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
}
verifyImportProcessGroup(vciDto, group.getParent());
final Set<VersionedProcessGroup> childGroups = contents.getProcessGroups();
if (childGroups != null) {
for (final VersionedProcessGroup childGroup : childGroups) {
final VersionedFlowCoordinates childCoordinates = childGroup.getVersionedFlowCoordinates();
if (childCoordinates != null) {
final VersionControlInformationDTO childVci = new VersionControlInformationDTO();
childVci.setBucketId(childCoordinates.getBucketId());
childVci.setFlowId(childCoordinates.getFlowId());
verifyImportProcessGroup(childVci, childGroup, group);
}
}
}
verifyImportProcessGroup(vciDto, contents, group.getParent());
}
@Override
@ -3447,8 +3462,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
entity.setPoliciesPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getPolicies()));
entity.setSystemPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getSystem()));
entity.setRestrictedComponentsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents()));
// TODO - update to be user specific
entity.setCanVersionFlows(CollectionUtils.isNotEmpty(flowRegistryClient.getRegistryIdentifiers()));
return entity;
@ -3722,13 +3735,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
if (registry == null) {
throw new IllegalArgumentException("No Flow Registry exists with ID " + registryId);
}
return registry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
try {
return registry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
} catch (final IOException | NiFiRegistryException e) {
throw new NiFiCoreException("Failed to remove flow from Flow Registry due to " + e.getMessage(), e);
}
}
@Override
@ -3752,7 +3769,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public FlowComparisonEntity getLocalModifications(final String processGroupId) throws IOException, NiFiRegistryException {
public FlowComparisonEntity getLocalModifications(final String processGroupId) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
if (versionControlInfo == null) {
@ -3765,11 +3782,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
+ " but cannot find a Flow Registry with that identifier");
}
final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), false, NiFiUserUtils.getNiFiUser());
final VersionedFlowSnapshot versionedFlowSnapshot;
try {
versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), true, NiFiUserUtils.getNiFiUser());
} catch (final IOException | NiFiRegistryException e) {
throw new NiFiCoreException("Failed to retrieve flow with Flow Registry in order to calculate local differences due to " + e.getMessage(), e);
}
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false);
final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
@ -3802,13 +3824,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, NiFiRegistryException {
public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
if (registry == null) {
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
}
return registry.registerVersionedFlow(flow, NiFiUserUtils.getNiFiUser());
try {
return registry.registerVersionedFlow(flow, NiFiUserUtils.getNiFiUser());
} catch (final IOException | NiFiRegistryException e) {
throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e);
}
}
private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
@ -3822,13 +3848,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow,
final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) throws IOException, NiFiRegistryException {
final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
if (registry == null) {
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
}
return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion, NiFiUserUtils.getNiFiUser());
try {
return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion, NiFiUserUtils.getNiFiUser());
} catch (final IOException | NiFiRegistryException e) {
throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e);
}
}
@Override
@ -3881,7 +3911,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot, final NiFiUser user) throws IOException {
public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot, final NiFiUser user) {
final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
@ -4057,7 +4087,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) throws IOException {
public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) {
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId());
if (flowRegistry == null) {
throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId());
@ -4066,7 +4096,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final VersionedFlowSnapshot snapshot;
try {
snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), fetchRemoteFlows, NiFiUserUtils.getNiFiUser());
} catch (final NiFiRegistryException e) {
} catch (final NiFiRegistryException | IOException e) {
throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
+ versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
}

View File

@ -1644,10 +1644,6 @@ public class ProcessGroupResource extends ApplicationResource {
// Step 6: Replicate the request or call serviceFacade.updateProcessGroup
final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation();
if (versionControlInfo != null) {
serviceFacade.verifyImportProcessGroup(versionControlInfo, groupId);
}
if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) {
// Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
// Step 2: Retrieve flow from Flow Registry
@ -1670,6 +1666,11 @@ public class ProcessGroupResource extends ApplicationResource {
requestProcessGroupEntity.setVersionedFlowSnapshot(flowSnapshot);
}
if (versionControlInfo != null) {
final VersionedFlowSnapshot flowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
serviceFacade.verifyImportProcessGroup(versionControlInfo, flowSnapshot.getFlowContents(), groupId);
}
// Step 6: Replicate the request or call serviceFacade.updateProcessGroup
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestProcessGroupEntity);

View File

@ -1110,7 +1110,7 @@ public class VersionsResource extends ApplicationResource {
// Create an asynchronous request that will occur in the background, because this request may
// result in stopping components, which can take an indeterminate amount of time.
final String requestId = UUID.randomUUID().toString();
final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Processors");
final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Affected Processors");
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
@ -1275,7 +1275,7 @@ public class VersionsResource extends ApplicationResource {
// Create an asynchronous request that will occur in the background, because this request may
// result in stopping components, which can take an indeterminate amount of time.
final String requestId = UUID.randomUUID().toString();
final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Processors");
final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Affected Processors");
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {

View File

@ -172,21 +172,23 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
}
}
controllerService.getProcessGroup().onComponentModified();
// For any component that references this Controller Service, find the component's Process Group
// and notify the Process Group that a component has been modified. This way, we know to re-calculate
// whether or not the Process Group has local modifications.
final ProcessGroup group = controllerService.getProcessGroup();
controllerService.getReferences().getReferencingComponents().stream()
.map(ConfiguredComponent::getProcessGroupIdentifier)
.filter(id -> !id.equals(group.getIdentifier()))
.forEach(groupId -> {
final ProcessGroup descendant = group.findProcessGroup(groupId);
if (descendant != null) {
descendant.onComponentModified();
}
});
if (group != null) {
group.onComponentModified();
// For any component that references this Controller Service, find the component's Process Group
// and notify the Process Group that a component has been modified. This way, we know to re-calculate
// whether or not the Process Group has local modifications.
controllerService.getReferences().getReferencingComponents().stream()
.map(ConfiguredComponent::getProcessGroupIdentifier)
.filter(id -> !id.equals(group.getIdentifier()))
.forEach(groupId -> {
final ProcessGroup descendant = group.findProcessGroup(groupId);
if (descendant != null) {
descendant.onComponentModified();
}
});
}
return controllerService;
}

View File

@ -274,7 +274,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
@Override
public ProcessGroup disconnectVersionControl(final String groupId) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
group.disconnectVersionControl();
group.disconnectVersionControl(true);
return group;
}

View File

@ -90,7 +90,6 @@ public final class SnippetUtils {
private DtoFactory dtoFactory;
private AccessPolicyDAO accessPolicyDAO;
/**
* Populates the specified snippet and returns the details.
*