NIFI-4436: Bug fix to ensure that RPG's ports are not removed until after connections are established to the ports; ensure that if a registry's name is changed that it is updated immediately in VersionControlInformation objects

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Mark Payne 2017-12-01 16:31:22 -05:00 committed by Bryan Bende
parent 49aad2c3a8
commit 014c542f48
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
12 changed files with 159 additions and 60 deletions

View File

@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit;
public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent {
void initialize();
@Override
String getIdentifier();

View File

@ -1779,6 +1779,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
*/
public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
instantiateSnippet(group, dto, true);
group.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
}
private void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto, final boolean topLevel) throws ProcessorInstantiationException {

View File

@ -360,6 +360,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion);
}
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
// If there are any Templates that do not exist in the Proposed Flow that do exist in the 'existing flow', we need
// to ensure that we also add those to the appropriate Process Groups, so that we don't lose them.
final Document existingFlowConfiguration = parseFlowBytes(existingFlow);

View File

@ -2963,6 +2963,13 @@ public final class StandardProcessGroup implements ProcessGroup {
versionControlInformation.isCurrent(),
versionControlInformation.getStatus()) {
@Override
public String getRegistryName() {
final String registryId = versionControlInformation.getRegistryIdentifier();
final FlowRegistry registry = flowController.getFlowRegistryClient().getFlowRegistry(registryId);
return registry == null ? registryId : registry.getName();
}
@Override
public boolean isModified() {
boolean updated = false;
@ -3220,7 +3227,7 @@ public final class StandardProcessGroup implements ProcessGroup {
updated = flowStatus.compareAndSet(status, updatedStatus);
}
} catch (final IOException | NiFiRegistryException e) {
final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry");
final String message = String.format("Failed to synchronize Process Group with Flow Registry : " + e.getMessage());
setSyncFailedState(message);
LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e);
@ -3451,6 +3458,7 @@ public final class StandardProcessGroup implements ProcessGroup {
if (childGroup == null) {
final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip);
added.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
LOG.info("Added {} to {}", added, this);
} else if (childCoordinates == null || updateDescendantVersionedGroups) {
updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, updateDescendantVersionedGroups, variablesToSkip);
@ -3759,7 +3767,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final Connectable destination = getConnectable(destinationGroup, proposed.getDestination());
if (destination == null) {
throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getIdentifier()
throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getDestination().getId()
+ " but no component could be found in the Process Group with a corresponding identifier");
}

View File

