From f6cc5b6cdca86b3f535754f7ec4d91ca61f2e35b Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Sat, 4 Nov 2017 14:19:49 -0400 Subject: [PATCH] NIFI-4436: Integrate with actual Flow Registry via REST Client - Store Bucket Name, Flow Name, Flow Description for VersionControlInformation - Added endpoint for determining local modifications to a process group - Updated authorizations required for version control endpoints - Add state and percent complete fields ot VersionedFlowUpdateRequestDTO - If a variable exists in a parent process group, do not include it in imported/updated process group when interacting with flow registry - Code cleanup, documentation; bug fixes - Ensure that we are passing NiFiUser to the flow registry client when appropriate - Updated to work against new version of flow registry client; deleted file-based flow registry client Signed-off-by: Matt Gilman --- .../nifi/registry/flow/FlowRegistry.java | 54 +- .../flow/VersionControlInformation.java | 14 +- .../nifi/groups/StandardProcessGroup.java | 53 +- .../registry/flow/FileBasedFlowRegistry.java | 509 ------------------ .../registry/flow/RestBasedFlowRegistry.java | 64 ++- .../flow/StandardFlowRegistryClient.java | 14 +- .../StandardVersionControlInformation.java | 23 +- .../apache/nifi/web/NiFiServiceFacade.java | 10 + .../nifi/web/StandardNiFiServiceFacade.java | 51 +- .../nifi/web/api/ProcessGroupResource.java | 12 +- .../apache/nifi/web/api/VersionsResource.java | 361 +++++++------ .../apache/nifi/web/api/dto/DtoFactory.java | 57 +- 12 files changed, 424 insertions(+), 798 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java index 10db9cf3b7..76f96f25db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java @@ -83,14 +83,6 @@ public interface FlowRegistry { */ Bucket getBucket(String bucketId, NiFiUser user) throws IOException, NiFiRegistryException; - /** - * Gets the bucket with the given ID - * - * @param bucketId the id of the bucket - * @return the bucket with the given ID - */ - Bucket getBucket(String bucketId) throws IOException, NiFiRegistryException; - /** * Retrieves the set of all Versioned Flows for the specified bucket * @@ -123,7 +115,15 @@ public interface FlowRegistry { * @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier or name is null * @throws NiFiRegistryException if the bucket id does not exist */ - VersionedFlow registerVersionedFlow(VersionedFlow flow) throws IOException, NiFiRegistryException; + VersionedFlow registerVersionedFlow(VersionedFlow flow, NiFiUser user) throws IOException, NiFiRegistryException; + + /** + * Deletes the specified flow from the Flow Registry + * + * @param bucketId the ID of the bucket + * @param flowId the ID of the flow + */ + VersionedFlow deleteVersionedFlow(String bucketId, String flowId, NiFiUser user) throws IOException, NiFiRegistryException; /** * Adds the given snapshot to the Flow Registry for the given flow @@ -138,7 +138,8 @@ public interface FlowRegistry { * @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier is null, or if the flow to snapshot is null * @throws NiFiRegistryException if the flow does not exist */ - VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion) throws IOException, NiFiRegistryException; + VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion, NiFiUser user) + throws IOException, NiFiRegistryException; /** * Returns the latest (most recent) version of the Flow in the Flow Registry for the given bucket and flow @@ -150,7 +151,23 @@ public interface FlowRegistry { * @throws IOException if unable to communicate with the Flow Registry * @throws NiFiRegistryException if unable to find the bucket with the given ID or the flow with the given ID */ - int getLatestVersion(String bucketId, String flowId) throws IOException, NiFiRegistryException; + int getLatestVersion(String bucketId, String flowId, NiFiUser user) throws IOException, NiFiRegistryException; + + /** + * Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry + * + * @param bucketId the ID of the bucket + * @param flowId the ID of the flow + * @param version the version to retrieve + * @return the contents of the Flow from the Flow Registry + * + * @throws IOException if unable to communicate with the Flow Registry + * @throws NiFiRegistryException if unable to find the contents of the flow due to the bucket or flow not existing, + * or the specified version of the flow not existing + * @throws NullPointerException if any of the arguments is not specified + * @throws IllegalArgumentException if the given version is less than 1 + */ + VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, NiFiUser user) throws IOException, NiFiRegistryException; /** * Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry @@ -166,7 +183,6 @@ public interface FlowRegistry { * @throws NullPointerException if any of the arguments is not specified * @throws IllegalArgumentException if the given version is less than 1 */ - // TODO: Need to pass in user VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version) throws IOException, NiFiRegistryException; /** @@ -179,6 +195,18 @@ public interface FlowRegistry { * @throws IOException if unable to communicate with the Flow Registry * @throws NiFiRegistryException if unable to find a flow with the given bucket ID and flow ID */ - // TODO: Need to pass in user + VersionedFlow getVersionedFlow(String bucketId, String flowId, NiFiUser user) throws IOException, NiFiRegistryException; + + /** + * Retrieves a VersionedFlow by bucket id and flow id + * + * @param bucketId the ID of the bucket + * @param flowId the ID of the flow + * @return the VersionedFlow for the given bucket and flow ID's + * + * @throws IOException if unable to communicate with the Flow Registry + * @throws NiFiRegistryException if unable to find a flow with the given bucket ID and flow ID + */ + // TODO: Do we still need this? VersionedFlow getVersionedFlow(String bucketId, String flowId) throws IOException, NiFiRegistryException; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java index 67c3635ef2..b54a1c99a3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java @@ -17,8 +17,6 @@ package org.apache.nifi.registry.flow; -import java.util.Optional; - /** *

