mirror of https://github.com/apache/nifi.git
NIFI-4436: Integrate with actual Flow Registry via REST Client - Store Bucket Name, Flow Name, Flow Description for VersionControlInformation - Added endpoint for determining local modifications to a process group - Updated authorizations required for version control endpoints - Add state and percent complete fields ot VersionedFlowUpdateRequestDTO - If a variable exists in a parent process group, do not include it in imported/updated process group when interacting with flow registry - Code cleanup, documentation; bug fixes - Ensure that we are passing NiFiUser to the flow registry client when appropriate - Updated to work against new version of flow registry client; deleted file-based flow registry client
Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
parent
d6e54f19ee
commit
f6cc5b6cdc
|
@ -83,14 +83,6 @@ public interface FlowRegistry {
|
||||||
*/
|
*/
|
||||||
Bucket getBucket(String bucketId, NiFiUser user) throws IOException, NiFiRegistryException;
|
Bucket getBucket(String bucketId, NiFiUser user) throws IOException, NiFiRegistryException;
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the bucket with the given ID
|
|
||||||
*
|
|
||||||
* @param bucketId the id of the bucket
|
|
||||||
* @return the bucket with the given ID
|
|
||||||
*/
|
|
||||||
Bucket getBucket(String bucketId) throws IOException, NiFiRegistryException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieves the set of all Versioned Flows for the specified bucket
|
* Retrieves the set of all Versioned Flows for the specified bucket
|
||||||
*
|
*
|
||||||
|
@ -123,7 +115,15 @@ public interface FlowRegistry {
|
||||||
* @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier or name is null
|
* @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier or name is null
|
||||||
* @throws NiFiRegistryException if the bucket id does not exist
|
* @throws NiFiRegistryException if the bucket id does not exist
|
||||||
*/
|
*/
|
||||||
VersionedFlow registerVersionedFlow(VersionedFlow flow) throws IOException, NiFiRegistryException;
|
VersionedFlow registerVersionedFlow(VersionedFlow flow, NiFiUser user) throws IOException, NiFiRegistryException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes the specified flow from the Flow Registry
|
||||||
|
*
|
||||||
|
* @param bucketId the ID of the bucket
|
||||||
|
* @param flowId the ID of the flow
|
||||||
|
*/
|
||||||
|
VersionedFlow deleteVersionedFlow(String bucketId, String flowId, NiFiUser user) throws IOException, NiFiRegistryException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds the given snapshot to the Flow Registry for the given flow
|
* Adds the given snapshot to the Flow Registry for the given flow
|
||||||
|
@ -138,7 +138,8 @@ public interface FlowRegistry {
|
||||||
* @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier is null, or if the flow to snapshot is null
|
* @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier is null, or if the flow to snapshot is null
|
||||||
* @throws NiFiRegistryException if the flow does not exist
|
* @throws NiFiRegistryException if the flow does not exist
|
||||||
*/
|
*/
|
||||||
VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion) throws IOException, NiFiRegistryException;
|
VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion, NiFiUser user)
|
||||||
|
throws IOException, NiFiRegistryException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the latest (most recent) version of the Flow in the Flow Registry for the given bucket and flow
|
* Returns the latest (most recent) version of the Flow in the Flow Registry for the given bucket and flow
|
||||||
|
@ -150,7 +151,23 @@ public interface FlowRegistry {
|
||||||
* @throws IOException if unable to communicate with the Flow Registry
|
* @throws IOException if unable to communicate with the Flow Registry
|
||||||
* @throws NiFiRegistryException if unable to find the bucket with the given ID or the flow with the given ID
|
* @throws NiFiRegistryException if unable to find the bucket with the given ID or the flow with the given ID
|
||||||
*/
|
*/
|
||||||
int getLatestVersion(String bucketId, String flowId) throws IOException, NiFiRegistryException;
|
int getLatestVersion(String bucketId, String flowId, NiFiUser user) throws IOException, NiFiRegistryException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry
|
||||||
|
*
|
||||||
|
* @param bucketId the ID of the bucket
|
||||||
|
* @param flowId the ID of the flow
|
||||||
|
* @param version the version to retrieve
|
||||||
|
* @return the contents of the Flow from the Flow Registry
|
||||||
|
*
|
||||||
|
* @throws IOException if unable to communicate with the Flow Registry
|
||||||
|
* @throws NiFiRegistryException if unable to find the contents of the flow due to the bucket or flow not existing,
|
||||||
|
* or the specified version of the flow not existing
|
||||||
|
* @throws NullPointerException if any of the arguments is not specified
|
||||||
|
* @throws IllegalArgumentException if the given version is less than 1
|
||||||
|
*/
|
||||||
|
VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, NiFiUser user) throws IOException, NiFiRegistryException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry
|
* Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry
|
||||||
|
@ -166,7 +183,6 @@ public interface FlowRegistry {
|
||||||
* @throws NullPointerException if any of the arguments is not specified
|
* @throws NullPointerException if any of the arguments is not specified
|
||||||
* @throws IllegalArgumentException if the given version is less than 1
|
* @throws IllegalArgumentException if the given version is less than 1
|
||||||
*/
|
*/
|
||||||
// TODO: Need to pass in user
|
|
||||||
VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version) throws IOException, NiFiRegistryException;
|
VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version) throws IOException, NiFiRegistryException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -179,6 +195,18 @@ public interface FlowRegistry {
|
||||||
* @throws IOException if unable to communicate with the Flow Registry
|
* @throws IOException if unable to communicate with the Flow Registry
|
||||||
* @throws NiFiRegistryException if unable to find a flow with the given bucket ID and flow ID
|
* @throws NiFiRegistryException if unable to find a flow with the given bucket ID and flow ID
|
||||||
*/
|
*/
|
||||||
// TODO: Need to pass in user
|
VersionedFlow getVersionedFlow(String bucketId, String flowId, NiFiUser user) throws IOException, NiFiRegistryException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieves a VersionedFlow by bucket id and flow id
|
||||||
|
*
|
||||||
|
* @param bucketId the ID of the bucket
|
||||||
|
* @param flowId the ID of the flow
|
||||||
|
* @return the VersionedFlow for the given bucket and flow ID's
|
||||||
|
*
|
||||||
|
* @throws IOException if unable to communicate with the Flow Registry
|
||||||
|
* @throws NiFiRegistryException if unable to find a flow with the given bucket ID and flow ID
|
||||||
|
*/
|
||||||
|
// TODO: Do we still need this?
|
||||||
VersionedFlow getVersionedFlow(String bucketId, String flowId) throws IOException, NiFiRegistryException;
|
VersionedFlow getVersionedFlow(String bucketId, String flowId) throws IOException, NiFiRegistryException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,8 +17,6 @@
|
||||||
|
|
||||||
package org.apache.nifi.registry.flow;
|
package org.apache.nifi.registry.flow;
|
||||||
|
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* Provides a mechanism for conveying which Flow Registry a flow is stored in, and
|
* Provides a mechanism for conveying which Flow Registry a flow is stored in, and
|
||||||
|
@ -69,18 +67,14 @@ public interface VersionControlInformation {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return <code>true</code> if the flow has been modified since the last time that it was updated from the Flow Registry or saved
|
* @return <code>true</code> if the flow has been modified since the last time that it was updated from the Flow Registry or saved
|
||||||
* to the Flow Registry; <code>false</code> if the flow is in sync with the Flow Registry. An empty optional will be returned
|
* to the Flow Registry; <code>false</code> if the flow is in sync with the Flow Registry.
|
||||||
* if it is not yet known whether or not the flow has been modified (for example, on startup, when the flow has not yet been
|
|
||||||
* fetched from the Flow Registry)
|
|
||||||
*/
|
*/
|
||||||
Optional<Boolean> getModified();
|
boolean isModified();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return <code>true</code> if this version of the flow is the most recent version of the flow available in the Flow Registry.
|
* @return <code>true</code> if this version of the flow is the most recent version of the flow available in the Flow Registry, <code>false</code> otherwise.
|
||||||
* An empty optional will be returned if it is not yet known whether or not the flow has been modified (for example, on startup,
|
|
||||||
* when the flow has not yet been fetched from the Flow Registry)
|
|
||||||
*/
|
*/
|
||||||
Optional<Boolean> getCurrent();
|
boolean isCurrent();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the snapshot of the flow that was synchronized with the Flow Registry
|
* @return the snapshot of the flow that was synchronized with the Flow Registry
|
||||||
|
|
|
@ -2822,17 +2822,17 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
versionControlInformation.getFlowIdentifier(),
|
versionControlInformation.getFlowIdentifier(),
|
||||||
versionControlInformation.getVersion(),
|
versionControlInformation.getVersion(),
|
||||||
versionControlInformation.getFlowSnapshot(),
|
versionControlInformation.getFlowSnapshot(),
|
||||||
versionControlInformation.getModified().orElse(null),
|
versionControlInformation.isModified(),
|
||||||
versionControlInformation.getCurrent().orElse(null)) {
|
versionControlInformation.isCurrent()) {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Boolean> getModified() {
|
public boolean isModified() {
|
||||||
final Set<FlowDifference> differences = StandardProcessGroup.this.getModifications();
|
final Set<FlowDifference> differences = StandardProcessGroup.this.getModifications();
|
||||||
if (differences == null) {
|
if (differences == null) {
|
||||||
return Optional.ofNullable(null);
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return Optional.of(!differences.isEmpty());
|
return !differences.isEmpty();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -2938,7 +2938,6 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
try {
|
try {
|
||||||
final VersionedFlow versionedFlow = flowRegistry.getVersionedFlow(vci.getBucketIdentifier(), vci.getFlowIdentifier());
|
final VersionedFlow versionedFlow = flowRegistry.getVersionedFlow(vci.getBucketIdentifier(), vci.getFlowIdentifier());
|
||||||
final int latestVersion = (int) versionedFlow.getVersionCount();
|
final int latestVersion = (int) versionedFlow.getVersionCount();
|
||||||
|
|
||||||
vci.setBucketName(versionedFlow.getBucketName());
|
vci.setBucketName(versionedFlow.getBucketName());
|
||||||
vci.setFlowName(versionedFlow.getName());
|
vci.setFlowName(versionedFlow.getName());
|
||||||
vci.setFlowDescription(versionedFlow.getDescription());
|
vci.setFlowDescription(versionedFlow.getDescription());
|
||||||
|
@ -2986,7 +2985,8 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", this, proposedSnapshot, flowComparison.getDifferences().size(), differencesByLine);
|
LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", this, proposedSnapshot, flowComparison.getDifferences().size(), differencesByLine);
|
||||||
}
|
}
|
||||||
|
|
||||||
updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings);
|
final Set<String> knownVariables = getKnownVariableNames();
|
||||||
|
updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, knownVariables);
|
||||||
} catch (final ProcessorInstantiationException pie) {
|
} catch (final ProcessorInstantiationException pie) {
|
||||||
throw new RuntimeException(pie);
|
throw new RuntimeException(pie);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -2994,9 +2994,26 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Set<String> getKnownVariableNames() {
|
||||||
|
final Set<String> variableNames = new HashSet<>();
|
||||||
|
populateKnownVariableNames(this, variableNames);
|
||||||
|
return variableNames;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void populateKnownVariableNames(final ProcessGroup group, final Set<String> knownVariables) {
|
||||||
|
group.getVariableRegistry().getVariableMap().keySet().stream()
|
||||||
|
.map(VariableDescriptor::getName)
|
||||||
|
.forEach(knownVariables::add);
|
||||||
|
|
||||||
|
final ProcessGroup parent = group.getParent();
|
||||||
|
if (parent != null) {
|
||||||
|
populateKnownVariableNames(parent, knownVariables);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed,
|
private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed,
|
||||||
final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName) throws ProcessorInstantiationException {
|
final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final Set<String> variablesToSkip) throws ProcessorInstantiationException {
|
||||||
|
|
||||||
group.setComments(proposed.getComments());
|
group.setComments(proposed.getComments());
|
||||||
|
|
||||||
|
@ -3027,7 +3044,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
|
|
||||||
// If any new variables exist in the proposed flow, add those to the variable registry.
|
// If any new variables exist in the proposed flow, add those to the variable registry.
|
||||||
for (final Map.Entry<String, String> entry : proposed.getVariables().entrySet()) {
|
for (final Map.Entry<String, String> entry : proposed.getVariables().entrySet()) {
|
||||||
if (!existingVariableNames.contains(entry.getKey())) {
|
if (!existingVariableNames.contains(entry.getKey()) && !variablesToSkip.contains(entry.getKey())) {
|
||||||
updatedVariableMap.put(entry.getKey(), entry.getValue());
|
updatedVariableMap.put(entry.getKey(), entry.getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3068,10 +3085,10 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
|
final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
|
||||||
|
|
||||||
if (childGroup == null) {
|
if (childGroup == null) {
|
||||||
final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed);
|
final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip);
|
||||||
LOG.info("Added {} to {}", added, this);
|
LOG.info("Added {} to {}", added, this);
|
||||||
} else {
|
} else {
|
||||||
updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName);
|
updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, variablesToSkip);
|
||||||
LOG.info("Updated {}", childGroup);
|
LOG.info("Updated {}", childGroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3345,11 +3362,12 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String componentIdSeed) throws ProcessorInstantiationException {
|
private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String componentIdSeed, final Set<String> variablesToSkip)
|
||||||
|
throws ProcessorInstantiationException {
|
||||||
final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed));
|
final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed));
|
||||||
group.setVersionedComponentId(proposed.getIdentifier());
|
group.setVersionedComponentId(proposed.getIdentifier());
|
||||||
group.setParent(destination);
|
group.setParent(destination);
|
||||||
updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true);
|
updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, variablesToSkip);
|
||||||
destination.addProcessGroup(group);
|
destination.addProcessGroup(group);
|
||||||
return group;
|
return group;
|
||||||
}
|
}
|
||||||
|
@ -3771,16 +3789,11 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (verifyNotDirty) {
|
if (verifyNotDirty) {
|
||||||
final Optional<Boolean> modifiedOption = versionControlInfo.getModified();
|
final boolean modified = versionControlInfo.isModified();
|
||||||
if (!modifiedOption.isPresent()) {
|
|
||||||
throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow "
|
|
||||||
+ "has not yet been synchronized with the Flow Registry. The Process Group must be"
|
|
||||||
+ " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later");
|
|
||||||
}
|
|
||||||
|
|
||||||
final Set<FlowDifference> modifications = getModifications();
|
final Set<FlowDifference> modifications = getModifications();
|
||||||
|
|
||||||
if (Boolean.TRUE.equals(modifiedOption.get())) {
|
if (modified) {
|
||||||
final String changes = modifications.stream()
|
final String changes = modifications.stream()
|
||||||
.map(FlowDifference::toString)
|
.map(FlowDifference::toString)
|
||||||
.collect(Collectors.joining("\n"));
|
.collect(Collectors.joining("\n"));
|
||||||
|
|
|
@ -1,509 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.nifi.registry.flow;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileInputStream;
|
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.URI;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Properties;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.SortedSet;
|
|
||||||
import java.util.TreeSet;
|
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
import org.apache.nifi.authorization.user.NiFiUser;
|
|
||||||
import org.apache.nifi.registry.bucket.Bucket;
|
|
||||||
import org.apache.nifi.registry.client.NiFiRegistryException;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonFactory;
|
|
||||||
import com.fasterxml.jackson.core.JsonGenerator;
|
|
||||||
import com.fasterxml.jackson.core.JsonParser;
|
|
||||||
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
|
|
||||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A simple file-based implementation of a Flow Registry Client. Rather than interacting
|
|
||||||
* with an actual Flow Registry, this implementation simply reads flows from disk and writes
|
|
||||||
* them to disk. It is not meant for any production use but is available for testing purposes.
|
|
||||||
*/
|
|
||||||
public class FileBasedFlowRegistry implements FlowRegistry {
|
|
||||||
private final File directory;
|
|
||||||
private final Map<String, Set<String>> flowNamesByBucket = new HashMap<>();
|
|
||||||
private final JsonFactory jsonFactory = new JsonFactory();
|
|
||||||
private final String id;
|
|
||||||
private volatile String name = "Local Registry";
|
|
||||||
private volatile String url = "file:" + (new File("..").getAbsolutePath());
|
|
||||||
private volatile String description = "Default file-based Flow Registry";
|
|
||||||
|
|
||||||
public FileBasedFlowRegistry(final String id, final String url) throws IOException {
|
|
||||||
final URI uri = URI.create(url);
|
|
||||||
if (!uri.getScheme().equalsIgnoreCase("file")) {
|
|
||||||
throw new IllegalArgumentException("Cannot create a File Based Flow Registry with a URL of " + url + "; URL scheme must be 'file'");
|
|
||||||
}
|
|
||||||
|
|
||||||
this.directory = new File(URI.create(url).getPath());
|
|
||||||
|
|
||||||
if (!directory.exists() && !directory.mkdirs()) {
|
|
||||||
throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry");
|
|
||||||
}
|
|
||||||
|
|
||||||
this.id = id;
|
|
||||||
this.url = url;
|
|
||||||
recoverBuckets();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void recoverBuckets() throws IOException {
|
|
||||||
final File[] bucketDirs = directory.listFiles();
|
|
||||||
if (bucketDirs == null) {
|
|
||||||
throw new IOException("Could not get listing of directory " + directory);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (final File bucketDir : bucketDirs) {
|
|
||||||
final File[] flowDirs = bucketDir.listFiles();
|
|
||||||
if (flowDirs == null) {
|
|
||||||
throw new IOException("Could not get listing of directory " + bucketDir);
|
|
||||||
}
|
|
||||||
|
|
||||||
final Set<String> flowNames = new HashSet<>();
|
|
||||||
for (final File flowDir : flowDirs) {
|
|
||||||
final File propsFile = new File(flowDir, "flow.properties");
|
|
||||||
if (!propsFile.exists()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
final Properties properties = new Properties();
|
|
||||||
try (final InputStream in = new FileInputStream(propsFile)) {
|
|
||||||
properties.load(in);
|
|
||||||
}
|
|
||||||
|
|
||||||
final String flowName = properties.getProperty("name");
|
|
||||||
if (flowName == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
flowNames.add(flowName);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!flowNames.isEmpty()) {
|
|
||||||
flowNamesByBucket.put(bucketDir.getName(), flowNames);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getURL() {
|
|
||||||
return url;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getName() {
|
|
||||||
return name;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Set<Bucket> getBuckets(NiFiUser user) throws IOException {
|
|
||||||
final Set<Bucket> buckets = new HashSet<>();
|
|
||||||
|
|
||||||
final File[] bucketDirs = directory.listFiles();
|
|
||||||
if (bucketDirs == null) {
|
|
||||||
throw new IOException("Could not get listing of directory " + directory);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (final File bucketDirectory : bucketDirs) {
|
|
||||||
final String bucketIdentifier = bucketDirectory.getName();
|
|
||||||
final long creation = bucketDirectory.lastModified();
|
|
||||||
|
|
||||||
final Bucket bucket = new Bucket();
|
|
||||||
bucket.setIdentifier(bucketIdentifier);
|
|
||||||
bucket.setName("Bucket '" + bucketIdentifier + "'");
|
|
||||||
bucket.setCreatedTimestamp(creation);
|
|
||||||
|
|
||||||
final Set<VersionedFlow> versionedFlows = new HashSet<>();
|
|
||||||
final File[] flowDirs = bucketDirectory.listFiles();
|
|
||||||
if (flowDirs != null) {
|
|
||||||
for (final File flowDir : flowDirs) {
|
|
||||||
final String flowIdentifier = flowDir.getName();
|
|
||||||
try {
|
|
||||||
final VersionedFlow versionedFlow = getVersionedFlow(bucketIdentifier, flowIdentifier);
|
|
||||||
versionedFlows.add(versionedFlow);
|
|
||||||
} catch (NiFiRegistryException e) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bucket.setVersionedFlows(versionedFlows);
|
|
||||||
|
|
||||||
buckets.add(bucket);
|
|
||||||
}
|
|
||||||
|
|
||||||
return buckets;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Bucket getBucket(String bucketId) throws IOException, NiFiRegistryException {
|
|
||||||
return getBucket(bucketId, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Bucket getBucket(String bucketId, NiFiUser user) throws IOException, NiFiRegistryException {
|
|
||||||
return getBuckets(user).stream().filter(b -> b.getIdentifier().equals(bucketId)).findFirst().orElse(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
|
|
||||||
final Bucket bucket = getBuckets(user).stream().filter(b -> bucketId.equals(b.getIdentifier())).findFirst().orElse(null);
|
|
||||||
if (bucket == null) {
|
|
||||||
return Collections.emptySet();
|
|
||||||
}
|
|
||||||
|
|
||||||
return bucket.getVersionedFlows();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
|
|
||||||
final VersionedFlow flow = getFlows(bucketId, user).stream().filter(f -> flowId.equals(f.getIdentifier())).findFirst().orElse(null);
|
|
||||||
if (flow == null) {
|
|
||||||
return Collections.emptySet();
|
|
||||||
}
|
|
||||||
|
|
||||||
return flow.getSnapshotMetadata();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, NiFiRegistryException {
|
|
||||||
Objects.requireNonNull(flow);
|
|
||||||
Objects.requireNonNull(flow.getBucketIdentifier());
|
|
||||||
Objects.requireNonNull(flow.getName());
|
|
||||||
|
|
||||||
// Verify that bucket exists
|
|
||||||
final File bucketDir = new File(directory, flow.getBucketIdentifier());
|
|
||||||
if (!bucketDir.exists()) {
|
|
||||||
throw new NiFiRegistryException("No bucket exists with ID " + flow.getBucketIdentifier());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify that there is no flow with the same name in that bucket
|
|
||||||
final Set<String> flowNames = flowNamesByBucket.get(flow.getBucketIdentifier());
|
|
||||||
if (flowNames != null && flowNames.contains(flow.getName())) {
|
|
||||||
throw new IllegalArgumentException("Flow with name '" + flow.getName() + "' already exists for Bucket with ID " + flow.getBucketIdentifier());
|
|
||||||
}
|
|
||||||
|
|
||||||
final String flowIdentifier = UUID.randomUUID().toString();
|
|
||||||
final File flowDir = new File(bucketDir, flowIdentifier);
|
|
||||||
if (!flowDir.mkdirs()) {
|
|
||||||
throw new IOException("Failed to create directory " + flowDir + " for new Flow");
|
|
||||||
}
|
|
||||||
|
|
||||||
final File propertiesFile = new File(flowDir, "flow.properties");
|
|
||||||
|
|
||||||
final Properties flowProperties = new Properties();
|
|
||||||
flowProperties.setProperty("name", flow.getName());
|
|
||||||
flowProperties.setProperty("created", String.valueOf(flow.getCreatedTimestamp()));
|
|
||||||
flowProperties.setProperty("description", flow.getDescription());
|
|
||||||
flowProperties.setProperty("lastModified", String.valueOf(flow.getModifiedTimestamp()));
|
|
||||||
|
|
||||||
try (final OutputStream out = new FileOutputStream(propertiesFile)) {
|
|
||||||
flowProperties.store(out, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
final VersionedFlow response = new VersionedFlow();
|
|
||||||
response.setBucketIdentifier(flow.getBucketIdentifier());
|
|
||||||
response.setCreatedTimestamp(flow.getCreatedTimestamp());
|
|
||||||
response.setDescription(flow.getDescription());
|
|
||||||
response.setIdentifier(flowIdentifier);
|
|
||||||
response.setModifiedTimestamp(flow.getModifiedTimestamp());
|
|
||||||
response.setName(flow.getName());
|
|
||||||
|
|
||||||
return response;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion)
|
|
||||||
throws IOException, NiFiRegistryException {
|
|
||||||
Objects.requireNonNull(flow);
|
|
||||||
Objects.requireNonNull(flow.getBucketIdentifier());
|
|
||||||
Objects.requireNonNull(flow.getName());
|
|
||||||
Objects.requireNonNull(snapshot);
|
|
||||||
|
|
||||||
// Verify that the bucket exists
|
|
||||||
final File bucketDir = new File(directory, flow.getBucketIdentifier());
|
|
||||||
if (!bucketDir.exists()) {
|
|
||||||
throw new NiFiRegistryException("No bucket exists with ID " + flow.getBucketIdentifier());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify that the flow exists
|
|
||||||
final File flowDir = new File(bucketDir, flow.getIdentifier());
|
|
||||||
if (!flowDir.exists()) {
|
|
||||||
throw new NiFiRegistryException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier());
|
|
||||||
}
|
|
||||||
|
|
||||||
final File[] versionDirs = flowDir.listFiles();
|
|
||||||
if (versionDirs == null) {
|
|
||||||
throw new IOException("Unable to perform listing of directory " + flowDir);
|
|
||||||
}
|
|
||||||
|
|
||||||
int maxVersion = 0;
|
|
||||||
for (final File versionDir : versionDirs) {
|
|
||||||
final String versionName = versionDir.getName();
|
|
||||||
|
|
||||||
final int version;
|
|
||||||
try {
|
|
||||||
version = Integer.parseInt(versionName);
|
|
||||||
} catch (final NumberFormatException nfe) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (version > maxVersion) {
|
|
||||||
maxVersion = version;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final int snapshotVersion = maxVersion + 1;
|
|
||||||
final File snapshotDir = new File(flowDir, String.valueOf(snapshotVersion));
|
|
||||||
if (!snapshotDir.mkdir()) {
|
|
||||||
throw new IOException("Could not create directory " + snapshotDir);
|
|
||||||
}
|
|
||||||
|
|
||||||
final File contentsFile = new File(snapshotDir, "flow.xml");
|
|
||||||
|
|
||||||
try (final OutputStream out = new FileOutputStream(contentsFile);
|
|
||||||
final JsonGenerator generator = jsonFactory.createGenerator(out)) {
|
|
||||||
generator.setCodec(new ObjectMapper());
|
|
||||||
generator.setPrettyPrinter(new DefaultPrettyPrinter());
|
|
||||||
generator.writeObject(snapshot);
|
|
||||||
}
|
|
||||||
|
|
||||||
final Properties snapshotProperties = new Properties();
|
|
||||||
snapshotProperties.setProperty("comments", comments);
|
|
||||||
snapshotProperties.setProperty("name", flow.getName());
|
|
||||||
final File snapshotPropsFile = new File(snapshotDir, "snapshot.properties");
|
|
||||||
try (final OutputStream out = new FileOutputStream(snapshotPropsFile)) {
|
|
||||||
snapshotProperties.store(out, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
|
|
||||||
snapshotMetadata.setBucketIdentifier(flow.getBucketIdentifier());
|
|
||||||
snapshotMetadata.setComments(comments);
|
|
||||||
snapshotMetadata.setFlowIdentifier(flow.getIdentifier());
|
|
||||||
snapshotMetadata.setFlowName(flow.getName());
|
|
||||||
snapshotMetadata.setTimestamp(System.currentTimeMillis());
|
|
||||||
snapshotMetadata.setVersion(snapshotVersion);
|
|
||||||
|
|
||||||
final VersionedFlowSnapshot response = new VersionedFlowSnapshot();
|
|
||||||
response.setSnapshotMetadata(snapshotMetadata);
|
|
||||||
response.setFlowContents(snapshot);
|
|
||||||
return response;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getLatestVersion(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
|
|
||||||
// Verify that the bucket exists
|
|
||||||
final File bucketDir = new File(directory, bucketId);
|
|
||||||
if (!bucketDir.exists()) {
|
|
||||||
throw new NiFiRegistryException("No bucket exists with ID " + bucketId);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify that the flow exists
|
|
||||||
final File flowDir = new File(bucketDir, flowId);
|
|
||||||
if (!flowDir.exists()) {
|
|
||||||
throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
|
|
||||||
}
|
|
||||||
|
|
||||||
final File[] versionDirs = flowDir.listFiles();
|
|
||||||
if (versionDirs == null) {
|
|
||||||
throw new IOException("Unable to perform listing of directory " + flowDir);
|
|
||||||
}
|
|
||||||
|
|
||||||
int maxVersion = 0;
|
|
||||||
for (final File versionDir : versionDirs) {
|
|
||||||
final String versionName = versionDir.getName();
|
|
||||||
|
|
||||||
final int version;
|
|
||||||
try {
|
|
||||||
version = Integer.parseInt(versionName);
|
|
||||||
} catch (final NumberFormatException nfe) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (version > maxVersion) {
|
|
||||||
maxVersion = version;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return maxVersion;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, NiFiRegistryException {
|
|
||||||
// Verify that the bucket exists
|
|
||||||
final File bucketDir = new File(directory, bucketId);
|
|
||||||
if (!bucketDir.exists()) {
|
|
||||||
throw new NiFiRegistryException("No bucket exists with ID " + bucketId);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify that the flow exists
|
|
||||||
final File flowDir = new File(bucketDir, flowId);
|
|
||||||
if (!flowDir.exists()) {
|
|
||||||
throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
|
|
||||||
}
|
|
||||||
|
|
||||||
final File versionDir = new File(flowDir, String.valueOf(version));
|
|
||||||
if (!versionDir.exists()) {
|
|
||||||
throw new NiFiRegistryException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version);
|
|
||||||
}
|
|
||||||
|
|
||||||
final File contentsFile = new File(versionDir, "flow.xml");
|
|
||||||
|
|
||||||
final VersionedProcessGroup processGroup;
|
|
||||||
try (final JsonParser parser = jsonFactory.createParser(contentsFile)) {
|
|
||||||
final ObjectMapper mapper = new ObjectMapper();
|
|
||||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
|
||||||
|
|
||||||
parser.setCodec(mapper);
|
|
||||||
processGroup = parser.readValueAs(VersionedProcessGroup.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
final Properties properties = new Properties();
|
|
||||||
final File snapshotPropsFile = new File(versionDir, "snapshot.properties");
|
|
||||||
try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
|
|
||||||
properties.load(in);
|
|
||||||
}
|
|
||||||
|
|
||||||
final String comments = properties.getProperty("comments");
|
|
||||||
final String flowName = properties.getProperty("name");
|
|
||||||
|
|
||||||
final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
|
|
||||||
snapshotMetadata.setBucketIdentifier(bucketId);
|
|
||||||
snapshotMetadata.setComments(comments);
|
|
||||||
snapshotMetadata.setFlowIdentifier(flowId);
|
|
||||||
snapshotMetadata.setFlowName(flowName);
|
|
||||||
snapshotMetadata.setTimestamp(System.currentTimeMillis());
|
|
||||||
snapshotMetadata.setVersion(version);
|
|
||||||
|
|
||||||
final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
|
|
||||||
snapshot.setFlowContents(processGroup);
|
|
||||||
snapshot.setSnapshotMetadata(snapshotMetadata);
|
|
||||||
|
|
||||||
return snapshot;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
|
|
||||||
// Verify that the bucket exists
|
|
||||||
final File bucketDir = new File(directory, bucketId);
|
|
||||||
if (!bucketDir.exists()) {
|
|
||||||
throw new NiFiRegistryException("No bucket exists with ID " + bucketId);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify that the flow exists
|
|
||||||
final File flowDir = new File(bucketDir, flowId);
|
|
||||||
if (!flowDir.exists()) {
|
|
||||||
throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
|
|
||||||
}
|
|
||||||
|
|
||||||
final File flowPropsFile = new File(flowDir, "flow.properties");
|
|
||||||
final Properties flowProperties = new Properties();
|
|
||||||
try (final InputStream in = new FileInputStream(flowPropsFile)) {
|
|
||||||
flowProperties.load(in);
|
|
||||||
}
|
|
||||||
|
|
||||||
final VersionedFlow flow = new VersionedFlow();
|
|
||||||
flow.setBucketIdentifier(bucketId);
|
|
||||||
flow.setCreatedTimestamp(Long.parseLong(flowProperties.getProperty("created")));
|
|
||||||
flow.setDescription(flowProperties.getProperty("description"));
|
|
||||||
flow.setIdentifier(flowId);
|
|
||||||
flow.setModifiedTimestamp(flowDir.lastModified());
|
|
||||||
flow.setName(flowProperties.getProperty("name"));
|
|
||||||
|
|
||||||
final Comparator<VersionedFlowSnapshotMetadata> versionComparator = (a, b) -> Integer.compare(a.getVersion(), b.getVersion());
|
|
||||||
|
|
||||||
final SortedSet<VersionedFlowSnapshotMetadata> snapshotMetadataSet = new TreeSet<>(versionComparator);
|
|
||||||
flow.setSnapshotMetadata(snapshotMetadataSet);
|
|
||||||
|
|
||||||
final File[] versionDirs = flowDir.listFiles();
|
|
||||||
flow.setVersionCount(versionDirs.length);
|
|
||||||
|
|
||||||
for (final File file : versionDirs) {
|
|
||||||
if (!file.isDirectory()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
int version;
|
|
||||||
try {
|
|
||||||
version = Integer.parseInt(file.getName());
|
|
||||||
} catch (final NumberFormatException nfe) {
|
|
||||||
// not a version. skip.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
final File snapshotPropsFile = new File(file, "snapshot.properties");
|
|
||||||
final Properties snapshotProperties = new Properties();
|
|
||||||
try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
|
|
||||||
snapshotProperties.load(in);
|
|
||||||
}
|
|
||||||
|
|
||||||
final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
|
|
||||||
metadata.setBucketIdentifier(bucketId);
|
|
||||||
metadata.setComments(snapshotProperties.getProperty("comments"));
|
|
||||||
metadata.setFlowIdentifier(flowId);
|
|
||||||
metadata.setFlowName(snapshotProperties.getProperty("name"));
|
|
||||||
metadata.setTimestamp(file.lastModified());
|
|
||||||
metadata.setVersion(version);
|
|
||||||
|
|
||||||
snapshotMetadataSet.add(metadata);
|
|
||||||
}
|
|
||||||
|
|
||||||
return flow;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getIdentifier() {
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getDescription() {
|
|
||||||
return description;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setDescription(String description) {
|
|
||||||
this.description = description;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setURL(String url) {
|
|
||||||
this.url = url;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setName(String name) {
|
|
||||||
this.name = name;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -111,55 +111,59 @@ public class RestBasedFlowRegistry implements FlowRegistry {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getIdentity(final NiFiUser user) {
|
||||||
|
return (user == null || user.isAnonymous()) ? null : user.getIdentity();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<Bucket> getBuckets(final NiFiUser user) throws IOException, NiFiRegistryException {
|
public Set<Bucket> getBuckets(final NiFiUser user) throws IOException, NiFiRegistryException {
|
||||||
final BucketClient bucketClient = getRegistryClient().getBucketClient(user.isAnonymous() ? null : user.getIdentity());
|
final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user));
|
||||||
return new HashSet<>(bucketClient.getAll());
|
return new HashSet<>(bucketClient.getAll());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Bucket getBucket(final String bucketId) throws IOException, NiFiRegistryException {
|
|
||||||
final BucketClient bucketClient = getRegistryClient().getBucketClient();
|
|
||||||
return bucketClient.get(bucketId);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Bucket getBucket(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
|
public Bucket getBucket(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
|
||||||
final BucketClient bucketClient = getRegistryClient().getBucketClient(user.isAnonymous() ? null : user.getIdentity());
|
final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user));
|
||||||
return bucketClient.get(bucketId);
|
return bucketClient.get(bucketId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
|
public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
|
||||||
final FlowClient flowClient = getRegistryClient().getFlowClient(user.isAnonymous() ? null : user.getIdentity());
|
final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
|
||||||
return new HashSet<>(flowClient.getByBucket(bucketId));
|
return new HashSet<>(flowClient.getByBucket(bucketId));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
|
public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
|
||||||
final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(user.isAnonymous() ? null : user.getIdentity());
|
final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
|
||||||
return new HashSet<>(snapshotClient.getSnapshotMetadata(bucketId, flowId));
|
return new HashSet<>(snapshotClient.getSnapshotMetadata(bucketId, flowId));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, NiFiRegistryException {
|
public VersionedFlow registerVersionedFlow(final VersionedFlow flow, final NiFiUser user) throws IOException, NiFiRegistryException {
|
||||||
final FlowClient flowClient = getRegistryClient().getFlowClient();
|
final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
|
||||||
return flowClient.create(flow);
|
return flowClient.create(flow);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion)
|
public VersionedFlow deleteVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
|
||||||
throws IOException, NiFiRegistryException {
|
final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
|
||||||
|
return flowClient.delete(bucketId, flowId);
|
||||||
|
}
|
||||||
|
|
||||||
final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient();
|
@Override
|
||||||
|
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot,
|
||||||
|
final String comments, final int expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException {
|
||||||
|
|
||||||
|
final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
|
||||||
final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
|
final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
|
||||||
versionedFlowSnapshot.setFlowContents(snapshot);
|
versionedFlowSnapshot.setFlowContents(snapshot);
|
||||||
|
|
||||||
final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
|
final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
|
||||||
metadata.setBucketIdentifier(flow.getBucketIdentifier());
|
metadata.setBucketIdentifier(flow.getBucketIdentifier());
|
||||||
metadata.setFlowIdentifier(flow.getIdentifier());
|
metadata.setFlowIdentifier(flow.getIdentifier());
|
||||||
metadata.setFlowName(flow.getName());
|
metadata.setAuthor(getIdentity(user));
|
||||||
metadata.setTimestamp(System.currentTimeMillis());
|
metadata.setTimestamp(System.currentTimeMillis());
|
||||||
metadata.setVersion(expectedVersion);
|
metadata.setVersion(expectedVersion);
|
||||||
metadata.setComments(comments);
|
metadata.setComments(comments);
|
||||||
|
@ -169,24 +173,29 @@ public class RestBasedFlowRegistry implements FlowRegistry {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getLatestVersion(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
|
public int getLatestVersion(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
|
||||||
return (int) getRegistryClient().getFlowClient().get(bucketId, flowId).getVersionCount();
|
return (int) getRegistryClient().getFlowClient(getIdentity(user)).get(bucketId, flowId).getVersionCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version) throws IOException, NiFiRegistryException {
|
public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final NiFiUser user) throws IOException, NiFiRegistryException {
|
||||||
final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient();
|
final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
|
||||||
final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version);
|
final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version);
|
||||||
|
|
||||||
final VersionedProcessGroup contents = flowSnapshot.getFlowContents();
|
final VersionedProcessGroup contents = flowSnapshot.getFlowContents();
|
||||||
for (final VersionedProcessGroup child : contents.getProcessGroups()) {
|
for (final VersionedProcessGroup child : contents.getProcessGroups()) {
|
||||||
populateVersionedContentsRecursively(child);
|
populateVersionedContentsRecursively(child, user);
|
||||||
}
|
}
|
||||||
|
|
||||||
return flowSnapshot;
|
return flowSnapshot;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void populateVersionedContentsRecursively(final VersionedProcessGroup group) throws NiFiRegistryException, IOException {
|
@Override
|
||||||
|
public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version) throws IOException, NiFiRegistryException {
|
||||||
|
return getFlowContents(bucketId, flowId, version, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void populateVersionedContentsRecursively(final VersionedProcessGroup group, final NiFiUser user) throws NiFiRegistryException, IOException {
|
||||||
if (group == null) {
|
if (group == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -205,7 +214,7 @@ public class RestBasedFlowRegistry implements FlowRegistry {
|
||||||
}
|
}
|
||||||
|
|
||||||
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
|
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
|
||||||
final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version);
|
final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version, user);
|
||||||
final VersionedProcessGroup contents = snapshot.getFlowContents();
|
final VersionedProcessGroup contents = snapshot.getFlowContents();
|
||||||
|
|
||||||
group.setComments(contents.getComments());
|
group.setComments(contents.getComments());
|
||||||
|
@ -222,14 +231,19 @@ public class RestBasedFlowRegistry implements FlowRegistry {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final VersionedProcessGroup child : group.getProcessGroups()) {
|
for (final VersionedProcessGroup child : group.getProcessGroups()) {
|
||||||
populateVersionedContentsRecursively(child);
|
populateVersionedContentsRecursively(child, user);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public VersionedFlow getVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
|
||||||
|
final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
|
||||||
|
return flowClient.get(bucketId, flowId);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
|
public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
|
||||||
final FlowClient flowClient = getRegistryClient().getFlowClient();
|
final FlowClient flowClient = getRegistryClient().getFlowClient();
|
||||||
return flowClient.get(bucketId, flowId);
|
return flowClient.get(bucketId, flowId);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
|
|
||||||
package org.apache.nifi.registry.flow;
|
package org.apache.nifi.registry.flow;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
@ -56,14 +55,15 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
|
||||||
final String uriScheme = uri.getScheme();
|
final String uriScheme = uri.getScheme();
|
||||||
|
|
||||||
final FlowRegistry registry;
|
final FlowRegistry registry;
|
||||||
if (uriScheme.equalsIgnoreCase("file")) {
|
if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) {
|
||||||
try {
|
final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false);
|
||||||
registry = new FileBasedFlowRegistry(registryId, registryUrl);
|
if (sslContext == null && uriScheme.equalsIgnoreCase("https")) {
|
||||||
} catch (IOException e) {
|
throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl
|
||||||
throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl, e);
|
+ " because this NiFi is not configured with a Keystore/Truststore, so it is not capable of communicating with a secure Registry. "
|
||||||
|
+ "Please populate NiFi's Keystore/Truststore properties or connect to a NiFi Registry over http instead of https.");
|
||||||
}
|
}
|
||||||
|
|
||||||
registry.setName(registryName);
|
registry = new RestBasedFlowRegistry(this, registryId, registryUrl, sslContext, registryName);
|
||||||
registry.setDescription(description);
|
registry.setDescription(description);
|
||||||
} else if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) {
|
} else if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) {
|
||||||
final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false);
|
final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false);
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
package org.apache.nifi.registry.flow;
|
package org.apache.nifi.registry.flow;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
|
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
|
||||||
|
|
||||||
|
@ -33,8 +32,8 @@ public class StandardVersionControlInformation implements VersionControlInformat
|
||||||
private volatile String flowDescription;
|
private volatile String flowDescription;
|
||||||
private final int version;
|
private final int version;
|
||||||
private volatile VersionedProcessGroup flowSnapshot;
|
private volatile VersionedProcessGroup flowSnapshot;
|
||||||
private volatile Boolean modified = null;
|
private volatile boolean modified;
|
||||||
private volatile Boolean current = null;
|
private volatile boolean current;
|
||||||
|
|
||||||
public static class Builder {
|
public static class Builder {
|
||||||
private String registryIdentifier;
|
private String registryIdentifier;
|
||||||
|
@ -89,12 +88,12 @@ public class StandardVersionControlInformation implements VersionControlInformat
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder modified(Boolean modified) {
|
public Builder modified(boolean modified) {
|
||||||
this.modified = modified;
|
this.modified = modified;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder current(Boolean current) {
|
public Builder current(boolean current) {
|
||||||
this.current = current;
|
this.current = current;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
@ -113,8 +112,8 @@ public class StandardVersionControlInformation implements VersionControlInformat
|
||||||
.flowId(dto.getFlowId())
|
.flowId(dto.getFlowId())
|
||||||
.flowName(dto.getFlowName())
|
.flowName(dto.getFlowName())
|
||||||
.flowDescription(dto.getFlowDescription())
|
.flowDescription(dto.getFlowDescription())
|
||||||
.current(dto.getCurrent())
|
.current(dto.getCurrent() == null ? true : dto.getCurrent())
|
||||||
.modified(dto.getModified())
|
.modified(dto.getModified() == null ? false : dto.getModified())
|
||||||
.version(dto.getVersion());
|
.version(dto.getVersion());
|
||||||
|
|
||||||
return builder;
|
return builder;
|
||||||
|
@ -139,7 +138,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
|
||||||
|
|
||||||
|
|
||||||
public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version,
|
public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version,
|
||||||
final VersionedProcessGroup snapshot, final Boolean modified, final Boolean current) {
|
final VersionedProcessGroup snapshot, final boolean modified, final boolean current) {
|
||||||
this.registryIdentifier = registryId;
|
this.registryIdentifier = registryId;
|
||||||
this.registryName = registryName;
|
this.registryName = registryName;
|
||||||
this.bucketIdentifier = bucketId;
|
this.bucketIdentifier = bucketId;
|
||||||
|
@ -208,13 +207,13 @@ public class StandardVersionControlInformation implements VersionControlInformat
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Boolean> getModified() {
|
public boolean isModified() {
|
||||||
return Optional.ofNullable(modified);
|
return modified;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Boolean> getCurrent() {
|
public boolean isCurrent() {
|
||||||
return Optional.ofNullable(current);
|
return current;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1317,6 +1317,16 @@ public interface NiFiServiceFacade {
|
||||||
*/
|
*/
|
||||||
VersionControlComponentMappingEntity registerFlowWithFlowRegistry(String groupId, StartVersionControlRequestEntity requestEntity);
|
VersionControlComponentMappingEntity registerFlowWithFlowRegistry(String groupId, StartVersionControlRequestEntity requestEntity);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes the specified Versioned Flow from the specified Flow Registry
|
||||||
|
*
|
||||||
|
* @param registryId the ID of the Flow Registry
|
||||||
|
* @param bucketId the ID of the bucket
|
||||||
|
* @param flowId the ID of the flow
|
||||||
|
* @return the VersionedFlow that was deleted
|
||||||
|
*/
|
||||||
|
VersionedFlow deleteVersionedFlow(String registryId, String bucketId, String flowId) throws IOException, NiFiRegistryException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds the given snapshot to the already existing Versioned Flow, which resides in the given Flow Registry with the given id
|
* Adds the given snapshot to the already existing Versioned Flow, which resides in the given Flow Registry with the given id
|
||||||
*
|
*
|
||||||
|
|
|
@ -16,7 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.web;
|
package org.apache.nifi.web;
|
||||||
|
|
||||||
import com.google.common.collect.Sets;
|
import javax.ws.rs.WebApplicationException;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
|
||||||
import org.apache.commons.collections4.CollectionUtils;
|
import org.apache.commons.collections4.CollectionUtils;
|
||||||
import org.apache.nifi.action.Action;
|
import org.apache.nifi.action.Action;
|
||||||
import org.apache.nifi.action.Component;
|
import org.apache.nifi.action.Component;
|
||||||
|
@ -88,9 +90,9 @@ import org.apache.nifi.history.HistoryQuery;
|
||||||
import org.apache.nifi.history.PreviousValue;
|
import org.apache.nifi.history.PreviousValue;
|
||||||
import org.apache.nifi.registry.ComponentVariableRegistry;
|
import org.apache.nifi.registry.ComponentVariableRegistry;
|
||||||
import org.apache.nifi.registry.bucket.Bucket;
|
import org.apache.nifi.registry.bucket.Bucket;
|
||||||
|
import org.apache.nifi.registry.client.NiFiRegistryException;
|
||||||
import org.apache.nifi.registry.flow.FlowRegistry;
|
import org.apache.nifi.registry.flow.FlowRegistry;
|
||||||
import org.apache.nifi.registry.flow.FlowRegistryClient;
|
import org.apache.nifi.registry.flow.FlowRegistryClient;
|
||||||
import org.apache.nifi.registry.client.NiFiRegistryException;
|
|
||||||
import org.apache.nifi.registry.flow.VersionControlInformation;
|
import org.apache.nifi.registry.flow.VersionControlInformation;
|
||||||
import org.apache.nifi.registry.flow.VersionedComponent;
|
import org.apache.nifi.registry.flow.VersionedComponent;
|
||||||
import org.apache.nifi.registry.flow.VersionedConnection;
|
import org.apache.nifi.registry.flow.VersionedConnection;
|
||||||
|
@ -228,6 +230,7 @@ import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
|
||||||
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
|
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
|
||||||
import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
|
import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
|
||||||
import org.apache.nifi.web.api.entity.SnippetEntity;
|
import org.apache.nifi.web.api.entity.SnippetEntity;
|
||||||
|
import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
|
||||||
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
|
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
|
||||||
import org.apache.nifi.web.api.entity.TemplateEntity;
|
import org.apache.nifi.web.api.entity.TemplateEntity;
|
||||||
import org.apache.nifi.web.api.entity.TenantEntity;
|
import org.apache.nifi.web.api.entity.TenantEntity;
|
||||||
|
@ -237,7 +240,6 @@ import org.apache.nifi.web.api.entity.VariableEntity;
|
||||||
import org.apache.nifi.web.api.entity.VariableRegistryEntity;
|
import org.apache.nifi.web.api.entity.VariableRegistryEntity;
|
||||||
import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
|
import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
|
||||||
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
|
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
|
||||||
import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
|
|
||||||
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
|
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
|
||||||
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
|
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
|
||||||
import org.apache.nifi.web.controller.ControllerFacade;
|
import org.apache.nifi.web.controller.ControllerFacade;
|
||||||
|
@ -268,8 +270,8 @@ import org.apache.nifi.web.util.SnippetUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.ws.rs.WebApplicationException;
|
import com.google.common.collect.Sets;
|
||||||
import javax.ws.rs.core.Response;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -3667,8 +3669,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
action = "add the local flow to the Flow Registry as the first Snapshot";
|
action = "add the local flow to the Flow Registry as the first Snapshot";
|
||||||
|
|
||||||
// add first snapshot to the flow in the registry
|
// add first snapshot to the flow in the registry
|
||||||
final String comments = versionedFlow.getDescription() == null ? "Initial version of flow" : versionedFlow.getDescription();
|
registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, versionedFlowDto.getComments(), expectedVersion);
|
||||||
registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, comments, expectedVersion);
|
|
||||||
} catch (final NiFiRegistryException e) {
|
} catch (final NiFiRegistryException e) {
|
||||||
throw new IllegalArgumentException(e);
|
throw new IllegalArgumentException(e);
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
|
@ -3676,14 +3677,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
throw new RuntimeException("Failed to communicate with Flow Registry when attempting to " + action);
|
throw new RuntimeException("Failed to communicate with Flow Registry when attempting to " + action);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final Bucket bucket = registeredSnapshot.getBucket();
|
||||||
|
final VersionedFlow flow = registeredSnapshot.getFlow();
|
||||||
|
|
||||||
// Update the Process Group with the new VersionControlInformation. (Send this to all nodes).
|
// Update the Process Group with the new VersionControlInformation. (Send this to all nodes).
|
||||||
final VersionControlInformationDTO vci = new VersionControlInformationDTO();
|
final VersionControlInformationDTO vci = new VersionControlInformationDTO();
|
||||||
vci.setBucketId(registeredFlow.getBucketIdentifier());
|
vci.setBucketId(bucket.getIdentifier());
|
||||||
vci.setBucketName(registeredFlow.getBucketName());
|
vci.setBucketName(bucket.getName());
|
||||||
vci.setCurrent(true);
|
vci.setCurrent(true);
|
||||||
vci.setFlowId(registeredFlow.getIdentifier());
|
vci.setFlowId(flow.getIdentifier());
|
||||||
vci.setFlowName(registeredFlow.getName());
|
vci.setFlowName(flow.getName());
|
||||||
vci.setFlowDescription(registeredFlow.getDescription());
|
vci.setFlowDescription(flow.getDescription());
|
||||||
vci.setGroupId(groupId);
|
vci.setGroupId(groupId);
|
||||||
vci.setModified(false);
|
vci.setModified(false);
|
||||||
vci.setRegistryId(registryId);
|
vci.setRegistryId(registryId);
|
||||||
|
@ -3702,6 +3706,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
return entity;
|
return entity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
|
||||||
|
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
|
||||||
|
if (registry == null) {
|
||||||
|
throw new IllegalArgumentException("No Flow Registry exists with ID " + registryId);
|
||||||
|
}
|
||||||
|
|
||||||
|
return registry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public VersionControlInformationEntity getVersionControlInformation(final String groupId) {
|
public VersionControlInformationEntity getVersionControlInformation(final String groupId) {
|
||||||
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
|
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
|
||||||
|
@ -3737,8 +3751,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
}
|
}
|
||||||
|
|
||||||
final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
|
final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
|
||||||
versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion());
|
versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser());
|
||||||
|
|
||||||
|
|
||||||
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
|
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
|
||||||
final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, true);
|
final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, true);
|
||||||
|
@ -3784,7 +3797,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
|
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return registry.registerVersionedFlow(flow);
|
return registry.registerVersionedFlow(flow, NiFiUserUtils.getNiFiUser());
|
||||||
}
|
}
|
||||||
|
|
||||||
private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
|
private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
|
||||||
|
@ -3793,7 +3806,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
|
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return registry.getVersionedFlow(bucketId, flowId);
|
return registry.getVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -3804,7 +3817,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
|
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion);
|
return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion, NiFiUserUtils.getNiFiUser());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -4023,7 +4036,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
|
|
||||||
final VersionedFlowSnapshot snapshot;
|
final VersionedFlowSnapshot snapshot;
|
||||||
try {
|
try {
|
||||||
snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion());
|
snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser());
|
||||||
} catch (final NiFiRegistryException e) {
|
} catch (final NiFiRegistryException e) {
|
||||||
throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
|
throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
|
||||||
+ versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
|
+ versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
|
||||||
|
@ -4064,7 +4077,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
|
|
||||||
final VersionedFlowSnapshot childSnapshot;
|
final VersionedFlowSnapshot childSnapshot;
|
||||||
try {
|
try {
|
||||||
childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion());
|
childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion(), NiFiUserUtils.getNiFiUser());
|
||||||
} catch (final NiFiRegistryException e) {
|
} catch (final NiFiRegistryException e) {
|
||||||
throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
|
throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
|
||||||
+ remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion());
|
+ remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion());
|
||||||
|
|
|
@ -87,10 +87,11 @@ import org.apache.nifi.connectable.ConnectableType;
|
||||||
import org.apache.nifi.controller.ScheduledState;
|
import org.apache.nifi.controller.ScheduledState;
|
||||||
import org.apache.nifi.controller.serialization.FlowEncodingVersion;
|
import org.apache.nifi.controller.serialization.FlowEncodingVersion;
|
||||||
import org.apache.nifi.controller.service.ControllerServiceState;
|
import org.apache.nifi.controller.service.ControllerServiceState;
|
||||||
|
import org.apache.nifi.registry.bucket.Bucket;
|
||||||
import org.apache.nifi.registry.client.NiFiRegistryException;
|
import org.apache.nifi.registry.client.NiFiRegistryException;
|
||||||
import org.apache.nifi.registry.flow.FlowRegistryUtils;
|
import org.apache.nifi.registry.flow.FlowRegistryUtils;
|
||||||
|
import org.apache.nifi.registry.flow.VersionedFlow;
|
||||||
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
|
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
|
||||||
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
|
|
||||||
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
|
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
|
||||||
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
|
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
|
||||||
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
|
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
|
||||||
|
@ -1643,11 +1644,12 @@ public class ProcessGroupResource extends ApplicationResource {
|
||||||
// Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
|
// Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
|
||||||
// Step 2: Retrieve flow from Flow Registry
|
// Step 2: Retrieve flow from Flow Registry
|
||||||
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo);
|
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo);
|
||||||
|
final Bucket bucket = flowSnapshot.getBucket();
|
||||||
|
final VersionedFlow flow = flowSnapshot.getFlow();
|
||||||
|
|
||||||
final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
|
versionControlInfo.setBucketName(bucket.getName());
|
||||||
versionControlInfo.setBucketName(metadata.getBucketName());
|
versionControlInfo.setFlowName(flow.getName());
|
||||||
versionControlInfo.setFlowName(metadata.getFlowName());
|
versionControlInfo.setFlowDescription(flow.getDescription());
|
||||||
versionControlInfo.setFlowDescription(metadata.getFlowDescription());
|
|
||||||
|
|
||||||
versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId()));
|
versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId()));
|
||||||
|
|
||||||
|
|
|
@ -17,12 +17,37 @@
|
||||||
|
|
||||||
package org.apache.nifi.web.api;
|
package org.apache.nifi.web.api;
|
||||||
|
|
||||||
import io.swagger.annotations.Api;
|
import java.io.IOException;
|
||||||
import io.swagger.annotations.ApiOperation;
|
import java.net.URI;
|
||||||
import io.swagger.annotations.ApiParam;
|
import java.net.URISyntaxException;
|
||||||
import io.swagger.annotations.ApiResponse;
|
import java.util.Collections;
|
||||||
import io.swagger.annotations.ApiResponses;
|
import java.util.Date;
|
||||||
import io.swagger.annotations.Authorization;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import javax.ws.rs.Consumes;
|
||||||
|
import javax.ws.rs.DELETE;
|
||||||
|
import javax.ws.rs.DefaultValue;
|
||||||
|
import javax.ws.rs.GET;
|
||||||
|
import javax.ws.rs.HttpMethod;
|
||||||
|
import javax.ws.rs.POST;
|
||||||
|
import javax.ws.rs.PUT;
|
||||||
|
import javax.ws.rs.Path;
|
||||||
|
import javax.ws.rs.PathParam;
|
||||||
|
import javax.ws.rs.Produces;
|
||||||
|
import javax.ws.rs.QueryParam;
|
||||||
|
import javax.ws.rs.core.MediaType;
|
||||||
|
import javax.ws.rs.core.MultivaluedHashMap;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
import javax.ws.rs.core.Response.Status;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.authorization.Authorizer;
|
import org.apache.nifi.authorization.Authorizer;
|
||||||
import org.apache.nifi.authorization.ProcessGroupAuthorizable;
|
import org.apache.nifi.authorization.ProcessGroupAuthorizable;
|
||||||
|
@ -34,7 +59,9 @@ import org.apache.nifi.cluster.manager.NodeResponse;
|
||||||
import org.apache.nifi.controller.FlowController;
|
import org.apache.nifi.controller.FlowController;
|
||||||
import org.apache.nifi.controller.ScheduledState;
|
import org.apache.nifi.controller.ScheduledState;
|
||||||
import org.apache.nifi.controller.service.ControllerServiceState;
|
import org.apache.nifi.controller.service.ControllerServiceState;
|
||||||
|
import org.apache.nifi.registry.bucket.Bucket;
|
||||||
import org.apache.nifi.registry.flow.FlowRegistryUtils;
|
import org.apache.nifi.registry.flow.FlowRegistryUtils;
|
||||||
|
import org.apache.nifi.registry.flow.VersionedFlow;
|
||||||
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
|
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
|
||||||
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
|
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
|
||||||
import org.apache.nifi.registry.flow.VersionedProcessGroup;
|
import org.apache.nifi.registry.flow.VersionedProcessGroup;
|
||||||
|
@ -69,35 +96,12 @@ import org.apache.nifi.web.util.Pause;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.ws.rs.Consumes;
|
import io.swagger.annotations.Api;
|
||||||
import javax.ws.rs.DELETE;
|
import io.swagger.annotations.ApiOperation;
|
||||||
import javax.ws.rs.DefaultValue;
|
import io.swagger.annotations.ApiParam;
|
||||||
import javax.ws.rs.GET;
|
import io.swagger.annotations.ApiResponse;
|
||||||
import javax.ws.rs.HttpMethod;
|
import io.swagger.annotations.ApiResponses;
|
||||||
import javax.ws.rs.POST;
|
import io.swagger.annotations.Authorization;
|
||||||
import javax.ws.rs.PUT;
|
|
||||||
import javax.ws.rs.Path;
|
|
||||||
import javax.ws.rs.PathParam;
|
|
||||||
import javax.ws.rs.Produces;
|
|
||||||
import javax.ws.rs.QueryParam;
|
|
||||||
import javax.ws.rs.core.MediaType;
|
|
||||||
import javax.ws.rs.core.MultivaluedHashMap;
|
|
||||||
import javax.ws.rs.core.Response;
|
|
||||||
import javax.ws.rs.core.Response.Status;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.URI;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.LinkedHashSet;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
@Path("/versions")
|
@Path("/versions")
|
||||||
@Api(value = "/versions", description = "Endpoint for managing version control for a flow")
|
@Api(value = "/versions", description = "Endpoint for managing version control for a flow")
|
||||||
|
@ -125,9 +129,12 @@ public class VersionsResource extends ApplicationResource {
|
||||||
@Consumes(MediaType.WILDCARD)
|
@Consumes(MediaType.WILDCARD)
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
@Path("process-groups/{id}")
|
@Path("process-groups/{id}")
|
||||||
@ApiOperation(value = "Gets the Version Control information for a process group", response = VersionControlInformationEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = {
|
@ApiOperation(value = "Gets the Version Control information for a process group",
|
||||||
@Authorization(value = "Read - /process-groups/{uuid}")
|
response = VersionControlInformationEntity.class,
|
||||||
})
|
notes = NON_GUARANTEED_ENDPOINT,
|
||||||
|
authorizations = {
|
||||||
|
@Authorization(value = "Read - /process-groups/{uuid}")
|
||||||
|
})
|
||||||
@ApiResponses(value = {
|
@ApiResponses(value = {
|
||||||
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
|
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
|
||||||
@ApiResponse(code = 401, message = "Client could not be authenticated."),
|
@ApiResponse(code = 401, message = "Client could not be authenticated."),
|
||||||
|
@ -164,7 +171,9 @@ public class VersionsResource extends ApplicationResource {
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
@Path("start-requests")
|
@Path("start-requests")
|
||||||
@ApiOperation(
|
@ApiOperation(
|
||||||
value = "Creates a request so that a Process Group can be placed under Version Control or have its Version Control configuration changed",
|
value = "Creates a request so that a Process Group can be placed under Version Control or have its Version Control configuration changed. Creating this request will "
|
||||||
|
+ "prevent any other threads from simultaneously saving local changes to Version Control. It will not, however, actually save the local flow to the Flow Registry. A "
|
||||||
|
+ "POST to /versions/process-groups/{id} should be used to initiate saving of the local flow to the Flow Registry.",
|
||||||
response = VersionControlInformationEntity.class,
|
response = VersionControlInformationEntity.class,
|
||||||
notes = NON_GUARANTEED_ENDPOINT)
|
notes = NON_GUARANTEED_ENDPOINT)
|
||||||
@ApiResponses(value = {
|
@ApiResponses(value = {
|
||||||
|
@ -305,7 +314,8 @@ public class VersionsResource extends ApplicationResource {
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
@Path("start-requests/{id}")
|
@Path("start-requests/{id}")
|
||||||
@ApiOperation(
|
@ApiOperation(
|
||||||
value = "Deletes the request with the given ID",
|
value = "Deletes the Version Control Request with the given ID. This will allow other threads to save flows to the Flow Registry. See also the documentation "
|
||||||
|
+ "for POSTing to /versions/start-requests for information regarding why this is done.",
|
||||||
notes = NON_GUARANTEED_ENDPOINT)
|
notes = NON_GUARANTEED_ENDPOINT)
|
||||||
@ApiResponses(value = {
|
@ApiResponses(value = {
|
||||||
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
|
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
|
||||||
|
@ -349,7 +359,8 @@ public class VersionsResource extends ApplicationResource {
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
@Path("process-groups/{id}")
|
@Path("process-groups/{id}")
|
||||||
@ApiOperation(
|
@ApiOperation(
|
||||||
value = "Begins version controlling the Process Group with the given ID",
|
value = "Begins version controlling the Process Group with the given ID or commits changes to the Versioned Flow, "
|
||||||
|
+ "depending on if the provided VersionControlInformation includes a flowId",
|
||||||
response = VersionControlInformationEntity.class,
|
response = VersionControlInformationEntity.class,
|
||||||
notes = NON_GUARANTEED_ENDPOINT,
|
notes = NON_GUARANTEED_ENDPOINT,
|
||||||
authorizations = {
|
authorizations = {
|
||||||
|
@ -365,7 +376,7 @@ public class VersionsResource extends ApplicationResource {
|
||||||
@ApiResponse(code = 404, message = "The specified resource could not be found."),
|
@ApiResponse(code = 404, message = "The specified resource could not be found."),
|
||||||
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
|
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
|
||||||
})
|
})
|
||||||
public Response startVersionControl(
|
public Response saveToFlowRegistry(
|
||||||
@ApiParam("The process group id.") @PathParam("id") final String groupId,
|
@ApiParam("The process group id.") @PathParam("id") final String groupId,
|
||||||
@ApiParam(value = "The versioned flow details.", required = true) final StartVersionControlRequestEntity requestEntity) throws IOException {
|
@ApiParam(value = "The versioned flow details.", required = true) final StartVersionControlRequestEntity requestEntity) throws IOException {
|
||||||
|
|
||||||
|
@ -402,29 +413,7 @@ public class VersionsResource extends ApplicationResource {
|
||||||
final URI requestUri;
|
final URI requestUri;
|
||||||
try {
|
try {
|
||||||
final URI originalUri = getAbsolutePath();
|
final URI originalUri = getAbsolutePath();
|
||||||
final URI createRequestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
|
final String requestId = lockVersionControl(originalUri, groupId);
|
||||||
originalUri.getPort(), "/nifi-api/versions/start-requests", null, originalUri.getFragment());
|
|
||||||
|
|
||||||
final NodeResponse clusterResponse;
|
|
||||||
try {
|
|
||||||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
|
||||||
clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
|
|
||||||
} else {
|
|
||||||
clusterResponse = getRequestReplicator().forwardToCoordinator(
|
|
||||||
getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
|
|
||||||
}
|
|
||||||
} catch (final InterruptedException ie) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
|
|
||||||
final String errorResponse = getResponseEntity(clusterResponse, String.class);
|
|
||||||
throw new IllegalStateException(
|
|
||||||
"Failed to create a Version Control Request across all nodes in the cluster. Received response code " + clusterResponse.getStatus() + " with content: " + errorResponse);
|
|
||||||
}
|
|
||||||
|
|
||||||
final String requestId = getResponseEntity(clusterResponse, String.class);
|
|
||||||
|
|
||||||
requestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
|
requestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
|
||||||
originalUri.getPort(), "/nifi-api/versions/start-requests/" + requestId, null, originalUri.getFragment());
|
originalUri.getPort(), "/nifi-api/versions/start-requests/" + requestId, null, originalUri.getFragment());
|
||||||
|
@ -439,54 +428,12 @@ public class VersionsResource extends ApplicationResource {
|
||||||
// Finally, we can delete the Request.
|
// Finally, we can delete the Request.
|
||||||
try {
|
try {
|
||||||
final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, requestEntity);
|
final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, requestEntity);
|
||||||
|
replicateVersionControlMapping(mappingEntity, requestEntity, requestUri, groupId);
|
||||||
final Map<String, String> headers = new HashMap<>();
|
|
||||||
headers.put("content-type", MediaType.APPLICATION_JSON);
|
|
||||||
|
|
||||||
final NodeResponse clusterResponse;
|
|
||||||
try {
|
|
||||||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
|
||||||
clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse();
|
|
||||||
} else {
|
|
||||||
clusterResponse = getRequestReplicator().forwardToCoordinator(
|
|
||||||
getClusterCoordinatorNode(), HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse();
|
|
||||||
}
|
|
||||||
} catch (final InterruptedException ie) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
|
|
||||||
final String message = "Failed to update Version Control Information for Process Group with ID " + groupId + ".";
|
|
||||||
final Throwable cause = clusterResponse.getThrowable();
|
|
||||||
if (cause == null) {
|
|
||||||
throw new IllegalStateException(message);
|
|
||||||
} else {
|
|
||||||
throw new IllegalStateException(message, cause);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final VersionControlInformationEntity responseEntity = serviceFacade.getVersionControlInformation(groupId);
|
final VersionControlInformationEntity responseEntity = serviceFacade.getVersionControlInformation(groupId);
|
||||||
return generateOkResponse(responseEntity).build();
|
return generateOkResponse(responseEntity).build();
|
||||||
} finally {
|
} finally {
|
||||||
final NodeResponse clusterResponse;
|
unlockVersionControl(requestUri, groupId);
|
||||||
try {
|
|
||||||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
|
||||||
clusterResponse = getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
|
|
||||||
} else {
|
|
||||||
clusterResponse = getRequestReplicator().forwardToCoordinator(
|
|
||||||
getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
|
|
||||||
}
|
|
||||||
} catch (final InterruptedException ie) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new RuntimeException("After starting Version Control on Process Group with ID " + groupId + ", interrupted while waiting for deletion of Version Control Request. "
|
|
||||||
+ "Users may be unable to Version Control other Process Groups until the request lock times out.", ie);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
|
|
||||||
logger.error("After starting Version Control on Process Group with ID " + groupId + ", failed to delete Version Control Request. "
|
|
||||||
+ "Users may be unable to Version Control other Process Groups until the request lock times out. Response status code was " + clusterResponse.getStatus());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -560,13 +507,115 @@ public class VersionsResource extends ApplicationResource {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void unlockVersionControl(final URI requestUri, final String groupId) {
|
||||||
|
final NodeResponse clusterResponse;
|
||||||
|
try {
|
||||||
|
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||||
|
clusterResponse = getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
|
||||||
|
} else {
|
||||||
|
clusterResponse = getRequestReplicator().forwardToCoordinator(
|
||||||
|
getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
|
||||||
|
}
|
||||||
|
} catch (final InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new RuntimeException("After starting Version Control on Process Group with ID " + groupId + ", interrupted while waiting for deletion of Version Control Request. "
|
||||||
|
+ "Users may be unable to Version Control other Process Groups until the request lock times out.", ie);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
|
||||||
|
logger.error("After starting Version Control on Process Group with ID " + groupId + ", failed to delete Version Control Request. "
|
||||||
|
+ "Users may be unable to Version Control other Process Groups until the request lock times out. Response status code was " + clusterResponse.getStatus());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String lockVersionControl(final URI originalUri, final String groupId) throws URISyntaxException {
|
||||||
|
final URI createRequestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
|
||||||
|
originalUri.getPort(), "/nifi-api/versions/start-requests", null, originalUri.getFragment());
|
||||||
|
|
||||||
|
final NodeResponse clusterResponse;
|
||||||
|
try {
|
||||||
|
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||||
|
clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
|
||||||
|
} else {
|
||||||
|
clusterResponse = getRequestReplicator().forwardToCoordinator(
|
||||||
|
getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
|
||||||
|
}
|
||||||
|
} catch (final InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
|
||||||
|
final String errorResponse = getResponseEntity(clusterResponse, String.class);
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Failed to create a Version Control Request across all nodes in the cluster. Received response code " + clusterResponse.getStatus() + " with content: " + errorResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
final String requestId = getResponseEntity(clusterResponse, String.class);
|
||||||
|
return requestId;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void replicateVersionControlMapping(final VersionControlComponentMappingEntity mappingEntity, final StartVersionControlRequestEntity requestEntity, final URI requestUri,
|
||||||
|
final String groupId) {
|
||||||
|
final Map<String, String> headers = new HashMap<>();
|
||||||
|
headers.put("content-type", MediaType.APPLICATION_JSON);
|
||||||
|
|
||||||
|
final NodeResponse clusterResponse;
|
||||||
|
try {
|
||||||
|
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||||
|
clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse();
|
||||||
|
} else {
|
||||||
|
clusterResponse = getRequestReplicator().forwardToCoordinator(
|
||||||
|
getClusterCoordinatorNode(), HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse();
|
||||||
|
}
|
||||||
|
} catch (final InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
|
||||||
|
if (requestEntity.getVersionedFlow().getFlowId() == null) {
|
||||||
|
// We had to create the flow for this snapshot. Since we failed to replicate the Version Control Info, remove the
|
||||||
|
// flow from the Flow Registry (use best effort; if we can't remove it, just log and move on).
|
||||||
|
final VersionControlInformationDTO vci = mappingEntity.getVersionControlInformation();
|
||||||
|
try {
|
||||||
|
serviceFacade.deleteVersionedFlow(vci.getRegistryId(), vci.getBucketId(), vci.getFlowId());
|
||||||
|
} catch (final Exception e) {
|
||||||
|
logger.error("Created Versioned Flow with ID {} in bucket with ID {} but failed to replicate the Version Control Information to cluster. "
|
||||||
|
+ "Attempted to delete the newly created (empty) flow from the Flow Registry but failed", vci.getFlowId(), vci.getBucketId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
|
||||||
|
if (requestEntity.getVersionedFlow().getFlowId() == null) {
|
||||||
|
// We had to create the flow for this snapshot. Since we failed to replicate the Version Control Info, remove the
|
||||||
|
// flow from the Flow Registry (use best effort; if we can't remove it, just log and move on).
|
||||||
|
final VersionControlInformationDTO vci = mappingEntity.getVersionControlInformation();
|
||||||
|
try {
|
||||||
|
serviceFacade.deleteVersionedFlow(vci.getRegistryId(), vci.getBucketId(), vci.getFlowId());
|
||||||
|
} catch (final Exception e) {
|
||||||
|
logger.error("Created Versioned Flow with ID {} in bucket with ID {} but failed to replicate the Version Control Information to cluster. "
|
||||||
|
+ "Attempted to delete the newly created (empty) flow from the Flow Registry but failed", vci.getFlowId(), vci.getBucketId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final String message = "Failed to update Version Control Information for Process Group with ID " + groupId + ".";
|
||||||
|
final Throwable cause = clusterResponse.getThrowable();
|
||||||
|
if (cause == null) {
|
||||||
|
throw new IllegalStateException(message);
|
||||||
|
} else {
|
||||||
|
throw new IllegalStateException(message, cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@DELETE
|
@DELETE
|
||||||
@Consumes(MediaType.WILDCARD)
|
@Consumes(MediaType.WILDCARD)
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
@Path("process-groups/{id}")
|
@Path("process-groups/{id}")
|
||||||
@ApiOperation(
|
@ApiOperation(
|
||||||
value = "Stops version controlling the Process Group with the given ID",
|
value = "Stops version controlling the Process Group with the given ID. The Process Group will no longer track to any Versioned Flow.",
|
||||||
response = VersionControlInformationEntity.class,
|
response = VersionControlInformationEntity.class,
|
||||||
notes = NON_GUARANTEED_ENDPOINT,
|
notes = NON_GUARANTEED_ENDPOINT,
|
||||||
authorizations = {
|
authorizations = {
|
||||||
|
@ -626,7 +675,8 @@ public class VersionsResource extends ApplicationResource {
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
@Path("process-groups/{id}")
|
@Path("process-groups/{id}")
|
||||||
@ApiOperation(
|
@ApiOperation(
|
||||||
value = "For a Process Group that is already under Version Control, this will update the version of the flow to a different version",
|
value = "For a Process Group that is already under Version Control, this will update the version of the flow to a different version. This endpoint expects "
|
||||||
|
+ "that the given snapshot will not modify any Processor that is currently running or any Controller Service that is enabled.",
|
||||||
response = VersionControlInformationEntity.class,
|
response = VersionControlInformationEntity.class,
|
||||||
notes = NON_GUARANTEED_ENDPOINT,
|
notes = NON_GUARANTEED_ENDPOINT,
|
||||||
authorizations = {
|
authorizations = {
|
||||||
|
@ -689,14 +739,17 @@ public class VersionsResource extends ApplicationResource {
|
||||||
serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, false);
|
serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, false);
|
||||||
},
|
},
|
||||||
(rev, entity) -> {
|
(rev, entity) -> {
|
||||||
|
final Bucket bucket = flowSnapshot.getBucket();
|
||||||
|
final VersionedFlow flow = flowSnapshot.getFlow();
|
||||||
|
|
||||||
// Update the Process Group to match the proposed flow snapshot
|
// Update the Process Group to match the proposed flow snapshot
|
||||||
final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO();
|
final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO();
|
||||||
versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier());
|
versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier());
|
||||||
versionControlInfoDto.setBucketName(snapshotMetadata.getBucketName());
|
versionControlInfoDto.setBucketName(bucket.getName());
|
||||||
versionControlInfoDto.setCurrent(true);
|
versionControlInfoDto.setCurrent(true);
|
||||||
versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier());
|
versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier());
|
||||||
versionControlInfoDto.setFlowName(snapshotMetadata.getFlowName());
|
versionControlInfoDto.setFlowName(flow.getName());
|
||||||
versionControlInfoDto.setFlowDescription(snapshotMetadata.getFlowDescription());
|
versionControlInfoDto.setFlowDescription(flow.getDescription());
|
||||||
versionControlInfoDto.setGroupId(groupId);
|
versionControlInfoDto.setGroupId(groupId);
|
||||||
versionControlInfoDto.setModified(false);
|
versionControlInfoDto.setModified(false);
|
||||||
versionControlInfoDto.setVersion(snapshotMetadata.getVersion());
|
versionControlInfoDto.setVersion(snapshotMetadata.getVersion());
|
||||||
|
@ -720,7 +773,9 @@ public class VersionsResource extends ApplicationResource {
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
@Path("update-requests/{id}")
|
@Path("update-requests/{id}")
|
||||||
@ApiOperation(
|
@ApiOperation(
|
||||||
value = "Returns the Update Request with the given ID",
|
value = "Returns the Update Request with the given ID. Once an Update Request has been created by performing a POST to /versions/update-requests/process-groups/{id}, "
|
||||||
|
+ "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the "
|
||||||
|
+ "current state of the request, and any failures.",
|
||||||
response = VersionedFlowUpdateRequestEntity.class,
|
response = VersionedFlowUpdateRequestEntity.class,
|
||||||
notes = NON_GUARANTEED_ENDPOINT,
|
notes = NON_GUARANTEED_ENDPOINT,
|
||||||
authorizations = {
|
authorizations = {
|
||||||
|
@ -741,7 +796,9 @@ public class VersionsResource extends ApplicationResource {
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
@Path("revert-requests/{id}")
|
@Path("revert-requests/{id}")
|
||||||
@ApiOperation(
|
@ApiOperation(
|
||||||
value = "Returns the Revert Request with the given ID",
|
value = "Returns the Revert Request with the given ID. Once a Revert Request has been created by performing a POST to /versions/revert-requests/process-groups/{id}, "
|
||||||
|
+ "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the "
|
||||||
|
+ "current state of the request, and any failures.",
|
||||||
response = VersionedFlowUpdateRequestEntity.class,
|
response = VersionedFlowUpdateRequestEntity.class,
|
||||||
notes = NON_GUARANTEED_ENDPOINT,
|
notes = NON_GUARANTEED_ENDPOINT,
|
||||||
authorizations = {
|
authorizations = {
|
||||||
|
@ -795,7 +852,9 @@ public class VersionsResource extends ApplicationResource {
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
@Path("update-requests/{id}")
|
@Path("update-requests/{id}")
|
||||||
@ApiOperation(
|
@ApiOperation(
|
||||||
value = "Deletes the Update Request with the given ID",
|
value = "Deletes the Update Request with the given ID. After a request is created via a POST to /versions/update-requests/process-groups/{id}, it is expected "
|
||||||
|
+ "that the client will properly clean up the request by DELETE'ing it, once the Update process has completed. If the request is deleted before the request "
|
||||||
|
+ "completes, then the Update request will finish the step that it is currently performing and then will cancel any subsequent steps.",
|
||||||
response = VersionedFlowUpdateRequestEntity.class,
|
response = VersionedFlowUpdateRequestEntity.class,
|
||||||
notes = NON_GUARANTEED_ENDPOINT,
|
notes = NON_GUARANTEED_ENDPOINT,
|
||||||
authorizations = {
|
authorizations = {
|
||||||
|
@ -816,7 +875,9 @@ public class VersionsResource extends ApplicationResource {
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
@Path("revert-requests/{id}")
|
@Path("revert-requests/{id}")
|
||||||
@ApiOperation(
|
@ApiOperation(
|
||||||
value = "Deletes the Revert Request with the given ID",
|
value = "Deletes the Revert Request with the given ID. After a request is created via a POST to /versions/revert-requests/process-groups/{id}, it is expected "
|
||||||
|
+ "that the client will properly clean up the request by DELETE'ing it, once the Revert process has completed. If the request is deleted before the request "
|
||||||
|
+ "completes, then the Revert request will finish the step that it is currently performing and then will cancel any subsequent steps.",
|
||||||
response = VersionedFlowUpdateRequestEntity.class,
|
response = VersionedFlowUpdateRequestEntity.class,
|
||||||
notes = NON_GUARANTEED_ENDPOINT,
|
notes = NON_GUARANTEED_ENDPOINT,
|
||||||
authorizations = {
|
authorizations = {
|
||||||
|
@ -881,7 +942,12 @@ public class VersionsResource extends ApplicationResource {
|
||||||
@Path("update-requests/process-groups/{id}")
|
@Path("update-requests/process-groups/{id}")
|
||||||
@ApiOperation(
|
@ApiOperation(
|
||||||
value = "For a Process Group that is already under Version Control, this will initiate the action of changing "
|
value = "For a Process Group that is already under Version Control, this will initiate the action of changing "
|
||||||
+ "from a specific version of the flow in the Flow Registry to a different version of the flow.",
|
+ "from a specific version of the flow in the Flow Registry to a different version of the flow. This can be a lengthy "
|
||||||
|
+ "process, as it will stop any Processors and disable any Controller Services necessary to perform the action and then restart them. As a result, "
|
||||||
|
+ "the endpoint will immediately return a VersionedFlowUpdateRequestEntity, and the process of updating the flow will occur "
|
||||||
|
+ "asynchronously in the background. The client may then periodically poll the status of the request by issuing a GET request to "
|
||||||
|
+ "/versions/update-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to "
|
||||||
|
+ "/versions/update-requests/{requestId}.",
|
||||||
response = VersionedFlowUpdateRequestEntity.class,
|
response = VersionedFlowUpdateRequestEntity.class,
|
||||||
notes = NON_GUARANTEED_ENDPOINT,
|
notes = NON_GUARANTEED_ENDPOINT,
|
||||||
authorizations = {
|
authorizations = {
|
||||||
|
@ -1058,8 +1124,13 @@ public class VersionsResource extends ApplicationResource {
|
||||||
@Path("revert-requests/process-groups/{id}")
|
@Path("revert-requests/process-groups/{id}")
|
||||||
@ApiOperation(
|
@ApiOperation(
|
||||||
value = "For a Process Group that is already under Version Control, this will initiate the action of reverting "
|
value = "For a Process Group that is already under Version Control, this will initiate the action of reverting "
|
||||||
+ "any changes that have been made to the Process Group since it was last synchronized with the Flow Registry. This will result in the "
|
+ "any local changes that have been made to the Process Group since it was last synchronized with the Flow Registry. This will result in the "
|
||||||
+ "flow matching the Versioned Flow that exists in the Flow Registry.",
|
+ "flow matching the Versioned Flow that exists in the Flow Registry. This can be a lengthy "
|
||||||
|
+ "process, as it will stop any Processors and disable any Controller Services necessary to perform the action and then restart them. As a result, "
|
||||||
|
+ "the endpoint will immediately return a VersionedFlowUpdateRequestEntity, and the process of updating the flow will occur "
|
||||||
|
+ "asynchronously in the background. The client may then periodically poll the status of the request by issuing a GET request to "
|
||||||
|
+ "/versions/revert-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to "
|
||||||
|
+ "/versions/revert-requests/{requestId}.",
|
||||||
response = VersionedFlowUpdateRequestEntity.class,
|
response = VersionedFlowUpdateRequestEntity.class,
|
||||||
notes = NON_GUARANTEED_ENDPOINT,
|
notes = NON_GUARANTEED_ENDPOINT,
|
||||||
authorizations = {
|
authorizations = {
|
||||||
|
@ -1174,34 +1245,6 @@ public class VersionsResource extends ApplicationResource {
|
||||||
throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with.");
|
throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with.");
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the information passed in is correct, but there have been no changes, there is nothing to do - just register the request, mark it complete, and return.
|
|
||||||
if (currentVersion.getModified() == Boolean.FALSE) {
|
|
||||||
final String requestId = UUID.randomUUID().toString();
|
|
||||||
final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Complete");
|
|
||||||
requestManager.submitRequest("revert-requests", requestId, request, task -> {
|
|
||||||
});
|
|
||||||
|
|
||||||
// There is nothing to do. Generate the response and send it back to the user.
|
|
||||||
final VersionedFlowUpdateRequestDTO updateRequestDto = new VersionedFlowUpdateRequestDTO();
|
|
||||||
updateRequestDto.setComplete(true);
|
|
||||||
updateRequestDto.setFailureReason(null);
|
|
||||||
updateRequestDto.setLastUpdated(new Date());
|
|
||||||
updateRequestDto.setProcessGroupId(groupId);
|
|
||||||
updateRequestDto.setRequestId(requestId);
|
|
||||||
updateRequestDto.setVersionControlInformation(currentVersion);
|
|
||||||
updateRequestDto.setUri(generateResourceUri("versions", "revert-requests", requestId));
|
|
||||||
updateRequestDto.setPercentCompleted(100);
|
|
||||||
updateRequestDto.setState(request.getState());
|
|
||||||
|
|
||||||
final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity();
|
|
||||||
updateRequestEntity.setProcessGroupRevision(revisionDto);
|
|
||||||
updateRequestEntity.setRequest(updateRequestDto);
|
|
||||||
|
|
||||||
request.markComplete(currentVersionEntity);
|
|
||||||
return generateOkResponse(updateRequestEntity).build();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// Create an asynchronous request that will occur in the background, because this request may
|
// Create an asynchronous request that will occur in the background, because this request may
|
||||||
// result in stopping components, which can take an indeterminate amount of time.
|
// result in stopping components, which can take an indeterminate amount of time.
|
||||||
final String requestId = UUID.randomUUID().toString();
|
final String requestId = UUID.randomUUID().toString();
|
||||||
|
@ -1331,7 +1374,25 @@ public class VersionsResource extends ApplicationResource {
|
||||||
// Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed
|
// Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed
|
||||||
final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
|
final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
|
||||||
final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId);
|
final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId);
|
||||||
final VersionControlInformationDTO vci = requestEntity.getVersionControlInformation();
|
final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation();
|
||||||
|
|
||||||
|
final Bucket bucket = flowSnapshot.getBucket();
|
||||||
|
final VersionedFlow flow = flowSnapshot.getFlow();
|
||||||
|
|
||||||
|
final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
|
||||||
|
final VersionControlInformationDTO vci = new VersionControlInformationDTO();
|
||||||
|
vci.setBucketId(metadata.getBucketIdentifier());
|
||||||
|
vci.setBucketName(bucket.getName());
|
||||||
|
vci.setCurrent(flowSnapshot.isLatest());
|
||||||
|
vci.setFlowDescription(flow.getDescription());
|
||||||
|
vci.setFlowId(flow.getIdentifier());
|
||||||
|
vci.setFlowName(flow.getName());
|
||||||
|
vci.setGroupId(groupId);
|
||||||
|
vci.setModified(false);
|
||||||
|
vci.setRegistryId(requestVci.getRegistryId());
|
||||||
|
vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId()));
|
||||||
|
vci.setVersion(metadata.getVersion());
|
||||||
|
|
||||||
serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false);
|
serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,33 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.web.api.dto;
|
package org.apache.nifi.web.api.dto;
|
||||||
|
|
||||||
|
import java.text.Collator;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TimeZone;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import javax.ws.rs.WebApplicationException;
|
||||||
|
|
||||||
import org.apache.commons.lang3.ClassUtils;
|
import org.apache.commons.lang3.ClassUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.action.Action;
|
import org.apache.nifi.action.Action;
|
||||||
|
@ -188,32 +215,6 @@ import org.apache.nifi.web.api.entity.VariableEntity;
|
||||||
import org.apache.nifi.web.controller.ControllerFacade;
|
import org.apache.nifi.web.controller.ControllerFacade;
|
||||||
import org.apache.nifi.web.revision.RevisionManager;
|
import org.apache.nifi.web.revision.RevisionManager;
|
||||||
|
|
||||||
import javax.ws.rs.WebApplicationException;
|
|
||||||
import java.text.Collator;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.LinkedHashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Locale;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.TimeZone;
|
|
||||||
import java.util.TreeMap;
|
|
||||||
import java.util.TreeSet;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.function.Function;
|
|
||||||
import java.util.function.Supplier;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
public final class DtoFactory {
|
public final class DtoFactory {
|
||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
|
@ -2230,8 +2231,8 @@ public final class DtoFactory {
|
||||||
dto.setFlowName(versionControlInfo.getFlowName());
|
dto.setFlowName(versionControlInfo.getFlowName());
|
||||||
dto.setFlowDescription(versionControlInfo.getFlowDescription());
|
dto.setFlowDescription(versionControlInfo.getFlowDescription());
|
||||||
dto.setVersion(versionControlInfo.getVersion());
|
dto.setVersion(versionControlInfo.getVersion());
|
||||||
dto.setCurrent(versionControlInfo.getCurrent().orElse(true));
|
dto.setCurrent(versionControlInfo.isCurrent());
|
||||||
dto.setModified(versionControlInfo.getModified().orElse(false));
|
dto.setModified(versionControlInfo.isModified());
|
||||||
return dto;
|
return dto;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue