NIFI-12918 Corrected Nested Versioned Flows for Stateless

- Removed unstable assertions from TestListFile

This closes #8536

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
slambrose 2024-03-19 13:56:33 -04:00 committed by exceptionfactory
parent 506ac835ad
commit 9394d063f2
No known key found for this signature in database
4 changed files with 146 additions and 6 deletions

View File

@ -313,9 +313,6 @@ public class TestListFile {
runner.setProperty(ListFile.MAX_AGE, age5);
runNext.apply(true);
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(2, successFiles3.size());
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 2 match the filter.");
// exclude oldest and newest
runner.setProperty(ListFile.MIN_AGE, age1);

View File

@ -36,9 +36,17 @@ public class InMemoryFlowRegistry extends AbstractFlowRegistryClient implements
private final Map<FlowCoordinates, List<VersionedExternalFlow>> flowSnapshots = new ConcurrentHashMap<>();
/**
* Returns true regardless of the Flow Storage Location because this class is the only Flow Registry Client configured for Stateless operation
*
* @param context Configuration context.
* @param location The location of versioned flow to check.
*
* @return true regardless of location
*/
@Override
public boolean isStorageLocationApplicable(final FlowRegistryClientConfigurationContext context, final String location) {
return false;
return true;
}
@Override

View File

@ -31,12 +31,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class RegistryUtil {
private static final Logger logger = LoggerFactory.getLogger(RegistryUtil.class);
private static final Pattern REGISTRY_URL_PATTERN = Pattern.compile("^(https?://.+?)/?nifi-registry-api.*$");
private final String registryUrl;
private NiFiRegistryClient registryClient;
private final SSLContext sslContext;
@ -46,6 +51,11 @@ public class RegistryUtil {
this.sslContext = sslContext;
}
public RegistryUtil(final NiFiRegistryClient registryClient, final String registryUrl, final SSLContext sslContext) {
this.registryClient = registryClient;
this.registryUrl = registryUrl;
this.sslContext = sslContext;
}
public VersionedFlowSnapshot getFlowByID(String bucketID, String flowID, int versionID) throws IOException, NiFiRegistryException {
if (versionID == -1) {
@ -119,6 +129,14 @@ public class RegistryUtil {
return flowSnapshot;
}
protected String getBaseRegistryUrl(final String storageLocation) {
final Matcher matcher = REGISTRY_URL_PATTERN.matcher(storageLocation);
if (matcher.matches()) {
return matcher.group(1);
} else {
return storageLocation;
}
}
private void populateVersionedContentsRecursively(final VersionedProcessGroup group) throws NiFiRegistryException, IOException {
if (group == null) {
@ -127,12 +145,12 @@ public class RegistryUtil {
final VersionedFlowCoordinates coordinates = group.getVersionedFlowCoordinates();
if (coordinates != null) {
final String registryUrl = coordinates.getStorageLocation();
final String subRegistryUrl = getBaseRegistryUrl(coordinates.getStorageLocation());
final String bucketId = coordinates.getBucketId();
final String flowId = coordinates.getFlowId();
final int version = coordinates.getVersion();
final RegistryUtil subFlowUtil = new RegistryUtil(registryUrl, sslContext);
final RegistryUtil subFlowUtil = getSubRegistryUtil(subRegistryUrl);
final VersionedFlowSnapshot snapshot = subFlowUtil.getFlowByID(bucketId, flowId, version);
final VersionedProcessGroup contents = snapshot.getFlowContents();
@ -159,4 +177,15 @@ public class RegistryUtil {
populateVersionedContentsRecursively(child);
}
}
private RegistryUtil getSubRegistryUtil(final String subRegistryUrl) {
final RegistryUtil subRegistryUtil;
if (registryUrl.startsWith(subRegistryUrl)) {
// Share current Registry Client for matching Registry URL
subRegistryUtil = new RegistryUtil(registryClient, subRegistryUrl, sslContext);
} else {
subRegistryUtil = new RegistryUtil(subRegistryUrl, sslContext);
}
return subRegistryUtil;
}
}

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.core;
import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.client.FlowSnapshotClient;
import org.apache.nifi.registry.client.NiFiRegistryClient;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestRegistryUtil {
private static final String BASE_REGISTRY_URL = "https://localhost:18443/context-path";
private static final String STORAGE_LOCATION_FORMAT = "%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";
private static final String ROOT_BUCKET_ID = UUID.randomUUID().toString();
private static final String ROOT_FLOW_ID = UUID.randomUUID().toString();
private static final int ROOT_VERSION = 1;
private static final String CHILD_BUCKET_ID = UUID.randomUUID().toString();
private static final String CHILD_FLOW_ID = UUID.randomUUID().toString();
private static final int CHILD_VERSION = 2;
@Test
public void testGetBaseRegistryUrl() throws NiFiRegistryException, IOException {
final NiFiRegistryClient registryClient = mock(NiFiRegistryClient.class);
final RegistryUtil registryUtil = new RegistryUtil(registryClient, BASE_REGISTRY_URL, null);
final FlowSnapshotClient flowSnapshotClient = mock(FlowSnapshotClient.class);
when(registryClient.getFlowSnapshotClient()).thenReturn(flowSnapshotClient);
final VersionedProcessGroup childVersionedProcessGroup = getChildVersionedProcessGroup();
final VersionedFlowSnapshot rootSnapshot = buildRootSnapshot(Collections.singleton(childVersionedProcessGroup));
final String rootRegistryUrl = registryUtil.getBaseRegistryUrl(rootSnapshot.getFlowContents().getVersionedFlowCoordinates().getStorageLocation());
assertEquals(BASE_REGISTRY_URL, rootRegistryUrl);
final VersionedFlowSnapshot childSnapshot = new VersionedFlowSnapshot();
childSnapshot.setFlowContents(childVersionedProcessGroup);
final String childRegistryUrl = registryUtil.getBaseRegistryUrl(childVersionedProcessGroup.getVersionedFlowCoordinates().getStorageLocation());
assertEquals(BASE_REGISTRY_URL, childRegistryUrl);
when(flowSnapshotClient.get(eq(ROOT_BUCKET_ID), eq(ROOT_FLOW_ID), eq(ROOT_VERSION))).thenReturn(rootSnapshot);
when(flowSnapshotClient.get(eq(CHILD_BUCKET_ID), eq(CHILD_FLOW_ID), eq(CHILD_VERSION))).thenReturn(childSnapshot);
final VersionedFlowSnapshot flowSnapshot = registryUtil.getFlowContents(ROOT_BUCKET_ID, ROOT_FLOW_ID, ROOT_VERSION, true, null);
assertEquals(rootSnapshot, flowSnapshot);
}
private VersionedFlowSnapshot buildRootSnapshot(final Set<VersionedProcessGroup> childGroups){
final String storageLocation = String.format(STORAGE_LOCATION_FORMAT, BASE_REGISTRY_URL, ROOT_BUCKET_ID, ROOT_FLOW_ID, ROOT_VERSION);
final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates();
coordinates.setStorageLocation(storageLocation);
coordinates.setBucketId(ROOT_BUCKET_ID);
coordinates.setFlowId(ROOT_FLOW_ID);
coordinates.setVersion(ROOT_VERSION);
final VersionedProcessGroup group = new VersionedProcessGroup();
group.setVersionedFlowCoordinates(coordinates);
group.setProcessGroups(childGroups);
final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
snapshot.setFlowContents(group);
return snapshot;
}
private VersionedProcessGroup getChildVersionedProcessGroup() {
final String storageLocation = String.format(STORAGE_LOCATION_FORMAT, BASE_REGISTRY_URL, CHILD_BUCKET_ID, CHILD_FLOW_ID, CHILD_VERSION);
final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates();
coordinates.setStorageLocation(storageLocation);
coordinates.setBucketId(CHILD_BUCKET_ID);
coordinates.setFlowId(CHILD_FLOW_ID);
coordinates.setVersion(CHILD_VERSION);
final VersionedProcessGroup group = new VersionedProcessGroup();
group.setVersionedFlowCoordinates(coordinates);
return group;
}
}