@ -115,40 +115,61 @@ public class RestBasedFlowRegistry implements FlowRegistry {
return (user == null || user.isAnonymous()) ? null : user.getIdentity();
}
private BucketClient getBucketClient(final NiFiUser user) {
final String identity = getIdentity(user);
final NiFiRegistryClient registryClient = getRegistryClient();
final BucketClient bucketClient = identity == null ? registryClient.getBucketClient() : registryClient.getBucketClient(identity);
return bucketClient;
}
private FlowSnapshotClient getFlowSnapshotClient(final NiFiUser user) {
final String identity = getIdentity(user);
final NiFiRegistryClient registryClient = getRegistryClient();
final FlowSnapshotClient snapshotClient = identity == null ? registryClient.getFlowSnapshotClient() : registryClient.getFlowSnapshotClient(identity);
return snapshotClient;
}
private FlowClient getFlowClient(final NiFiUser user) {
final String identity = getIdentity(user);
final NiFiRegistryClient registryClient = getRegistryClient();
final FlowClient flowClient = identity == null ? registryClient.getFlowClient() : registryClient.getFlowClient(identity);
return flowClient;
}
@Override
public Set<Bucket> getBuckets(final NiFiUser user) throws IOException, NiFiRegistryException {
final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user));
final BucketClient bucketClient = getBucketClient(user);
return new HashSet<>(bucketClient.getAll());
}
@Override
public Bucket getBucket(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user));
final BucketClient bucketClient = getBucketClient(user);
return bucketClient.get(bucketId);
}
@Override
public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
final FlowClient flowClient = getFlowClient(user);
return new HashSet<>(flowClient.getByBucket(bucketId));
}
@Override
public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user);
return new HashSet<>(snapshotClient.getSnapshotMetadata(bucketId, flowId));
}
@Override
public VersionedFlow registerVersionedFlow(final VersionedFlow flow, final NiFiUser user) throws IOException, NiFiRegistryException {
final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
final FlowClient flowClient = getFlowClient(user);
return flowClient.create(flow);
}
@Override
public VersionedFlow deleteVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
final FlowClient flowClient = getFlowClient(user);
return flowClient.delete(bucketId, flowId);
}
@ -156,7 +177,7 @@ public class RestBasedFlowRegistry implements FlowRegistry {
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot,
final String comments, final int expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException {
final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user);
final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
versionedFlowSnapshot.setFlowContents(snapshot);
@ -174,13 +195,14 @@ public class RestBasedFlowRegistry implements FlowRegistry {
@Override
public int getLatestVersion(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
return (int) getRegistryClient().getFlowClient(getIdentity(user)).get(bucketId, flowId).getVersionCount();
return (int) getFlowClient(user).get(bucketId, flowId).getVersionCount();
}
@Override
public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows, final NiFiUser user)
throws IOException, NiFiRegistryException {
final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user);
final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version);
if (fetchRemoteFlows) {
@ -241,7 +263,7 @@ public class RestBasedFlowRegistry implements FlowRegistry {
@Override
public VersionedFlow getVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
final FlowClient flowClient = getFlowClient(user);
return flowClient.get(bucketId, flowId);
}

View File

@ -16,43 +16,8 @@
*/
package org.apache.nifi.remote;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
@ -82,7 +47,42 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents the Root Process Group of a remote NiFi Instance. Holds
@ -104,6 +104,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final EventReporter eventReporter;
private final NiFiProperties nifiProperties;
private final long remoteContentsCacheExpiration;
private volatile boolean initialized = false;
private final AtomicReference<String> name = new AtomicReference<>();
private final AtomicReference<Position> position = new AtomicReference<>(new Position(0D, 0D));
@ -179,7 +180,16 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
final Runnable checkAuthorizations = new InitializationTask();
backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id, true);
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS);
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 30L, 30L, TimeUnit.SECONDS);
}
@Override
public void initialize() {
if (initialized) {
return;
}
initialized = true;
backgroundThreadExecutor.submit(() -> {
try {
refreshFlowContents();
@ -820,6 +830,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public void refreshFlowContents() throws CommunicationsException {
if (!initialized) {
return;
}
try {
// perform the request
final ControllerDTO dto;
@ -1153,6 +1167,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public void run() {
if (!initialized) {
return;
}
try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) {
try {
final ControllerDTO dto = apiClient.getController(targetUris);

View File

@ -417,6 +417,17 @@ public interface NiFiServiceFacade {
*/
void verifyComponentTypes(VersionedProcessGroup versionedGroup);
/**
* Verifies that the flow identified by the given Version Control Information can be imported into the Process Group
* with the given id
*
* @param versionControlInfo the information about the versioned flow
* @param groupId the ID of the Process Group where the flow should be instantiated
*
* @throws IllegalStateException if the flow cannot be imported into the specified group
*/
void verifyImportProcessGroup(VersionControlInformationDTO versionControlInfo, String groupId);
/**
* Creates a new Template based off the specified snippet.
*

View File

@ -1862,6 +1862,31 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.verifyComponentTypes(versionedGroup);
}
@Override
public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final String groupId) {
final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
verifyImportProcessGroup(versionControlInfo, group);
}
private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final ProcessGroup group) {
if (group == null) {
return;
}
final VersionControlInformation vci = group.getVersionControlInformation();
if (vci != null) {
if (Objects.equals(vciDto.getRegistryId(), vci.getRegistryIdentifier())
&& Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier())
&& Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) {
throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. "
+ "If Process Group A contains Process Group B, then Process Group B is not allowed to contain the flow identified by Process Group A.");
}
}
verifyImportProcessGroup(vciDto, group.getParent());
}
@Override
public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional<String> idGenerationSeed) {
// get the specified snippet

View File

@ -1641,6 +1641,10 @@ public class ProcessGroupResource extends ApplicationResource {
// Step 6: Replicate the request or call serviceFacade.updateProcessGroup
final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation();
if (versionControlInfo != null) {
serviceFacade.verifyImportProcessGroup(versionControlInfo, groupId);
}
if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) {
// Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
// Step 2: Retrieve flow from Flow Registry
@ -1686,10 +1690,6 @@ public class ProcessGroupResource extends ApplicationResource {
}
},
() -> {
final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
if (versionedFlowSnapshot != null) {
serviceFacade.verifyComponentTypes(versionedFlowSnapshot.getFlowContents());
}
},
processGroupEntity -> {
final ProcessGroupDTO processGroup = processGroupEntity.getComponent();

View File

@ -424,15 +424,24 @@ public class VersionsResource extends ApplicationResource {
if (versionedFlowDto == null) {
throw new IllegalArgumentException("Version Control Information must be supplied.");
}
if (versionedFlowDto.getBucketId() == null) {
if (StringUtils.isEmpty(versionedFlowDto.getBucketId())) {
throw new IllegalArgumentException("The Bucket ID must be supplied.");
}
if (versionedFlowDto.getFlowName() == null && versionedFlowDto.getFlowId() == null) {
if (StringUtils.isEmpty(versionedFlowDto.getFlowName()) && StringUtils.isEmpty(versionedFlowDto.getFlowId())) {
throw new IllegalArgumentException("The Flow Name or Flow ID must be supplied.");
}
if (versionedFlowDto.getRegistryId() == null) {
if (versionedFlowDto.getFlowName().length() > 1000) {
throw new IllegalArgumentException("The Flow Name cannot exceed 1,000 characters");
}
if (StringUtils.isEmpty(versionedFlowDto.getRegistryId())) {
throw new IllegalArgumentException("The Registry ID must be supplied.");
}
if (versionedFlowDto.getDescription() != null && versionedFlowDto.getDescription().length() > 65535) {
throw new IllegalArgumentException("Flow Description cannot exceed 65,535 characters");
}
if (versionedFlowDto.getComments() != null && versionedFlowDto.getComments().length() > 65535) {
throw new IllegalArgumentException("Comments cannot exceed 65,535 characters");
}
// ensure we're not attempting to version the root group
final ProcessGroupEntity root = serviceFacade.getProcessGroup(FlowController.ROOT_GROUP_ID_ALIAS);

View File

@ -1792,7 +1792,7 @@ public final class DtoFactory {
componentDto.setId(processorDto.getId());
componentDto.setName(processorDto.getName());
componentDto.setProcessGroupId(processorDto.getParentGroupId());
componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
componentDto.setState(processorDto.getState());
componentDto.setValidationErrors(processorDto.getValidationErrors());
component.setComponent(componentDto);

View File

@ -85,6 +85,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
// create the remote process group
RemoteProcessGroup remoteProcessGroup = flowController.createRemoteProcessGroup(remoteProcessGroupDTO.getId(), targetUris);
remoteProcessGroup.initialize();
// set other properties
updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO);