NIFI-5680 Handling trailing slashes on URLs of registry clients

This closes #3065.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Bryan Bende 2018-10-11 15:58:55 -04:00 committed by Mark Payne
parent 0f8880547f
commit 02e0a16a68
8 changed files with 229 additions and 2 deletions

View File

@ -23,9 +23,19 @@ public interface FlowRegistryClient {
FlowRegistry getFlowRegistry(String registryId);
default String getFlowRegistryId(String registryUrl) {
if (registryUrl.endsWith("/")) {
registryUrl = registryUrl.substring(0, registryUrl.length() - 1);
}
for (final String registryClientId : getRegistryIdentifiers()) {
final FlowRegistry registry = getFlowRegistry(registryClientId);
if (registry.getURL().equals(registryUrl)) {
String registryClientUrl = registry.getURL();
if (registryClientUrl.endsWith("/")) {
registryClientUrl = registryClientUrl.substring(0, registryClientUrl.length() - 1);
}
if (registryClientUrl.equals(registryUrl)) {
return registryClientId;
}
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestFlowRegistryClient {
private FlowRegistryClient flowRegistryClient;
@Before
public void setup() {
flowRegistryClient = new MockFlowRegistryClient();
}
@Test
public void testParamWithTrailingSlash() {
flowRegistryClient.addFlowRegistry("1", "Registry 1", "http://localhost:1111", "NA");
flowRegistryClient.addFlowRegistry("2", "Registry 2", "http://localhost:2222", "NA");
flowRegistryClient.addFlowRegistry("3", "Registry 3", "http://localhost:3333", "NA");
final String flowRegistryId = flowRegistryClient.getFlowRegistryId("http://localhost:1111/");
assertNotNull(flowRegistryId);
assertEquals("1", flowRegistryId);
}
@Test
public void testClientWithTrailingSlash() {
flowRegistryClient.addFlowRegistry("1", "Registry 1", "http://localhost:1111", "NA");
flowRegistryClient.addFlowRegistry("2", "Registry 2", "http://localhost:2222/", "NA");
flowRegistryClient.addFlowRegistry("3", "Registry 3", "http://localhost:3333", "NA");
final String flowRegistryId = flowRegistryClient.getFlowRegistryId("http://localhost:2222");
assertNotNull(flowRegistryId);
assertEquals("2", flowRegistryId);
}
@Test
public void testNoTrailingSlash() {
flowRegistryClient.addFlowRegistry("1", "Registry 1", "http://localhost:1111", "NA");
flowRegistryClient.addFlowRegistry("2", "Registry 2", "http://localhost:2222", "NA");
flowRegistryClient.addFlowRegistry("3", "Registry 3", "http://localhost:3333", "NA");
final String flowRegistryId = flowRegistryClient.getFlowRegistryId("http://localhost:3333");
assertNotNull(flowRegistryId);
assertEquals("3", flowRegistryId);
}
private static class MockFlowRegistryClient implements FlowRegistryClient {
private Map<String,FlowRegistry> registryMap = new HashMap<>();
@Override
public FlowRegistry getFlowRegistry(String registryId) {
return registryMap.get(registryId);
}
@Override
public Set<String> getRegistryIdentifiers() {
return registryMap.keySet();
}
@Override
public void addFlowRegistry(FlowRegistry registry) {
registryMap.put(registry.getIdentifier(), registry);
}
@Override
public FlowRegistry addFlowRegistry(String registryId, String registryName, String registryUrl, String description) {
final FlowRegistry flowRegistry = mock(FlowRegistry.class);
when(flowRegistry.getIdentifier()).thenReturn(registryId);
when(flowRegistry.getName()).thenReturn(registryName);
when(flowRegistry.getURL()).thenReturn(registryUrl);
when(flowRegistry.getDescription()).thenReturn(description);
registryMap.put(flowRegistry.getIdentifier(), flowRegistry);
return flowRegistry;
}
@Override
public FlowRegistry removeFlowRegistry(String registryId) {
return registryMap.remove(registryId);
}
}
}

View File

@ -4323,6 +4323,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final Set<FlowDifference> differences = comparison.getDifferences().stream()
.filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
.filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES)
.collect(Collectors.toCollection(HashSet::new));
LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);

View File

@ -71,6 +71,9 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
throw new IllegalArgumentException("The given Registry URL is not valid: " + registryUrl);
}
// Handles case where the URI entered has a trailing slash, or includes the trailing /nifi-registry-api
final String registryBaseUrl = uri.getScheme() + "://" + uri.getHost() + ":" + uri.getPort();
final FlowRegistry registry;
if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) {
final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false);
@ -80,7 +83,7 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
+ "Please populate NiFi's Keystore/Truststore properties or connect to a NiFi Registry over http instead of https.");
}
registry = new RestBasedFlowRegistry(this, registryId, registryUrl, sslContext, registryName);
registry = new RestBasedFlowRegistry(this, registryId, registryBaseUrl, sslContext, registryName);
registry.setDescription(description);
} else {
throw new IllegalArgumentException("Cannot create Flow Registry with URI of " + registryUrl

View File

@ -18,6 +18,8 @@ package org.apache.nifi.util;
import org.apache.nifi.registry.flow.ComponentType;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
@ -49,4 +51,46 @@ public class FlowDifferenceFilters {
return false;
}
public static Predicate<FlowDifference> FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES = (fd) -> {
return !isIgnorableVersionedFlowCoordinateChange(fd);
};
public static boolean isIgnorableVersionedFlowCoordinateChange(final FlowDifference fd) {
if (fd.getDifferenceType() == DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED) {
final VersionedComponent componentA = fd.getComponentA();
final VersionedComponent componentB = fd.getComponentB();
if (componentA != null && componentB != null
&& componentA instanceof VersionedProcessGroup
&& componentB instanceof VersionedProcessGroup) {
final VersionedProcessGroup versionedProcessGroupA = (VersionedProcessGroup) componentA;
final VersionedProcessGroup versionedProcessGroupB = (VersionedProcessGroup) componentB;
final VersionedFlowCoordinates coordinatesA = versionedProcessGroupA.getVersionedFlowCoordinates();
final VersionedFlowCoordinates coordinatesB = versionedProcessGroupB.getVersionedFlowCoordinates();
if (coordinatesA != null && coordinatesB != null) {
String registryUrlA = coordinatesA.getRegistryUrl();
String registryUrlB = coordinatesB.getRegistryUrl();
if (registryUrlA != null && registryUrlB != null && !registryUrlA.equals(registryUrlB)) {
if (registryUrlA.endsWith("/")) {
registryUrlA = registryUrlA.substring(0, registryUrlA.length() - 1);
}
if (registryUrlB.endsWith("/")) {
registryUrlB = registryUrlB.substring(0, registryUrlB.length() - 1);
}
if (registryUrlA.equals(registryUrlB)) {
return true;
}
}
}
}
}
return false;
}
}

View File

@ -17,6 +17,8 @@
package org.apache.nifi.util;
import org.apache.nifi.registry.flow.ComponentType;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.diff.DifferenceType;
@ -73,4 +75,52 @@ public class TestFlowDifferenceFilters {
// predicate should return true because we do want to include changes for adding a non-port
Assert.assertTrue(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(flowDifference));
}
@Test
public void testFilterIgnorableVersionedCoordinateDifferencesWithIgnorableDifference() {
VersionedFlowCoordinates coordinatesA = new VersionedFlowCoordinates();
coordinatesA.setRegistryUrl("http://localhost:18080");
VersionedProcessGroup processGroupA = new VersionedProcessGroup();
processGroupA.setVersionedFlowCoordinates(coordinatesA);
VersionedFlowCoordinates coordinatesB = new VersionedFlowCoordinates();
coordinatesB.setRegistryUrl("http://localhost:18080/");
VersionedProcessGroup processGroupB = new VersionedProcessGroup();
processGroupB.setVersionedFlowCoordinates(coordinatesB);
StandardFlowDifference flowDifference = new StandardFlowDifference(
DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED,
processGroupA, processGroupB,
coordinatesA.getRegistryUrl(), coordinatesB.getRegistryUrl(),
"");
Assert.assertFalse(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES.test(flowDifference));
}
@Test
public void testFilterIgnorableVersionedCoordinateDifferencesWithNonIgnorableDifference() {
VersionedFlowCoordinates coordinatesA = new VersionedFlowCoordinates();
coordinatesA.setRegistryUrl("http://localhost:18080");
VersionedProcessGroup processGroupA = new VersionedProcessGroup();
processGroupA.setVersionedFlowCoordinates(coordinatesA);
VersionedFlowCoordinates coordinatesB = new VersionedFlowCoordinates();
coordinatesB.setRegistryUrl("http://localhost:18080");
VersionedProcessGroup processGroupB = new VersionedProcessGroup();
processGroupB.setVersionedFlowCoordinates(coordinatesB);
StandardFlowDifference flowDifference = new StandardFlowDifference(
DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED,
processGroupA, processGroupB,
coordinatesA.getRegistryUrl(), coordinatesB.getRegistryUrl(),
"");
Assert.assertTrue(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES.test(flowDifference));
}
}

View File

@ -4012,6 +4012,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
.filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow.
.filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
.filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES)
.map(difference -> {
final VersionedComponent localComponent = difference.getComponentA();
@ -4053,6 +4054,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
continue;
}
if (FlowDifferenceFilters.isIgnorableVersionedFlowCoordinateChange(difference)) {
continue;
}
final VersionedComponent localComponent = difference.getComponentA();
if (localComponent == null) {
continue;
@ -4301,6 +4306,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
try {
snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), fetchRemoteFlows, NiFiUserUtils.getNiFiUser());
} catch (final NiFiRegistryException | IOException e) {
logger.error(e.getMessage(), e);
throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
+ versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
}

View File

@ -2312,6 +2312,10 @@ public final class DtoFactory {
continue;
}
if (FlowDifferenceFilters.isIgnorableVersionedFlowCoordinateChange(difference)) {
continue;
}
final ComponentDifferenceDTO componentDiff = createComponentDifference(difference);
final List<DifferenceDTO> differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>());