NIFI-12016: This closes #7662. Allow use of compatible NAR bundles when loading flow from cluster connection; when determining what bundles are compatible, consider not just any bundle if it's the only one but also any bundle whose version matches the framework version so that when NiFi is upgraded, it is handled more gracefully.

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2023-08-30 17:39:31 -04:00 committed by Joseph Witt
parent 292b5d18a0
commit f27ace1ccf
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
4 changed files with 133 additions and 122 deletions

View File

@ -20,22 +20,40 @@ import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.nar.PythonBundle;
import org.apache.nifi.web.api.dto.BundleDTO;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Utility class for Bundles.
*/
public final class BundleUtils {
private static Optional<BundleCoordinate> findOptionalBundleForType(final ExtensionManager extensionManager, final String type, final BundleCoordinate desiredCoordinate) {
static Optional<BundleCoordinate> findOptionalBundleForType(final ExtensionManager extensionManager, final String type, final Bundle frameworkBundle) {
final List<Bundle> bundles = extensionManager.getBundles(type);
if (bundles.size() == 1) {
return Optional.of(bundles.get(0).getBundleDetails().getCoordinate());
}
// All NARs that are packaged with NiFi will have the same bundle coordinate as the NiFi framework bundle.
// During an upgrade, it's fairly common to have two versions of a NAR: the version shipped with NiFi and another version, perhaps to maintain
// backward compatibility to because the new version behaves some different way and the user wants the old behavior in some instances, etc.
// In this case, the user may have two versions. For example, version 2.2.0 and 2.4.0 while NiFi is at version 2.4.0.
// Now, during upgrade to 2.4.1, there will no longer be a 2.4.0 available. We want to be smart enough to realize that those extension using version
// 2.2.0 stay there but those using 2.4.0 upgrade to 2.4.1.
// To do this, we always first match on the exact version but this method is called when there's no exact match. So those marked 2.2.0 won't arrive here.
// But for those extensions that were using 2.4.0, we want to now look for version 2.4.1 - I.e., the one with the same version as the framework. If we
// find that version, then we want to use it. This helps to smooth out the upgrade process even when users have multiple versions of a given NAR.
final String frameworkVersion = frameworkBundle.getBundleDetails().getCoordinate().getVersion();
for (final Bundle bundle : bundles) {
final String componentVersion = bundle.getBundleDetails().getCoordinate().getVersion();
if (frameworkVersion.equals(componentVersion)) {
return Optional.of(bundle.getBundleDetails().getCoordinate());
}
}
return Optional.empty();
}
@ -71,7 +89,10 @@ public final class BundleUtils {
throw new IllegalStateException(String.format("%s from %s is not known to this NiFi instance.", type, coordinate));
}
} else {
final List<BundleCoordinate> bundlesForType = extensionManager.getBundles(type).stream().map(b -> b.getBundleDetails().getCoordinate()).collect(Collectors.toList());
final List<BundleCoordinate> bundlesForType = extensionManager.getBundles(type).stream()
.map(b -> b.getBundleDetails().getCoordinate())
.toList();
if (bundlesForType.contains(coordinate)) {
return coordinate;
} else {
@ -82,18 +103,17 @@ public final class BundleUtils {
private static Optional<BundleCoordinate> findOptionalCompatibleBundle(final ExtensionManager extensionManager, final String type,
final BundleDTO bundleDTO, final boolean allowCompatibleBundle) {
final BundleDTO bundleDTO) {
final BundleCoordinate coordinate = new BundleCoordinate(bundleDTO.getGroup(), bundleDTO.getArtifact(), bundleDTO.getVersion());
final Bundle bundle = extensionManager.getBundle(coordinate);
if (bundle == null) {
if (allowCompatibleBundle) {
return findOptionalBundleForType(extensionManager, type, coordinate);
} else {
return Optional.empty();
}
return findOptionalBundleForType(extensionManager, type, NarClassLoadersHolder.getInstance().getFrameworkBundle());
} else {
final List<BundleCoordinate> bundlesForType = extensionManager.getBundles(type).stream().map(b -> b.getBundleDetails().getCoordinate()).collect(Collectors.toList());
final List<BundleCoordinate> bundlesForType = extensionManager.getBundles(type).stream()
.map(b -> b.getBundleDetails().getCoordinate())
.toList();
if (bundlesForType.contains(coordinate)) {
return Optional.of(coordinate);
} else {
@ -181,9 +201,9 @@ public final class BundleUtils {
public static Optional<BundleCoordinate> getOptionalCompatibleBundle(final ExtensionManager extensionManager, final String type, final BundleDTO bundleDTO) {
if (bundleDTO == null) {
return findOptionalBundleForType(extensionManager, type, null);
return findOptionalBundleForType(extensionManager, type, NarClassLoadersHolder.getInstance().getFrameworkBundle());
} else {
return findOptionalCompatibleBundle(extensionManager, type, bundleDTO, true);
return findOptionalCompatibleBundle(extensionManager, type, bundleDTO);
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.bundle.BundleDetails;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
public class TestBundleUtils {
private static final String PROCESSOR_TYPE = "MyProcessor";
private static final String FRAMEWORK_VERSION = "5.0.0";
private static final Bundle frameworkBundle = createBundle("framework-bundle", FRAMEWORK_VERSION);
private static ExtensionManager extensionManager;
@BeforeAll
public static void setup() throws IOException, ClassNotFoundException {
extensionManager = Mockito.mock(ExtensionManager.class);
final NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
narClassLoaders.init(null, new File("target/extensions"));
}
@Test
public void findOptionalBundleMatchingFramework() throws IOException, ClassNotFoundException {
final Bundle frameworkVersionBundle = createBundle("my-bundle", FRAMEWORK_VERSION);
final Bundle otherBundle = createBundle("my-bundle", "1.2.3");
final List<Bundle> bundles = Arrays.asList(frameworkVersionBundle, otherBundle);
when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles);
final Optional<BundleCoordinate> compatibleCoordinate = BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE, frameworkBundle);
assertTrue(compatibleCoordinate.isPresent());
assertEquals(frameworkVersionBundle.getBundleDetails().getCoordinate(), compatibleCoordinate.get());
}
@Test
public void findOptionalBundleNotMatchingFramework() throws IOException, ClassNotFoundException {
final Bundle version3 = createBundle("my-bundle", "3.0.0");
final Bundle otherBundle = createBundle("my-bundle", "1.2.3");
final List<Bundle> bundles = Arrays.asList(version3, otherBundle);
when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles);
final Optional<BundleCoordinate> compatibleCoordinate = BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE, frameworkBundle);
assertFalse(compatibleCoordinate.isPresent());
}
@Test
public void testFindOptionalBundleOnlyOneBundle() throws IOException, ClassNotFoundException {
final Bundle otherBundle = createBundle("my-bundle", "1.2.3");
final List<Bundle> bundles = Collections.singletonList(otherBundle);
when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles);
final Optional<BundleCoordinate> compatibleCoordinate = BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE, frameworkBundle);
assertTrue(compatibleCoordinate.isPresent());
assertEquals(otherBundle.getBundleDetails().getCoordinate(), compatibleCoordinate.get());
}
private static Bundle createBundle(final String artifactId, final String version) {
final BundleDetails bundleDetails = new BundleDetails.Builder()
.coordinate(new BundleCoordinate("org.apache.nifi", artifactId, version))
.workingDir(new File("target"))
.build();
return new Bundle(bundleDetails, TestBundleUtils.class.getClassLoader());
}
}

View File

@ -956,7 +956,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
controller.setNodeId(nodeId);
// load new controller state
loadFromBytes(dataFlow, true, BundleUpdateStrategy.USE_SPECIFIED_OR_FAIL);
loadFromBytes(dataFlow, true, BundleUpdateStrategy.USE_SPECIFIED_OR_COMPATIBLE_OR_GHOST);
// set node ID on controller before we start heartbeating because heartbeat needs node ID
clusterCoordinator.setLocalNodeIdentifier(nodeId);

View File

@ -30,7 +30,6 @@ import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
@ -353,84 +352,6 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
});
}
@Test
public void testCannotJoinClusterIfMissingNar() throws NiFiClientException, IOException, InterruptedException {
getClientUtil().createProcessor("GenerateFlowFile");
// Shut down node 2
disconnectNode(2);
final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2);
node2.stop();
// Remove node from the cluster. This way we know when it's attempted to connect
final Integer node2ApiPort = getNodeApiPort(2);
removeNode(2);
removeExtensionsNar(node2);
node2.start(false);
// Wait until node is no longer removed from cluster, which will happen when it starts up and requests to connect
waitFor(() -> !isNodeRemoved(node2ApiPort));
// Wait for node to show as disconnected because it doesn't have the necessary nar
waitForNodeState(2, NodeConnectionState.DISCONNECTED);
// We need to restore the extensions nar and restart the node so that subsequent tests can succeed
restoreExtensionsNar(node2);
node2.stop();
node2.start();
waitForAllNodesConnected();
}
private void removeNode(final int index) throws NiFiClientException, IOException, InterruptedException {
final NodeDTO nodeDto = getNodeEntity(index).getNode();
final String nodeId = nodeDto.getNodeId();
final Integer apiPort = nodeDto.getApiPort();
getNifiClient().getControllerClient().deleteNode(nodeId);
waitFor(() -> isNodeRemoved(apiPort));
}
private Integer getNodeApiPort(final int index) throws NiFiClientException, IOException {
final NodeDTO nodeDto = getNodeEntity(index).getNode();
final Integer apiPort = nodeDto.getApiPort();
return apiPort;
}
@Test
public void testCanJoinClusterIfAllNodesMissingNar() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
// Shut down node 2
disconnectNode(2);
final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2);
node2.stop();
final NiFiInstance node1 = getNiFiInstance().getNodeInstance(1);
node1.stop();
removeExtensionsNar(node1);
removeExtensionsNar(node2);
node1.start(false);
node2.start(true);
waitForAllNodesConnected();
assertTrue(getNifiClient().getProcessorClient().getProcessor(generate.getId()).getComponent().getExtensionMissing());
// In order to ensure that subsequent tests are able to operate properly, we need to restore the nar and restart
node1.stop();
node2.stop();
restoreExtensionsNar(node1);
restoreExtensionsNar(node2);
node1.start(false);
node2.start(true);
waitForAllNodesConnected();
}
@Test
public void testCannotRemoveComponentsWhileNodeDisconnected() throws NiFiClientException, IOException, InterruptedException {
@ -497,36 +418,6 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
}
private void removeExtensionsNar(final NiFiInstance nifiInstance) {
final File extensionsNar = getExtensionsNar(nifiInstance);
final File backupFile = new File(extensionsNar.getParentFile(), extensionsNar.getName() + ".backup");
assertTrue(extensionsNar.renameTo(backupFile));
}
private void restoreExtensionsNar(final NiFiInstance nifiInstance) {
final File backupFile = getExtensionsNar(nifiInstance);
final File extensionsNar = new File(backupFile.getParentFile(), backupFile.getName().replace(".backup", ""));
assertTrue(backupFile.renameTo(extensionsNar));
}
private File getExtensionsNar(final NiFiInstance nifiInstance) {
final File libDir = new File(nifiInstance.getInstanceDirectory(), "lib");
final File[] testExtensionsNar = libDir.listFiles(file -> file.getName().startsWith("nifi-system-test-extensions-nar-"));
assertEquals(1, testExtensionsNar.length);
return testExtensionsNar[0];
}
private boolean isNodeRemoved(final int apiPort) {
try {
return getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()
.noneMatch(dto -> dto.getApiPort() == apiPort);
} catch (Exception e) {
return false;
}
}
@Test
public void testRestartWithFlowXmlGzNoJson() throws NiFiClientException, IOException {
restartWithOnlySingleFlowPersistenceFile("flow.json.gz");