NIFI-10918: When fetching a flow from a Flow Registry, if it references any 'internal versioned flows' instead of requiring that we have a client configured for the appropriate URL, attempt to fetch the flow from each client. We will start with the clients that do report that they can handle the URL but will try others as well. As soon as we successfully fetch the flow, we stop.

NIFI-10918: Fixed checkstyle violations

This closes #6736
Signed-off-by: Bence Simon <bsimon@apache.org>
This commit is contained in:
Mark Payne 2022-11-30 18:08:28 -05:00 committed by Bence Simon
parent c51411d360
commit 2473683ce5
2 changed files with 61 additions and 10 deletions

View File

@ -46,11 +46,15 @@ import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -58,6 +62,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public final class StandardFlowRegistryClientNode extends AbstractComponentNode implements FlowRegistryClientNode {
private static final Logger logger = LoggerFactory.getLogger(StandardFlowRegistryClientNode.class);
private final FlowManager flowManager;
private final Authorizable parent;
@ -299,11 +304,7 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode
final VersionedFlowCoordinates coordinates = group.getVersionedFlowCoordinates();
if (coordinates != null) {
final String storageLocation = coordinates.getStorageLocation() == null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
final String bucketId = coordinates.getBucketId();
final String flowId = coordinates.getFlowId();
final int version = coordinates.getVersion();
final RegisteredFlowSnapshot snapshot = getRegistryForInternalFlow(storageLocation).getFlowContents(context, bucketId, flowId, version, true);
final RegisteredFlowSnapshot snapshot = fetchFlowContents(context, coordinates, true);
final VersionedProcessGroup contents = snapshot.getFlowContents();
group.setVersionedFlowCoordinates(coordinates);
@ -332,14 +333,37 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode
}
}
private FlowRegistryClientNode getRegistryForInternalFlow(final String storageLocation) throws FlowRegistryException, IOException {
for (FlowRegistryClientNode registryClientNode : flowManager.getAllFlowRegistryClients()) {
if (registryClientNode.isStorageLocationApplicable(storageLocation)) {
return registryClientNode;
private RegisteredFlowSnapshot fetchFlowContents(final FlowRegistryClientUserContext context, final VersionedFlowCoordinates coordinates,
final boolean fetchRemoteFlows) throws FlowRegistryException {
final String storageLocation = coordinates.getStorageLocation() == null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
final String bucketId = coordinates.getBucketId();
final String flowId = coordinates.getFlowId();
final int version = coordinates.getVersion();
final List<FlowRegistryClientNode> clientNodes = getRegistryClientsForInternalFlow(storageLocation);
for (final FlowRegistryClientNode clientNode : clientNodes) {
try {
logger.debug("Attempting to fetch flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
final RegisteredFlowSnapshot snapshot = clientNode.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
coordinates.setRegistryId(clientNode.getIdentifier());
logger.debug("Successfully fetched flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
return snapshot;
} catch (final Exception e) {
logger.debug("Failed to fetch flow", e);
}
}
throw new FlowRegistryException(String.format("No applicable registry found for storage location %s", storageLocation));
throw new FlowRegistryException(String.format("Could not find any Registry Client that was able to fetch flow with Bucket [%s] Flow [%s] Version [%s] with Storage Location [%s]",
bucketId, flowId, version, storageLocation));
}
private List<FlowRegistryClientNode> getRegistryClientsForInternalFlow(final String storageLocation) {
// Sort clients based on whether or not they believe they are applicable for the given storage location
final List<FlowRegistryClientNode> matchingClients = new ArrayList<>(flowManager.getAllFlowRegistryClients());
matchingClients.sort(Comparator.comparing(client -> client.isStorageLocationApplicable(storageLocation) ? -1 : 1));
return matchingClients;
}
private RegisteredFlowSnapshot createRegisteredFlowSnapshot(

View File

@ -72,9 +72,36 @@ public class FlowDifferenceFilters {
|| isNewRetryConfigWithDefaultValue(difference, flowManager)
|| isNewZIndexLabelConfigWithDefaultValue(difference, flowManager)
|| isNewZIndexConnectionConfigWithDefaultValue(difference, flowManager)
|| isRegistryUrlChange(difference)
|| isParameterContextChange(difference);
}
// The Registry URL may change if, for instance, a registry is moved to a new host, or is made secure, the port changes, etc.
// Since this can be handled by the client anyway, there's no need to flag this as a 'local modification'
private static boolean isRegistryUrlChange(final FlowDifference difference) {
if (difference.getDifferenceType() != DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED) {
return false;
}
if (!(difference.getValueA() instanceof VersionedFlowCoordinates)) {
return false;
}
if (!(difference.getValueB() instanceof VersionedFlowCoordinates)) {
return false;
}
final VersionedFlowCoordinates coordinatesA = (VersionedFlowCoordinates) difference.getValueA();
final VersionedFlowCoordinates coordinatesB = (VersionedFlowCoordinates) difference.getValueB();
if (Objects.equals(coordinatesA.getBucketId(), coordinatesB.getBucketId())
&& Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId())
&& Objects.equals(coordinatesA.getVersion(), coordinatesB.getVersion())) {
return true;
}
return false;
}
/**
* Predicate that returns true if the difference is NOT a name change on a public port (i.e. VersionedPort that allows remote access).
*/