* Provides a mechanism for conveying which Flow Registry a flow is stored in, and @@ -69,18 +67,14 @@ public interface VersionControlInformation { /** * @return true if the flow has been modified since the last time that it was updated from the Flow Registry or saved - * to the Flow Registry; false if the flow is in sync with the Flow Registry. An empty optional will be returned - * if it is not yet known whether or not the flow has been modified (for example, on startup, when the flow has not yet been - * fetched from the Flow Registry) + * to the Flow Registry; false if the flow is in sync with the Flow Registry. */ - Optional getModified(); + boolean isModified(); /** - * @return true if this version of the flow is the most recent version of the flow available in the Flow Registry. - * An empty optional will be returned if it is not yet known whether or not the flow has been modified (for example, on startup, - * when the flow has not yet been fetched from the Flow Registry) + * @return true if this version of the flow is the most recent version of the flow available in the Flow Registry, false otherwise. */ - Optional getCurrent(); + boolean isCurrent(); /** * @return the snapshot of the flow that was synchronized with the Flow Registry diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 2783e96207..d1aa4e2f86 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -2822,17 +2822,17 @@ public final class StandardProcessGroup implements ProcessGroup { versionControlInformation.getFlowIdentifier(), versionControlInformation.getVersion(), versionControlInformation.getFlowSnapshot(), - versionControlInformation.getModified().orElse(null), - versionControlInformation.getCurrent().orElse(null)) { + versionControlInformation.isModified(), + versionControlInformation.isCurrent()) { @Override - public Optional getModified() { + public boolean isModified() { final Set differences = StandardProcessGroup.this.getModifications(); if (differences == null) { - return Optional.ofNullable(null); + return false; } - return Optional.of(!differences.isEmpty()); + return !differences.isEmpty(); } }; @@ -2938,7 +2938,6 @@ public final class StandardProcessGroup implements ProcessGroup { try { final VersionedFlow versionedFlow = flowRegistry.getVersionedFlow(vci.getBucketIdentifier(), vci.getFlowIdentifier()); final int latestVersion = (int) versionedFlow.getVersionCount(); - vci.setBucketName(versionedFlow.getBucketName()); vci.setFlowName(versionedFlow.getName()); vci.setFlowDescription(versionedFlow.getDescription()); @@ -2986,7 +2985,8 @@ public final class StandardProcessGroup implements ProcessGroup { LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", this, proposedSnapshot, flowComparison.getDifferences().size(), differencesByLine); } - updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings); + final Set knownVariables = getKnownVariableNames(); + updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, knownVariables); } catch (final ProcessorInstantiationException pie) { throw new RuntimeException(pie); } finally { @@ -2994,9 +2994,26 @@ public final class StandardProcessGroup implements ProcessGroup { } } + private Set getKnownVariableNames() { + final Set variableNames = new HashSet<>(); + populateKnownVariableNames(this, variableNames); + return variableNames; + } + + private void populateKnownVariableNames(final ProcessGroup group, final Set knownVariables) { + group.getVariableRegistry().getVariableMap().keySet().stream() + .map(VariableDescriptor::getName) + .forEach(knownVariables::add); + + final ProcessGroup parent = group.getParent(); + if (parent != null) { + populateKnownVariableNames(parent, knownVariables); + } + } + private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed, - final Set updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName) throws ProcessorInstantiationException { + final Set updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final Set variablesToSkip) throws ProcessorInstantiationException { group.setComments(proposed.getComments()); @@ -3027,7 +3044,7 @@ public final class StandardProcessGroup implements ProcessGroup { // If any new variables exist in the proposed flow, add those to the variable registry. for (final Map.Entry entry : proposed.getVariables().entrySet()) { - if (!existingVariableNames.contains(entry.getKey())) { + if (!existingVariableNames.contains(entry.getKey()) && !variablesToSkip.contains(entry.getKey())) { updatedVariableMap.put(entry.getKey(), entry.getValue()); } } @@ -3068,10 +3085,10 @@ public final class StandardProcessGroup implements ProcessGroup { final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); if (childGroup == null) { - final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed); + final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip); LOG.info("Added {} to {}", added, this); } else { - updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName); + updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, variablesToSkip); LOG.info("Updated {}", childGroup); } @@ -3345,11 +3362,12 @@ public final class StandardProcessGroup implements ProcessGroup { } - private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String componentIdSeed) throws ProcessorInstantiationException { + private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String componentIdSeed, final Set variablesToSkip) + throws ProcessorInstantiationException { final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed)); group.setVersionedComponentId(proposed.getIdentifier()); group.setParent(destination); - updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true); + updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, variablesToSkip); destination.addProcessGroup(group); return group; } @@ -3771,16 +3789,11 @@ public final class StandardProcessGroup implements ProcessGroup { } if (verifyNotDirty) { - final Optional modifiedOption = versionControlInfo.getModified(); - if (!modifiedOption.isPresent()) { - throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow " - + "has not yet been synchronized with the Flow Registry. The Process Group must be" - + " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later"); - } + final boolean modified = versionControlInfo.isModified(); final Set modifications = getModifications(); - if (Boolean.TRUE.equals(modifiedOption.get())) { + if (modified) { final String changes = modifications.stream() .map(FlowDifference::toString) .collect(Collectors.joining("\n")); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java deleted file mode 100644 index 9b3ba946e6..0000000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java +++ /dev/null @@ -1,509 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.registry.flow; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.UUID; - -import org.apache.nifi.authorization.user.NiFiUser; -import org.apache.nifi.registry.bucket.Bucket; -import org.apache.nifi.registry.client.NiFiRegistryException; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; - -/** - * A simple file-based implementation of a Flow Registry Client. Rather than interacting - * with an actual Flow Registry, this implementation simply reads flows from disk and writes - * them to disk. It is not meant for any production use but is available for testing purposes. - */ -public class FileBasedFlowRegistry implements FlowRegistry { - private final File directory; - private final Map> flowNamesByBucket = new HashMap<>(); - private final JsonFactory jsonFactory = new JsonFactory(); - private final String id; - private volatile String name = "Local Registry"; - private volatile String url = "file:" + (new File("..").getAbsolutePath()); - private volatile String description = "Default file-based Flow Registry"; - - public FileBasedFlowRegistry(final String id, final String url) throws IOException { - final URI uri = URI.create(url); - if (!uri.getScheme().equalsIgnoreCase("file")) { - throw new IllegalArgumentException("Cannot create a File Based Flow Registry with a URL of " + url + "; URL scheme must be 'file'"); - } - - this.directory = new File(URI.create(url).getPath()); - - if (!directory.exists() && !directory.mkdirs()) { - throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry"); - } - - this.id = id; - this.url = url; - recoverBuckets(); - } - - private void recoverBuckets() throws IOException { - final File[] bucketDirs = directory.listFiles(); - if (bucketDirs == null) { - throw new IOException("Could not get listing of directory " + directory); - } - - for (final File bucketDir : bucketDirs) { - final File[] flowDirs = bucketDir.listFiles(); - if (flowDirs == null) { - throw new IOException("Could not get listing of directory " + bucketDir); - } - - final Set flowNames = new HashSet<>(); - for (final File flowDir : flowDirs) { - final File propsFile = new File(flowDir, "flow.properties"); - if (!propsFile.exists()) { - continue; - } - - final Properties properties = new Properties(); - try (final InputStream in = new FileInputStream(propsFile)) { - properties.load(in); - } - - final String flowName = properties.getProperty("name"); - if (flowName == null) { - continue; - } - - flowNames.add(flowName); - } - - if (!flowNames.isEmpty()) { - flowNamesByBucket.put(bucketDir.getName(), flowNames); - } - } - } - - @Override - public String getURL() { - return url; - } - - @Override - public String getName() { - return name; - } - - @Override - public Set getBuckets(NiFiUser user) throws IOException { - final Set buckets = new HashSet<>(); - - final File[] bucketDirs = directory.listFiles(); - if (bucketDirs == null) { - throw new IOException("Could not get listing of directory " + directory); - } - - for (final File bucketDirectory : bucketDirs) { - final String bucketIdentifier = bucketDirectory.getName(); - final long creation = bucketDirectory.lastModified(); - - final Bucket bucket = new Bucket(); - bucket.setIdentifier(bucketIdentifier); - bucket.setName("Bucket '" + bucketIdentifier + "'"); - bucket.setCreatedTimestamp(creation); - - final Set versionedFlows = new HashSet<>(); - final File[] flowDirs = bucketDirectory.listFiles(); - if (flowDirs != null) { - for (final File flowDir : flowDirs) { - final String flowIdentifier = flowDir.getName(); - try { - final VersionedFlow versionedFlow = getVersionedFlow(bucketIdentifier, flowIdentifier); - versionedFlows.add(versionedFlow); - } catch (NiFiRegistryException e) { - continue; - } - } - } - - bucket.setVersionedFlows(versionedFlows); - - buckets.add(bucket); - } - - return buckets; - } - - @Override - public Bucket getBucket(String bucketId) throws IOException, NiFiRegistryException { - return getBucket(bucketId, null); - } - - @Override - public Bucket getBucket(String bucketId, NiFiUser user) throws IOException, NiFiRegistryException { - return getBuckets(user).stream().filter(b -> b.getIdentifier().equals(bucketId)).findFirst().orElse(null); - } - - @Override - public Set getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException { - final Bucket bucket = getBuckets(user).stream().filter(b -> bucketId.equals(b.getIdentifier())).findFirst().orElse(null); - if (bucket == null) { - return Collections.emptySet(); - } - - return bucket.getVersionedFlows(); - } - - @Override - public Set getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { - final VersionedFlow flow = getFlows(bucketId, user).stream().filter(f -> flowId.equals(f.getIdentifier())).findFirst().orElse(null); - if (flow == null) { - return Collections.emptySet(); - } - - return flow.getSnapshotMetadata(); - } - - @Override - public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, NiFiRegistryException { - Objects.requireNonNull(flow); - Objects.requireNonNull(flow.getBucketIdentifier()); - Objects.requireNonNull(flow.getName()); - - // Verify that bucket exists - final File bucketDir = new File(directory, flow.getBucketIdentifier()); - if (!bucketDir.exists()) { - throw new NiFiRegistryException("No bucket exists with ID " + flow.getBucketIdentifier()); - } - - // Verify that there is no flow with the same name in that bucket - final Set flowNames = flowNamesByBucket.get(flow.getBucketIdentifier()); - if (flowNames != null && flowNames.contains(flow.getName())) { - throw new IllegalArgumentException("Flow with name '" + flow.getName() + "' already exists for Bucket with ID " + flow.getBucketIdentifier()); - } - - final String flowIdentifier = UUID.randomUUID().toString(); - final File flowDir = new File(bucketDir, flowIdentifier); - if (!flowDir.mkdirs()) { - throw new IOException("Failed to create directory " + flowDir + " for new Flow"); - } - - final File propertiesFile = new File(flowDir, "flow.properties"); - - final Properties flowProperties = new Properties(); - flowProperties.setProperty("name", flow.getName()); - flowProperties.setProperty("created", String.valueOf(flow.getCreatedTimestamp())); - flowProperties.setProperty("description", flow.getDescription()); - flowProperties.setProperty("lastModified", String.valueOf(flow.getModifiedTimestamp())); - - try (final OutputStream out = new FileOutputStream(propertiesFile)) { - flowProperties.store(out, null); - } - - final VersionedFlow response = new VersionedFlow(); - response.setBucketIdentifier(flow.getBucketIdentifier()); - response.setCreatedTimestamp(flow.getCreatedTimestamp()); - response.setDescription(flow.getDescription()); - response.setIdentifier(flowIdentifier); - response.setModifiedTimestamp(flow.getModifiedTimestamp()); - response.setName(flow.getName()); - - return response; - } - - @Override - public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) - throws IOException, NiFiRegistryException { - Objects.requireNonNull(flow); - Objects.requireNonNull(flow.getBucketIdentifier()); - Objects.requireNonNull(flow.getName()); - Objects.requireNonNull(snapshot); - - // Verify that the bucket exists - final File bucketDir = new File(directory, flow.getBucketIdentifier()); - if (!bucketDir.exists()) { - throw new NiFiRegistryException("No bucket exists with ID " + flow.getBucketIdentifier()); - } - - // Verify that the flow exists - final File flowDir = new File(bucketDir, flow.getIdentifier()); - if (!flowDir.exists()) { - throw new NiFiRegistryException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier()); - } - - final File[] versionDirs = flowDir.listFiles(); - if (versionDirs == null) { - throw new IOException("Unable to perform listing of directory " + flowDir); - } - - int maxVersion = 0; - for (final File versionDir : versionDirs) { - final String versionName = versionDir.getName(); - - final int version; - try { - version = Integer.parseInt(versionName); - } catch (final NumberFormatException nfe) { - continue; - } - - if (version > maxVersion) { - maxVersion = version; - } - } - - final int snapshotVersion = maxVersion + 1; - final File snapshotDir = new File(flowDir, String.valueOf(snapshotVersion)); - if (!snapshotDir.mkdir()) { - throw new IOException("Could not create directory " + snapshotDir); - } - - final File contentsFile = new File(snapshotDir, "flow.xml"); - - try (final OutputStream out = new FileOutputStream(contentsFile); - final JsonGenerator generator = jsonFactory.createGenerator(out)) { - generator.setCodec(new ObjectMapper()); - generator.setPrettyPrinter(new DefaultPrettyPrinter()); - generator.writeObject(snapshot); - } - - final Properties snapshotProperties = new Properties(); - snapshotProperties.setProperty("comments", comments); - snapshotProperties.setProperty("name", flow.getName()); - final File snapshotPropsFile = new File(snapshotDir, "snapshot.properties"); - try (final OutputStream out = new FileOutputStream(snapshotPropsFile)) { - snapshotProperties.store(out, null); - } - - final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); - snapshotMetadata.setBucketIdentifier(flow.getBucketIdentifier()); - snapshotMetadata.setComments(comments); - snapshotMetadata.setFlowIdentifier(flow.getIdentifier()); - snapshotMetadata.setFlowName(flow.getName()); - snapshotMetadata.setTimestamp(System.currentTimeMillis()); - snapshotMetadata.setVersion(snapshotVersion); - - final VersionedFlowSnapshot response = new VersionedFlowSnapshot(); - response.setSnapshotMetadata(snapshotMetadata); - response.setFlowContents(snapshot); - return response; - } - - @Override - public int getLatestVersion(final String bucketId, final String flowId) throws IOException, NiFiRegistryException { - // Verify that the bucket exists - final File bucketDir = new File(directory, bucketId); - if (!bucketDir.exists()) { - throw new NiFiRegistryException("No bucket exists with ID " + bucketId); - } - - // Verify that the flow exists - final File flowDir = new File(bucketDir, flowId); - if (!flowDir.exists()) { - throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId); - } - - final File[] versionDirs = flowDir.listFiles(); - if (versionDirs == null) { - throw new IOException("Unable to perform listing of directory " + flowDir); - } - - int maxVersion = 0; - for (final File versionDir : versionDirs) { - final String versionName = versionDir.getName(); - - final int version; - try { - version = Integer.parseInt(versionName); - } catch (final NumberFormatException nfe) { - continue; - } - - if (version > maxVersion) { - maxVersion = version; - } - } - - return maxVersion; - } - - @Override - public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, NiFiRegistryException { - // Verify that the bucket exists - final File bucketDir = new File(directory, bucketId); - if (!bucketDir.exists()) { - throw new NiFiRegistryException("No bucket exists with ID " + bucketId); - } - - // Verify that the flow exists - final File flowDir = new File(bucketDir, flowId); - if (!flowDir.exists()) { - throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId); - } - - final File versionDir = new File(flowDir, String.valueOf(version)); - if (!versionDir.exists()) { - throw new NiFiRegistryException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version); - } - - final File contentsFile = new File(versionDir, "flow.xml"); - - final VersionedProcessGroup processGroup; - try (final JsonParser parser = jsonFactory.createParser(contentsFile)) { - final ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - - parser.setCodec(mapper); - processGroup = parser.readValueAs(VersionedProcessGroup.class); - } - - final Properties properties = new Properties(); - final File snapshotPropsFile = new File(versionDir, "snapshot.properties"); - try (final InputStream in = new FileInputStream(snapshotPropsFile)) { - properties.load(in); - } - - final String comments = properties.getProperty("comments"); - final String flowName = properties.getProperty("name"); - - final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); - snapshotMetadata.setBucketIdentifier(bucketId); - snapshotMetadata.setComments(comments); - snapshotMetadata.setFlowIdentifier(flowId); - snapshotMetadata.setFlowName(flowName); - snapshotMetadata.setTimestamp(System.currentTimeMillis()); - snapshotMetadata.setVersion(version); - - final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot(); - snapshot.setFlowContents(processGroup); - snapshot.setSnapshotMetadata(snapshotMetadata); - - return snapshot; - } - - @Override - public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, NiFiRegistryException { - // Verify that the bucket exists - final File bucketDir = new File(directory, bucketId); - if (!bucketDir.exists()) { - throw new NiFiRegistryException("No bucket exists with ID " + bucketId); - } - - // Verify that the flow exists - final File flowDir = new File(bucketDir, flowId); - if (!flowDir.exists()) { - throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId); - } - - final File flowPropsFile = new File(flowDir, "flow.properties"); - final Properties flowProperties = new Properties(); - try (final InputStream in = new FileInputStream(flowPropsFile)) { - flowProperties.load(in); - } - - final VersionedFlow flow = new VersionedFlow(); - flow.setBucketIdentifier(bucketId); - flow.setCreatedTimestamp(Long.parseLong(flowProperties.getProperty("created"))); - flow.setDescription(flowProperties.getProperty("description")); - flow.setIdentifier(flowId); - flow.setModifiedTimestamp(flowDir.lastModified()); - flow.setName(flowProperties.getProperty("name")); - - final Comparator versionComparator = (a, b) -> Integer.compare(a.getVersion(), b.getVersion()); - - final SortedSet snapshotMetadataSet = new TreeSet<>(versionComparator); - flow.setSnapshotMetadata(snapshotMetadataSet); - - final File[] versionDirs = flowDir.listFiles(); - flow.setVersionCount(versionDirs.length); - - for (final File file : versionDirs) { - if (!file.isDirectory()) { - continue; - } - - int version; - try { - version = Integer.parseInt(file.getName()); - } catch (final NumberFormatException nfe) { - // not a version. skip. - continue; - } - - final File snapshotPropsFile = new File(file, "snapshot.properties"); - final Properties snapshotProperties = new Properties(); - try (final InputStream in = new FileInputStream(snapshotPropsFile)) { - snapshotProperties.load(in); - } - - final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata(); - metadata.setBucketIdentifier(bucketId); - metadata.setComments(snapshotProperties.getProperty("comments")); - metadata.setFlowIdentifier(flowId); - metadata.setFlowName(snapshotProperties.getProperty("name")); - metadata.setTimestamp(file.lastModified()); - metadata.setVersion(version); - - snapshotMetadataSet.add(metadata); - } - - return flow; - } - - @Override - public String getIdentifier() { - return id; - } - - @Override - public String getDescription() { - return description; - } - - @Override - public void setDescription(String description) { - this.description = description; - } - - @Override - public void setURL(String url) { - this.url = url; - } - - @Override - public void setName(String name) { - this.name = name; - } -} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java index 26be69b5f6..8bf89c620d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java @@ -111,55 +111,59 @@ public class RestBasedFlowRegistry implements FlowRegistry { this.name = name; } + private String getIdentity(final NiFiUser user) { + return (user == null || user.isAnonymous()) ? null : user.getIdentity(); + } + @Override public Set getBuckets(final NiFiUser user) throws IOException, NiFiRegistryException { - final BucketClient bucketClient = getRegistryClient().getBucketClient(user.isAnonymous() ? null : user.getIdentity()); + final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user)); return new HashSet<>(bucketClient.getAll()); } - @Override - public Bucket getBucket(final String bucketId) throws IOException, NiFiRegistryException { - final BucketClient bucketClient = getRegistryClient().getBucketClient(); - return bucketClient.get(bucketId); - } - @Override public Bucket getBucket(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException { - final BucketClient bucketClient = getRegistryClient().getBucketClient(user.isAnonymous() ? null : user.getIdentity()); + final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user)); return bucketClient.get(bucketId); } @Override public Set getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowClient flowClient = getRegistryClient().getFlowClient(user.isAnonymous() ? null : user.getIdentity()); + final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user)); return new HashSet<>(flowClient.getByBucket(bucketId)); } @Override public Set getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(user.isAnonymous() ? null : user.getIdentity()); + final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user)); return new HashSet<>(snapshotClient.getSnapshotMetadata(bucketId, flowId)); } @Override - public VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, NiFiRegistryException { - final FlowClient flowClient = getRegistryClient().getFlowClient(); + public VersionedFlow registerVersionedFlow(final VersionedFlow flow, final NiFiUser user) throws IOException, NiFiRegistryException { + final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user)); return flowClient.create(flow); } @Override - public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) - throws IOException, NiFiRegistryException { + public VersionedFlow deleteVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { + final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user)); + return flowClient.delete(bucketId, flowId); + } - final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(); + @Override + public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, + final String comments, final int expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException { + + final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user)); final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot(); versionedFlowSnapshot.setFlowContents(snapshot); final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata(); metadata.setBucketIdentifier(flow.getBucketIdentifier()); metadata.setFlowIdentifier(flow.getIdentifier()); - metadata.setFlowName(flow.getName()); + metadata.setAuthor(getIdentity(user)); metadata.setTimestamp(System.currentTimeMillis()); metadata.setVersion(expectedVersion); metadata.setComments(comments); @@ -169,24 +173,29 @@ public class RestBasedFlowRegistry implements FlowRegistry { } @Override - public int getLatestVersion(final String bucketId, final String flowId) throws IOException, NiFiRegistryException { - return (int) getRegistryClient().getFlowClient().get(bucketId, flowId).getVersionCount(); + public int getLatestVersion(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { + return (int) getRegistryClient().getFlowClient(getIdentity(user)).get(bucketId, flowId).getVersionCount(); } @Override - public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version) throws IOException, NiFiRegistryException { - final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(); + public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final NiFiUser user) throws IOException, NiFiRegistryException { + final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user)); final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version); final VersionedProcessGroup contents = flowSnapshot.getFlowContents(); for (final VersionedProcessGroup child : contents.getProcessGroups()) { - populateVersionedContentsRecursively(child); + populateVersionedContentsRecursively(child, user); } return flowSnapshot; } - private void populateVersionedContentsRecursively(final VersionedProcessGroup group) throws NiFiRegistryException, IOException { + @Override + public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version) throws IOException, NiFiRegistryException { + return getFlowContents(bucketId, flowId, version, null); + } + + private void populateVersionedContentsRecursively(final VersionedProcessGroup group, final NiFiUser user) throws NiFiRegistryException, IOException { if (group == null) { return; } @@ -205,7 +214,7 @@ public class RestBasedFlowRegistry implements FlowRegistry { } final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId); - final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version); + final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version, user); final VersionedProcessGroup contents = snapshot.getFlowContents(); group.setComments(contents.getComments()); @@ -222,14 +231,19 @@ public class RestBasedFlowRegistry implements FlowRegistry { } for (final VersionedProcessGroup child : group.getProcessGroups()) { - populateVersionedContentsRecursively(child); + populateVersionedContentsRecursively(child, user); } } + @Override + public VersionedFlow getVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { + final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user)); + return flowClient.get(bucketId, flowId); + } + @Override public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, NiFiRegistryException { final FlowClient flowClient = getRegistryClient().getFlowClient(); return flowClient.get(bucketId, flowId); } - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java index d5d0d86b69..8a2447ddea 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java @@ -17,7 +17,6 @@ package org.apache.nifi.registry.flow; -import java.io.IOException; import java.net.URI; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -56,14 +55,15 @@ public class StandardFlowRegistryClient implements FlowRegistryClient { final String uriScheme = uri.getScheme(); final FlowRegistry registry; - if (uriScheme.equalsIgnoreCase("file")) { - try { - registry = new FileBasedFlowRegistry(registryId, registryUrl); - } catch (IOException e) { - throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl, e); + if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) { + final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false); + if (sslContext == null && uriScheme.equalsIgnoreCase("https")) { + throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl + + " because this NiFi is not configured with a Keystore/Truststore, so it is not capable of communicating with a secure Registry. " + + "Please populate NiFi's Keystore/Truststore properties or connect to a NiFi Registry over http instead of https."); } - registry.setName(registryName); + registry = new RestBasedFlowRegistry(this, registryId, registryUrl, sslContext, registryName); registry.setDescription(description); } else if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) { final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java index aaba126d8d..92a4166d71 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java @@ -18,7 +18,6 @@ package org.apache.nifi.registry.flow; import java.util.Objects; -import java.util.Optional; import org.apache.nifi.web.api.dto.VersionControlInformationDTO; @@ -33,8 +32,8 @@ public class StandardVersionControlInformation implements VersionControlInformat private volatile String flowDescription; private final int version; private volatile VersionedProcessGroup flowSnapshot; - private volatile Boolean modified = null; - private volatile Boolean current = null; + private volatile boolean modified; + private volatile boolean current; public static class Builder { private String registryIdentifier; @@ -89,12 +88,12 @@ public class StandardVersionControlInformation implements VersionControlInformat return this; } - public Builder modified(Boolean modified) { + public Builder modified(boolean modified) { this.modified = modified; return this; } - public Builder current(Boolean current) { + public Builder current(boolean current) { this.current = current; return this; } @@ -113,8 +112,8 @@ public class StandardVersionControlInformation implements VersionControlInformat .flowId(dto.getFlowId()) .flowName(dto.getFlowName()) .flowDescription(dto.getFlowDescription()) - .current(dto.getCurrent()) - .modified(dto.getModified()) + .current(dto.getCurrent() == null ? true : dto.getCurrent()) + .modified(dto.getModified() == null ? false : dto.getModified()) .version(dto.getVersion()); return builder; @@ -139,7 +138,7 @@ public class StandardVersionControlInformation implements VersionControlInformat public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version, - final VersionedProcessGroup snapshot, final Boolean modified, final Boolean current) { + final VersionedProcessGroup snapshot, final boolean modified, final boolean current) { this.registryIdentifier = registryId; this.registryName = registryName; this.bucketIdentifier = bucketId; @@ -208,13 +207,13 @@ public class StandardVersionControlInformation implements VersionControlInformat } @Override - public Optional getModified() { - return Optional.ofNullable(modified); + public boolean isModified() { + return modified; } @Override - public Optional getCurrent() { - return Optional.ofNullable(current); + public boolean isCurrent() { + return current; } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index be907ba137..76cd2c4109 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -1317,6 +1317,16 @@ public interface NiFiServiceFacade { */ VersionControlComponentMappingEntity registerFlowWithFlowRegistry(String groupId, StartVersionControlRequestEntity requestEntity); + /** + * Deletes the specified Versioned Flow from the specified Flow Registry + * + * @param registryId the ID of the Flow Registry + * @param bucketId the ID of the bucket + * @param flowId the ID of the flow + * @return the VersionedFlow that was deleted + */ + VersionedFlow deleteVersionedFlow(String registryId, String bucketId, String flowId) throws IOException, NiFiRegistryException; + /** * Adds the given snapshot to the already existing Versioned Flow, which resides in the given Flow Registry with the given id * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 89e00baf85..4d1bbbcdf9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,7 +16,9 @@ */ package org.apache.nifi.web; -import com.google.common.collect.Sets; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + import org.apache.commons.collections4.CollectionUtils; import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; @@ -88,9 +90,9 @@ import org.apache.nifi.history.HistoryQuery; import org.apache.nifi.history.PreviousValue; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; -import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedComponent; import org.apache.nifi.registry.flow.VersionedConnection; @@ -228,6 +230,7 @@ import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; import org.apache.nifi.web.api.entity.SnippetEntity; +import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.entity.TemplateEntity; import org.apache.nifi.web.api.entity.TenantEntity; @@ -237,7 +240,6 @@ import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.api.entity.VariableRegistryEntity; import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity; import org.apache.nifi.web.api.entity.VersionControlInformationEntity; -import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; import org.apache.nifi.web.api.entity.VersionedFlowEntity; import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity; import org.apache.nifi.web.controller.ControllerFacade; @@ -268,8 +270,8 @@ import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; +import com.google.common.collect.Sets; + import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -3667,8 +3669,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { action = "add the local flow to the Flow Registry as the first Snapshot"; // add first snapshot to the flow in the registry - final String comments = versionedFlow.getDescription() == null ? "Initial version of flow" : versionedFlow.getDescription(); - registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, comments, expectedVersion); + registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, versionedFlowDto.getComments(), expectedVersion); } catch (final NiFiRegistryException e) { throw new IllegalArgumentException(e); } catch (final IOException ioe) { @@ -3676,14 +3677,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { throw new RuntimeException("Failed to communicate with Flow Registry when attempting to " + action); } + final Bucket bucket = registeredSnapshot.getBucket(); + final VersionedFlow flow = registeredSnapshot.getFlow(); + // Update the Process Group with the new VersionControlInformation. (Send this to all nodes). final VersionControlInformationDTO vci = new VersionControlInformationDTO(); - vci.setBucketId(registeredFlow.getBucketIdentifier()); - vci.setBucketName(registeredFlow.getBucketName()); + vci.setBucketId(bucket.getIdentifier()); + vci.setBucketName(bucket.getName()); vci.setCurrent(true); - vci.setFlowId(registeredFlow.getIdentifier()); - vci.setFlowName(registeredFlow.getName()); - vci.setFlowDescription(registeredFlow.getDescription()); + vci.setFlowId(flow.getIdentifier()); + vci.setFlowName(flow.getName()); + vci.setFlowDescription(flow.getDescription()); vci.setGroupId(groupId); vci.setModified(false); vci.setRegistryId(registryId); @@ -3702,6 +3706,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return entity; } + @Override + public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException { + final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); + if (registry == null) { + throw new IllegalArgumentException("No Flow Registry exists with ID " + registryId); + } + + return registry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser()); + } + @Override public VersionControlInformationEntity getVersionControlInformation(final String groupId) { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); @@ -3737,8 +3751,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(), - versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion()); - + versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser()); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, true); @@ -3784,7 +3797,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); } - return registry.registerVersionedFlow(flow); + return registry.registerVersionedFlow(flow, NiFiUserUtils.getNiFiUser()); } private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException { @@ -3793,7 +3806,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); } - return registry.getVersionedFlow(bucketId, flowId); + return registry.getVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser()); } @Override @@ -3804,7 +3817,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); } - return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion); + return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion, NiFiUserUtils.getNiFiUser()); } @Override @@ -4023,7 +4036,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final VersionedFlowSnapshot snapshot; try { - snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion()); + snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser()); } catch (final NiFiRegistryException e) { throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket " + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion()); @@ -4064,7 +4077,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final VersionedFlowSnapshot childSnapshot; try { - childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion()); + childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion(), NiFiUserUtils.getNiFiUser()); } catch (final NiFiRegistryException e) { throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket " + remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index d24dcbb9eb..fe57dd6d0d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -87,10 +87,11 @@ import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.serialization.FlowEncodingVersion; import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.registry.bucket.Bucket; import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.flow.FlowRegistryUtils; +import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; -import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest; import org.apache.nifi.registry.variable.VariableRegistryUpdateStep; import org.apache.nifi.remote.util.SiteToSiteRestApiClient; @@ -1643,11 +1644,12 @@ public class ProcessGroupResource extends ApplicationResource { // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail. // Step 2: Retrieve flow from Flow Registry final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo); + final Bucket bucket = flowSnapshot.getBucket(); + final VersionedFlow flow = flowSnapshot.getFlow(); - final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata(); - versionControlInfo.setBucketName(metadata.getBucketName()); - versionControlInfo.setFlowName(metadata.getFlowName()); - versionControlInfo.setFlowDescription(metadata.getFlowDescription()); + versionControlInfo.setBucketName(bucket.getName()); + versionControlInfo.setFlowName(flow.getName()); + versionControlInfo.setFlowDescription(flow.getDescription()); versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId())); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index f61b399586..3684f04b2f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -17,12 +17,37 @@ package org.apache.nifi.web.api; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiParam; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import io.swagger.annotations.Authorization; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.ProcessGroupAuthorizable; @@ -34,7 +59,9 @@ import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.registry.bucket.Bucket; import org.apache.nifi.registry.flow.FlowRegistryUtils; +import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.flow.VersionedProcessGroup; @@ -69,35 +96,12 @@ import org.apache.nifi.web.util.Pause; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedHashMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.stream.Collectors; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; @Path("/versions") @Api(value = "/versions", description = "Endpoint for managing version control for a flow") @@ -125,9 +129,12 @@ public class VersionsResource extends ApplicationResource { @Consumes(MediaType.WILDCARD) @Produces(MediaType.APPLICATION_JSON) @Path("process-groups/{id}") - @ApiOperation(value = "Gets the Version Control information for a process group", response = VersionControlInformationEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { - @Authorization(value = "Read - /process-groups/{uuid}") - }) + @ApiOperation(value = "Gets the Version Control information for a process group", + response = VersionControlInformationEntity.class, + notes = NON_GUARANTEED_ENDPOINT, + authorizations = { + @Authorization(value = "Read - /process-groups/{uuid}") + }) @ApiResponses(value = { @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(code = 401, message = "Client could not be authenticated."), @@ -164,7 +171,9 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("start-requests") @ApiOperation( - value = "Creates a request so that a Process Group can be placed under Version Control or have its Version Control configuration changed", + value = "Creates a request so that a Process Group can be placed under Version Control or have its Version Control configuration changed. Creating this request will " + + "prevent any other threads from simultaneously saving local changes to Version Control. It will not, however, actually save the local flow to the Flow Registry. A " + + "POST to /versions/process-groups/{id} should be used to initiate saving of the local flow to the Flow Registry.", response = VersionControlInformationEntity.class, notes = NON_GUARANTEED_ENDPOINT) @ApiResponses(value = { @@ -305,7 +314,8 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("start-requests/{id}") @ApiOperation( - value = "Deletes the request with the given ID", + value = "Deletes the Version Control Request with the given ID. This will allow other threads to save flows to the Flow Registry. See also the documentation " + + "for POSTing to /versions/start-requests for information regarding why this is done.", notes = NON_GUARANTEED_ENDPOINT) @ApiResponses(value = { @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @@ -349,7 +359,8 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("process-groups/{id}") @ApiOperation( - value = "Begins version controlling the Process Group with the given ID", + value = "Begins version controlling the Process Group with the given ID or commits changes to the Versioned Flow, " + + "depending on if the provided VersionControlInformation includes a flowId", response = VersionControlInformationEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -365,7 +376,7 @@ public class VersionsResource extends ApplicationResource { @ApiResponse(code = 404, message = "The specified resource could not be found."), @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") }) - public Response startVersionControl( + public Response saveToFlowRegistry( @ApiParam("The process group id.") @PathParam("id") final String groupId, @ApiParam(value = "The versioned flow details.", required = true) final StartVersionControlRequestEntity requestEntity) throws IOException { @@ -402,29 +413,7 @@ public class VersionsResource extends ApplicationResource { final URI requestUri; try { final URI originalUri = getAbsolutePath(); - final URI createRequestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), - originalUri.getPort(), "/nifi-api/versions/start-requests", null, originalUri.getFragment()); - - final NodeResponse clusterResponse; - try { - if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { - clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); - } else { - clusterResponse = getRequestReplicator().forwardToCoordinator( - getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); - } - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie); - } - - if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { - final String errorResponse = getResponseEntity(clusterResponse, String.class); - throw new IllegalStateException( - "Failed to create a Version Control Request across all nodes in the cluster. Received response code " + clusterResponse.getStatus() + " with content: " + errorResponse); - } - - final String requestId = getResponseEntity(clusterResponse, String.class); + final String requestId = lockVersionControl(originalUri, groupId); requestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(), "/nifi-api/versions/start-requests/" + requestId, null, originalUri.getFragment()); @@ -439,54 +428,12 @@ public class VersionsResource extends ApplicationResource { // Finally, we can delete the Request. try { final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, requestEntity); - - final Map headers = new HashMap<>(); - headers.put("content-type", MediaType.APPLICATION_JSON); - - final NodeResponse clusterResponse; - try { - if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { - clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse(); - } else { - clusterResponse = getRequestReplicator().forwardToCoordinator( - getClusterCoordinatorNode(), HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse(); - } - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie); - } - - if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { - final String message = "Failed to update Version Control Information for Process Group with ID " + groupId + "."; - final Throwable cause = clusterResponse.getThrowable(); - if (cause == null) { - throw new IllegalStateException(message); - } else { - throw new IllegalStateException(message, cause); - } - } + replicateVersionControlMapping(mappingEntity, requestEntity, requestUri, groupId); final VersionControlInformationEntity responseEntity = serviceFacade.getVersionControlInformation(groupId); return generateOkResponse(responseEntity).build(); } finally { - final NodeResponse clusterResponse; - try { - if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { - clusterResponse = getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); - } else { - clusterResponse = getRequestReplicator().forwardToCoordinator( - getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); - } - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("After starting Version Control on Process Group with ID " + groupId + ", interrupted while waiting for deletion of Version Control Request. " - + "Users may be unable to Version Control other Process Groups until the request lock times out.", ie); - } - - if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { - logger.error("After starting Version Control on Process Group with ID " + groupId + ", failed to delete Version Control Request. " - + "Users may be unable to Version Control other Process Groups until the request lock times out. Response status code was " + clusterResponse.getStatus()); - } + unlockVersionControl(requestUri, groupId); } } @@ -560,13 +507,115 @@ public class VersionsResource extends ApplicationResource { }); } + private void unlockVersionControl(final URI requestUri, final String groupId) { + final NodeResponse clusterResponse; + try { + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); + } + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("After starting Version Control on Process Group with ID " + groupId + ", interrupted while waiting for deletion of Version Control Request. " + + "Users may be unable to Version Control other Process Groups until the request lock times out.", ie); + } + + if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { + logger.error("After starting Version Control on Process Group with ID " + groupId + ", failed to delete Version Control Request. " + + "Users may be unable to Version Control other Process Groups until the request lock times out. Response status code was " + clusterResponse.getStatus()); + } + } + + private String lockVersionControl(final URI originalUri, final String groupId) throws URISyntaxException { + final URI createRequestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + originalUri.getPort(), "/nifi-api/versions/start-requests", null, originalUri.getFragment()); + + final NodeResponse clusterResponse; + try { + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); + } + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie); + } + + if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { + final String errorResponse = getResponseEntity(clusterResponse, String.class); + throw new IllegalStateException( + "Failed to create a Version Control Request across all nodes in the cluster. Received response code " + clusterResponse.getStatus() + " with content: " + errorResponse); + } + + final String requestId = getResponseEntity(clusterResponse, String.class); + return requestId; + } + + private void replicateVersionControlMapping(final VersionControlComponentMappingEntity mappingEntity, final StartVersionControlRequestEntity requestEntity, final URI requestUri, + final String groupId) { + final Map headers = new HashMap<>(); + headers.put("content-type", MediaType.APPLICATION_JSON); + + final NodeResponse clusterResponse; + try { + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse(); + } + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + + if (requestEntity.getVersionedFlow().getFlowId() == null) { + // We had to create the flow for this snapshot. Since we failed to replicate the Version Control Info, remove the + // flow from the Flow Registry (use best effort; if we can't remove it, just log and move on). + final VersionControlInformationDTO vci = mappingEntity.getVersionControlInformation(); + try { + serviceFacade.deleteVersionedFlow(vci.getRegistryId(), vci.getBucketId(), vci.getFlowId()); + } catch (final Exception e) { + logger.error("Created Versioned Flow with ID {} in bucket with ID {} but failed to replicate the Version Control Information to cluster. " + + "Attempted to delete the newly created (empty) flow from the Flow Registry but failed", vci.getFlowId(), vci.getBucketId(), e); + } + } + + throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie); + } + + if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { + if (requestEntity.getVersionedFlow().getFlowId() == null) { + // We had to create the flow for this snapshot. Since we failed to replicate the Version Control Info, remove the + // flow from the Flow Registry (use best effort; if we can't remove it, just log and move on). + final VersionControlInformationDTO vci = mappingEntity.getVersionControlInformation(); + try { + serviceFacade.deleteVersionedFlow(vci.getRegistryId(), vci.getBucketId(), vci.getFlowId()); + } catch (final Exception e) { + logger.error("Created Versioned Flow with ID {} in bucket with ID {} but failed to replicate the Version Control Information to cluster. " + + "Attempted to delete the newly created (empty) flow from the Flow Registry but failed", vci.getFlowId(), vci.getBucketId(), e); + } + } + + final String message = "Failed to update Version Control Information for Process Group with ID " + groupId + "."; + final Throwable cause = clusterResponse.getThrowable(); + if (cause == null) { + throw new IllegalStateException(message); + } else { + throw new IllegalStateException(message, cause); + } + } + } + @DELETE @Consumes(MediaType.WILDCARD) @Produces(MediaType.APPLICATION_JSON) @Path("process-groups/{id}") @ApiOperation( - value = "Stops version controlling the Process Group with the given ID", + value = "Stops version controlling the Process Group with the given ID. The Process Group will no longer track to any Versioned Flow.", response = VersionControlInformationEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -626,7 +675,8 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("process-groups/{id}") @ApiOperation( - value = "For a Process Group that is already under Version Control, this will update the version of the flow to a different version", + value = "For a Process Group that is already under Version Control, this will update the version of the flow to a different version. This endpoint expects " + + "that the given snapshot will not modify any Processor that is currently running or any Controller Service that is enabled.", response = VersionControlInformationEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -689,14 +739,17 @@ public class VersionsResource extends ApplicationResource { serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, false); }, (rev, entity) -> { + final Bucket bucket = flowSnapshot.getBucket(); + final VersionedFlow flow = flowSnapshot.getFlow(); + // Update the Process Group to match the proposed flow snapshot final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO(); versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier()); - versionControlInfoDto.setBucketName(snapshotMetadata.getBucketName()); + versionControlInfoDto.setBucketName(bucket.getName()); versionControlInfoDto.setCurrent(true); versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier()); - versionControlInfoDto.setFlowName(snapshotMetadata.getFlowName()); - versionControlInfoDto.setFlowDescription(snapshotMetadata.getFlowDescription()); + versionControlInfoDto.setFlowName(flow.getName()); + versionControlInfoDto.setFlowDescription(flow.getDescription()); versionControlInfoDto.setGroupId(groupId); versionControlInfoDto.setModified(false); versionControlInfoDto.setVersion(snapshotMetadata.getVersion()); @@ -720,7 +773,9 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("update-requests/{id}") @ApiOperation( - value = "Returns the Update Request with the given ID", + value = "Returns the Update Request with the given ID. Once an Update Request has been created by performing a POST to /versions/update-requests/process-groups/{id}, " + + "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the " + + "current state of the request, and any failures.", response = VersionedFlowUpdateRequestEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -741,7 +796,9 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("revert-requests/{id}") @ApiOperation( - value = "Returns the Revert Request with the given ID", + value = "Returns the Revert Request with the given ID. Once a Revert Request has been created by performing a POST to /versions/revert-requests/process-groups/{id}, " + + "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the " + + "current state of the request, and any failures.", response = VersionedFlowUpdateRequestEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -795,7 +852,9 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("update-requests/{id}") @ApiOperation( - value = "Deletes the Update Request with the given ID", + value = "Deletes the Update Request with the given ID. After a request is created via a POST to /versions/update-requests/process-groups/{id}, it is expected " + + "that the client will properly clean up the request by DELETE'ing it, once the Update process has completed. If the request is deleted before the request " + + "completes, then the Update request will finish the step that it is currently performing and then will cancel any subsequent steps.", response = VersionedFlowUpdateRequestEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -816,7 +875,9 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("revert-requests/{id}") @ApiOperation( - value = "Deletes the Revert Request with the given ID", + value = "Deletes the Revert Request with the given ID. After a request is created via a POST to /versions/revert-requests/process-groups/{id}, it is expected " + + "that the client will properly clean up the request by DELETE'ing it, once the Revert process has completed. If the request is deleted before the request " + + "completes, then the Revert request will finish the step that it is currently performing and then will cancel any subsequent steps.", response = VersionedFlowUpdateRequestEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -881,7 +942,12 @@ public class VersionsResource extends ApplicationResource { @Path("update-requests/process-groups/{id}") @ApiOperation( value = "For a Process Group that is already under Version Control, this will initiate the action of changing " - + "from a specific version of the flow in the Flow Registry to a different version of the flow.", + + "from a specific version of the flow in the Flow Registry to a different version of the flow. This can be a lengthy " + + "process, as it will stop any Processors and disable any Controller Services necessary to perform the action and then restart them. As a result, " + + "the endpoint will immediately return a VersionedFlowUpdateRequestEntity, and the process of updating the flow will occur " + + "asynchronously in the background. The client may then periodically poll the status of the request by issuing a GET request to " + + "/versions/update-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to " + + "/versions/update-requests/{requestId}.", response = VersionedFlowUpdateRequestEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -1058,8 +1124,13 @@ public class VersionsResource extends ApplicationResource { @Path("revert-requests/process-groups/{id}") @ApiOperation( value = "For a Process Group that is already under Version Control, this will initiate the action of reverting " - + "any changes that have been made to the Process Group since it was last synchronized with the Flow Registry. This will result in the " - + "flow matching the Versioned Flow that exists in the Flow Registry.", + + "any local changes that have been made to the Process Group since it was last synchronized with the Flow Registry. This will result in the " + + "flow matching the Versioned Flow that exists in the Flow Registry. This can be a lengthy " + + "process, as it will stop any Processors and disable any Controller Services necessary to perform the action and then restart them. As a result, " + + "the endpoint will immediately return a VersionedFlowUpdateRequestEntity, and the process of updating the flow will occur " + + "asynchronously in the background. The client may then periodically poll the status of the request by issuing a GET request to " + + "/versions/revert-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to " + + "/versions/revert-requests/{requestId}.", response = VersionedFlowUpdateRequestEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -1174,34 +1245,6 @@ public class VersionsResource extends ApplicationResource { throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with."); } - // If the information passed in is correct, but there have been no changes, there is nothing to do - just register the request, mark it complete, and return. - if (currentVersion.getModified() == Boolean.FALSE) { - final String requestId = UUID.randomUUID().toString(); - final AsynchronousWebRequest request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Complete"); - requestManager.submitRequest("revert-requests", requestId, request, task -> { - }); - - // There is nothing to do. Generate the response and send it back to the user. - final VersionedFlowUpdateRequestDTO updateRequestDto = new VersionedFlowUpdateRequestDTO(); - updateRequestDto.setComplete(true); - updateRequestDto.setFailureReason(null); - updateRequestDto.setLastUpdated(new Date()); - updateRequestDto.setProcessGroupId(groupId); - updateRequestDto.setRequestId(requestId); - updateRequestDto.setVersionControlInformation(currentVersion); - updateRequestDto.setUri(generateResourceUri("versions", "revert-requests", requestId)); - updateRequestDto.setPercentCompleted(100); - updateRequestDto.setState(request.getState()); - - final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity(); - updateRequestEntity.setProcessGroupRevision(revisionDto); - updateRequestEntity.setRequest(updateRequestDto); - - request.markComplete(currentVersionEntity); - return generateOkResponse(updateRequestEntity).build(); - } - - // Create an asynchronous request that will occur in the background, because this request may // result in stopping components, which can take an indeterminate amount of time. final String requestId = UUID.randomUUID().toString(); @@ -1331,7 +1374,25 @@ public class VersionsResource extends ApplicationResource { // Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision(); final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId); - final VersionControlInformationDTO vci = requestEntity.getVersionControlInformation(); + final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation(); + + final Bucket bucket = flowSnapshot.getBucket(); + final VersionedFlow flow = flowSnapshot.getFlow(); + + final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata(); + final VersionControlInformationDTO vci = new VersionControlInformationDTO(); + vci.setBucketId(metadata.getBucketIdentifier()); + vci.setBucketName(bucket.getName()); + vci.setCurrent(flowSnapshot.isLatest()); + vci.setFlowDescription(flow.getDescription()); + vci.setFlowId(flow.getIdentifier()); + vci.setFlowName(flow.getName()); + vci.setGroupId(groupId); + vci.setModified(false); + vci.setRegistryId(requestVci.getRegistryId()); + vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId())); + vci.setVersion(metadata.getVersion()); + serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index bd603be8e3..1a12dcf8ab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -16,6 +16,33 @@ */ package org.apache.nifi.web.api.dto; +import java.text.Collator; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TimeZone; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; + import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -188,32 +215,6 @@ import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.revision.RevisionManager; -import javax.ws.rs.WebApplicationException; -import java.text.Collator; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TimeZone; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - public final class DtoFactory { @SuppressWarnings("rawtypes") @@ -2230,8 +2231,8 @@ public final class DtoFactory { dto.setFlowName(versionControlInfo.getFlowName()); dto.setFlowDescription(versionControlInfo.getFlowDescription()); dto.setVersion(versionControlInfo.getVersion()); - dto.setCurrent(versionControlInfo.getCurrent().orElse(true)); - dto.setModified(versionControlInfo.getModified().orElse(false)); + dto.setCurrent(versionControlInfo.isCurrent()); + dto.setModified(versionControlInfo.isModified()); return dto; }