mirror of https://github.com/apache/nifi.git
NIFI-13795 Improve NAR Manager handling of dependent NARs (#9310)
- Ensure NARs are loaded in parent-first order during restart - Allow replacing a NAR when other NARs are dependent on it - Improvements to catch Throwable in several places to handle NoClassDefFoundError Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
f39cce5dec
commit
ee29562517
|
@ -41,6 +41,11 @@ public class NarPersistenceInfo {
|
||||||
return narProperties;
|
return narProperties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "%s:%s:%s".formatted(narProperties.getNarGroup(), narProperties.getNarId(), narProperties.getNarVersion());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(final Object o) {
|
public boolean equals(final Object o) {
|
||||||
if (this == o) {
|
if (this == o) {
|
||||||
|
|
|
@ -55,7 +55,7 @@ public interface NarManager {
|
||||||
* @param coordinate the coordinate of the NAR
|
* @param coordinate the coordinate of the NAR
|
||||||
* @param narState the new state
|
* @param narState the new state
|
||||||
*/
|
*/
|
||||||
void updateState(BundleCoordinate coordinate, NarState narState);
|
void updateState(BundleCoordinate coordinate, NarState narState, String failureMessage);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return all NARs contained in the NAR Manager
|
* @return all NARs contained in the NAR Manager
|
||||||
|
|
|
@ -440,10 +440,10 @@ public class ExtensionBuilder {
|
||||||
final StandardLoggingContext loggingContext = new StandardLoggingContext(null);
|
final StandardLoggingContext loggingContext = new StandardLoggingContext(null);
|
||||||
try {
|
try {
|
||||||
return createControllerServiceNode(loggingContext);
|
return createControllerServiceNode(loggingContext);
|
||||||
} catch (final Exception e) {
|
} catch (final Throwable t) {
|
||||||
logger.error("Could not create Controller Service of type {} from {} for ID {} due to: {}; creating \"Ghost\" implementation", type, bundleCoordinate, identifier, e.getMessage());
|
logger.error("Could not create Controller Service of type {} from {} for ID {} due to: {}; creating \"Ghost\" implementation", type, bundleCoordinate, identifier, t.getMessage());
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug(e.getMessage(), e);
|
logger.debug(t.getMessage(), t);
|
||||||
}
|
}
|
||||||
|
|
||||||
return createGhostControllerServiceNode();
|
return createGhostControllerServiceNode();
|
||||||
|
@ -790,8 +790,8 @@ public class ExtensionBuilder {
|
||||||
verifyControllerServiceReferences(processor, bundle.getClassLoader());
|
verifyControllerServiceReferences(processor, bundle.getClassLoader());
|
||||||
|
|
||||||
return processorComponent;
|
return processorComponent;
|
||||||
} catch (final Exception e) {
|
} catch (final Throwable t) {
|
||||||
throw new ProcessorInstantiationException(type, e);
|
throw new ProcessorInstantiationException(type, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -810,8 +810,8 @@ public class ExtensionBuilder {
|
||||||
verifyControllerServiceReferences(taskComponent.getComponent(), bundle.getClassLoader());
|
verifyControllerServiceReferences(taskComponent.getComponent(), bundle.getClassLoader());
|
||||||
|
|
||||||
return taskComponent;
|
return taskComponent;
|
||||||
} catch (final Exception e) {
|
} catch (final Throwable t) {
|
||||||
throw new ReportingTaskInstantiationException(type, e);
|
throw new ReportingTaskInstantiationException(type, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -825,8 +825,8 @@ public class ExtensionBuilder {
|
||||||
loggableComponent.getComponent().initialize(config);
|
loggableComponent.getComponent().initialize(config);
|
||||||
|
|
||||||
return loggableComponent;
|
return loggableComponent;
|
||||||
} catch (final Exception e) {
|
} catch (final Throwable t) {
|
||||||
throw new FlowAnalysisRuleInstantiationException(type, e);
|
throw new FlowAnalysisRuleInstantiationException(type, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -860,8 +860,8 @@ public class ExtensionBuilder {
|
||||||
return clientComponent;
|
return clientComponent;
|
||||||
|
|
||||||
|
|
||||||
} catch (final Exception e) {
|
} catch (final Throwable t) {
|
||||||
throw new FlowRepositoryClientInstantiationException(type, e);
|
throw new FlowRepositoryClientInstantiationException(type, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -879,8 +879,8 @@ public class ExtensionBuilder {
|
||||||
verifyControllerServiceReferences(providerComponent.getComponent(), bundle.getClassLoader());
|
verifyControllerServiceReferences(providerComponent.getComponent(), bundle.getClassLoader());
|
||||||
|
|
||||||
return providerComponent;
|
return providerComponent;
|
||||||
} catch (final Exception e) {
|
} catch (final Throwable t) {
|
||||||
throw new ParameterProviderInstantiationException(type, e);
|
throw new ParameterProviderInstantiationException(type, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,8 +31,13 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
@ -77,49 +82,69 @@ public class NarInstallTask implements Runnable {
|
||||||
try {
|
try {
|
||||||
// If replacing an existing NAR with the same coordinate, then unload the existing NAR and stop+ghost any components from it
|
// If replacing an existing NAR with the same coordinate, then unload the existing NAR and stop+ghost any components from it
|
||||||
// If the NAR being replaced contains Python extensions, those need to be included through a separate lookup since their bundle coordinate is a logical python coordinate
|
// If the NAR being replaced contains Python extensions, those need to be included through a separate lookup since their bundle coordinate is a logical python coordinate
|
||||||
|
final List<File> narsToLoad = new ArrayList<>();
|
||||||
final StandardStoppedComponents stoppedComponents = new StandardStoppedComponents(controllerServiceProvider);
|
final StandardStoppedComponents stoppedComponents = new StandardStoppedComponents(controllerServiceProvider);
|
||||||
final Bundle existingBundle = extensionManager.getBundle(coordinate);
|
final Bundle existingBundle = extensionManager.getBundle(coordinate);
|
||||||
if (existingBundle == null) {
|
if (existingBundle == null) {
|
||||||
LOGGER.info("Installing NAR [{}] with coordinate [{}]", narNode.getIdentifier(), coordinate);
|
LOGGER.info("Installing NAR [{}] with coordinate [{}]", narNode.getIdentifier(), coordinate);
|
||||||
|
final List<File> unloadedNarFiles = unloadNarChain(coordinate, stoppedComponents, false);
|
||||||
|
narsToLoad.addAll(unloadedNarFiles);
|
||||||
|
narsToLoad.add(narFile);
|
||||||
} else {
|
} else {
|
||||||
LOGGER.info("Replacing NAR [{}], unloading existing NAR and components", coordinate);
|
LOGGER.info("Replacing NAR [{}], unloading existing NAR and components", coordinate);
|
||||||
final Set<ExtensionDefinition> extensionDefinitions = new HashSet<>(extensionManager.getTypes(coordinate));
|
final List<File> unloadedNarFiles = unloadNarChain(coordinate, stoppedComponents, true);
|
||||||
extensionDefinitions.addAll(extensionManager.getPythonExtensions(coordinate));
|
narsToLoad.addAll(unloadedNarFiles);
|
||||||
|
|
||||||
narLoader.unload(existingBundle);
|
|
||||||
narComponentManager.unloadComponents(coordinate, extensionDefinitions, stoppedComponents);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempt to load the NAR which will include any NARs that were previously skipped
|
// Attempt to load the NAR and any NARs that were unloaded, or need to be reloaded, and the NAR loader will also include any NARs that were previously skipped
|
||||||
final NarLoadResult narLoadResult = narLoader.load(Collections.singleton(narFile), ALLOWED_EXTENSION_TYPES);
|
// Unloading was done in child first order, so we need to reverse the list to load in parent first order
|
||||||
|
Collections.reverse(narsToLoad);
|
||||||
|
LOGGER.info("Install task for [{}] will load NARs {}", coordinate, narsToLoad);
|
||||||
|
final NarLoadResult narLoadResult = narLoader.load(narsToLoad, ALLOWED_EXTENSION_TYPES);
|
||||||
|
|
||||||
// For any successfully loaded NARs, un-ghost any components that can be provided by one of the loaded NARs, this handles a general ghosting case where
|
// For any successfully loaded NARs, un-ghost any components that can be provided by one of the loaded NARs, this handles a general ghosting case where
|
||||||
// the NAR now becomes available, as well as restoring any component that may have been purposely unloaded above for replacing an existing NAR
|
// the NAR now becomes available, as well as restoring any component that may have been purposely unloaded above for replacing an existing NAR
|
||||||
for (final Bundle loadedBundle : narLoadResult.getLoadedBundles()) {
|
for (final Bundle loadedBundle : narLoadResult.getLoadedBundles()) {
|
||||||
|
boolean installed = false;
|
||||||
final BundleCoordinate loadedCoordinate = loadedBundle.getBundleDetails().getCoordinate();
|
final BundleCoordinate loadedCoordinate = loadedBundle.getBundleDetails().getCoordinate();
|
||||||
LOGGER.info("NAR [{}] was installed", loadedCoordinate);
|
LOGGER.info("NAR [{}] was loaded", loadedCoordinate);
|
||||||
if (loadedCoordinate.equals(coordinate)) {
|
if (loadedCoordinate.equals(coordinate)) {
|
||||||
// If the NAR that was just uploaded was successfully loaded, attempt to access the class of each extension to prove that each
|
// If the NAR that was just uploaded was successfully loaded, attempt to access the class of each extension to prove that each
|
||||||
// class can load successfully, if not then we want to bounce out to the catch block and set the state as FAILED
|
// class can load successfully, if not then we want to set the state as FAILED and capture the error message
|
||||||
final Set<ExtensionDefinition> loadedExtensionDefinitions = extensionManager.getTypes(coordinate);
|
try {
|
||||||
for (final ExtensionDefinition loadedExtensionDefinition : loadedExtensionDefinitions) {
|
verifyExtensionDefinitions(coordinate);
|
||||||
final Class<?> extensionClass = extensionManager.getClass(loadedExtensionDefinition);
|
narNode.setState(NarState.INSTALLED);
|
||||||
LOGGER.debug("Loaded [{}] from bundle [{}]", extensionClass.getCanonicalName(), coordinate);
|
installed = true;
|
||||||
|
} catch (final Throwable t) {
|
||||||
|
LOGGER.error("Failed to install NAR [{}]", coordinate, t);
|
||||||
|
narNode.setState(NarState.FAILED);
|
||||||
|
narNode.setFailureMessage(t.getMessage());
|
||||||
}
|
}
|
||||||
narNode.setState(NarState.INSTALLED);
|
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
narManager.updateState(loadedCoordinate, NarState.INSTALLED);
|
verifyExtensionDefinitions(loadedCoordinate);
|
||||||
|
narManager.updateState(loadedCoordinate, NarState.INSTALLED, null);
|
||||||
|
installed = true;
|
||||||
} catch (final NarNotFoundException e) {
|
} catch (final NarNotFoundException e) {
|
||||||
LOGGER.warn("NAR [{}] was loaded, but no longer exists in the NAR Manager", loadedCoordinate);
|
LOGGER.warn("NAR [{}] was loaded, but no longer exists in the NAR Manager", loadedCoordinate);
|
||||||
|
} catch (final Throwable t) {
|
||||||
|
LOGGER.error("Failed to install NAR [{}]", coordinate, t);
|
||||||
|
narManager.updateState(loadedCoordinate, NarState.FAILED, t.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final Set<ExtensionDefinition> extensionDefinitions = new HashSet<>(extensionManager.getTypes(loadedCoordinate));
|
if (installed) {
|
||||||
extensionDefinitions.addAll(extensionManager.getPythonExtensions(loadedCoordinate));
|
try {
|
||||||
narComponentManager.loadMissingComponents(loadedCoordinate, extensionDefinitions, stoppedComponents);
|
final Set<ExtensionDefinition> extensionDefinitions = new HashSet<>(extensionManager.getTypes(loadedCoordinate));
|
||||||
|
extensionDefinitions.addAll(extensionManager.getPythonExtensions(loadedCoordinate));
|
||||||
|
narComponentManager.loadMissingComponents(loadedCoordinate, extensionDefinitions, stoppedComponents);
|
||||||
|
} catch (final Throwable t) {
|
||||||
|
LOGGER.error("Failed to load missing components from [{}]", loadedCoordinate, t);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Process the skipped bundles and mark them as having a missing dependency
|
||||||
for (final BundleDetails skippedBundles : narLoadResult.getSkippedBundles()) {
|
for (final BundleDetails skippedBundles : narLoadResult.getSkippedBundles()) {
|
||||||
final BundleCoordinate skippedCoordinate = skippedBundles.getCoordinate();
|
final BundleCoordinate skippedCoordinate = skippedBundles.getCoordinate();
|
||||||
LOGGER.info("NAR [{}] is missing dependency", skippedCoordinate);
|
LOGGER.info("NAR [{}] is missing dependency", skippedCoordinate);
|
||||||
|
@ -127,7 +152,7 @@ public class NarInstallTask implements Runnable {
|
||||||
narNode.setState(NarState.MISSING_DEPENDENCY);
|
narNode.setState(NarState.MISSING_DEPENDENCY);
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
narManager.updateState(skippedCoordinate, NarState.MISSING_DEPENDENCY);
|
narManager.updateState(skippedCoordinate, NarState.MISSING_DEPENDENCY, null);
|
||||||
} catch (final NarNotFoundException e) {
|
} catch (final NarNotFoundException e) {
|
||||||
LOGGER.warn("NAR [{}] was skipped, but no longer exists in the NAR Manager", skippedCoordinate);
|
LOGGER.warn("NAR [{}] was skipped, but no longer exists in the NAR Manager", skippedCoordinate);
|
||||||
}
|
}
|
||||||
|
@ -137,7 +162,7 @@ public class NarInstallTask implements Runnable {
|
||||||
// Restore previously running/enabled components to their original state
|
// Restore previously running/enabled components to their original state
|
||||||
stoppedComponents.startAll();
|
stoppedComponents.startAll();
|
||||||
|
|
||||||
// Notify the NAR Manager that the install task complete for the current NAR
|
// Notify the NAR Manager that the install task completed for the current NAR
|
||||||
narManager.completeInstall(narNode.getIdentifier());
|
narManager.completeInstall(narNode.getIdentifier());
|
||||||
|
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
|
@ -147,6 +172,84 @@ public class NarInstallTask implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyExtensionDefinitions(final BundleCoordinate coordinate) {
|
||||||
|
final Set<ExtensionDefinition> loadedExtensionDefinitions = extensionManager.getTypes(coordinate);
|
||||||
|
for (final ExtensionDefinition loadedExtensionDefinition : loadedExtensionDefinitions) {
|
||||||
|
final Class<?> extensionClass = extensionManager.getClass(loadedExtensionDefinition);
|
||||||
|
LOGGER.debug("Loaded [{}] from bundle [{}]", extensionClass.getCanonicalName(), coordinate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<File> unloadNarChain(final BundleCoordinate coordinate, final StoppedComponents stoppedComponents, final boolean includeStartingNar) {
|
||||||
|
final Map<BundleCoordinate, File> narFilesByCoordinate = new HashMap<>();
|
||||||
|
narManager.getNars().forEach(narNode -> narFilesByCoordinate.put(narNode.getManifest().getCoordinate(), narNode.getNarFile()));
|
||||||
|
|
||||||
|
final List<File> unloadedNarFiles = new ArrayList<>();
|
||||||
|
final List<Bundle> bundlesToUnload = new ArrayList<>();
|
||||||
|
final Map<BundleCoordinate, Set<ExtensionDefinition>> extensionsByCoordinate = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
final List<Bundle> bundleChain = getDependencyChain(coordinate, includeStartingNar);
|
||||||
|
for (final Bundle chainedBundle : bundleChain) {
|
||||||
|
final BundleCoordinate chainedCoordinate = chainedBundle.getBundleDetails().getCoordinate();
|
||||||
|
final File chainedNarFile = narFilesByCoordinate.get(chainedCoordinate);
|
||||||
|
if (chainedNarFile != null) {
|
||||||
|
final Set<ExtensionDefinition> extensionDefinitions = new HashSet<>(extensionManager.getTypes(chainedCoordinate));
|
||||||
|
extensionDefinitions.addAll(extensionManager.getPythonExtensions(chainedCoordinate));
|
||||||
|
extensionsByCoordinate.put(chainedCoordinate, extensionDefinitions);
|
||||||
|
|
||||||
|
bundlesToUnload.add(chainedBundle);
|
||||||
|
unloadedNarFiles.add(chainedNarFile);
|
||||||
|
} else {
|
||||||
|
LOGGER.warn("Found NAR [{}] in dependency chain of [{}], but it is not present in the NAR Manager", chainedCoordinate, coordinate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGER.debug("Unloading NAR chain from [{}] requires unloading {}", coordinate, bundlesToUnload);
|
||||||
|
narLoader.unload(bundlesToUnload);
|
||||||
|
|
||||||
|
for (Map.Entry<BundleCoordinate, Set<ExtensionDefinition>> entry : extensionsByCoordinate.entrySet()) {
|
||||||
|
narComponentManager.unloadComponents(entry.getKey(), entry.getValue(), stoppedComponents);
|
||||||
|
}
|
||||||
|
|
||||||
|
return unloadedNarFiles;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the depth-first list of dependents from this bundle, including the starting bundle.
|
||||||
|
*
|
||||||
|
* If bundle A is dependent on B which is dependent on C, then...
|
||||||
|
* - getDependencyChain(A) returns [A]
|
||||||
|
* - getDependencyChain(B) returns [A, B]
|
||||||
|
* - getDependencyChain(C) returns [A, B, C]
|
||||||
|
*
|
||||||
|
* @param startingCoordinate the coordinate to start building the chain from
|
||||||
|
* @param includeStartingBundle indicates if the bundle for the starting coordinate should be included in the chain
|
||||||
|
* @return the depth-first list of dependents
|
||||||
|
*/
|
||||||
|
private List<Bundle> getDependencyChain(final BundleCoordinate startingCoordinate, final boolean includeStartingBundle) {
|
||||||
|
final List<Bundle> dependencyChain = new ArrayList<>();
|
||||||
|
traverseDependencyChain(startingCoordinate, dependencyChain);
|
||||||
|
if (includeStartingBundle) {
|
||||||
|
final Bundle startingBundle = extensionManager.getBundle(startingCoordinate);
|
||||||
|
if (startingBundle != null) {
|
||||||
|
dependencyChain.add(startingBundle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return dependencyChain;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void traverseDependencyChain(final BundleCoordinate currentCoordinate, final List<Bundle> dependencyChain) {
|
||||||
|
final Set<Bundle> dependentBundles = extensionManager.getDependentBundles(currentCoordinate);
|
||||||
|
if (dependentBundles.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (final Bundle dependentBundle : dependentBundles) {
|
||||||
|
final BundleCoordinate dependentCoordinate = dependentBundle.getBundleDetails().getCoordinate();
|
||||||
|
traverseDependencyChain(dependentCoordinate, dependencyChain);
|
||||||
|
dependencyChain.add(dependentBundle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static Builder builder() {
|
public static Builder builder() {
|
||||||
return new Builder();
|
return new Builder();
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,17 +45,19 @@ import java.time.Duration;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.HexFormat;
|
import java.util.HexFormat;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.Stack;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||||
|
|
||||||
|
@ -105,8 +107,8 @@ public class StandardNarManager implements NarManager, InitializingBean, Closeab
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void afterPropertiesSet() throws IOException {
|
public void afterPropertiesSet() throws IOException {
|
||||||
final Collection<NarPersistenceInfo> narInfos = loadExistingNars();
|
final Collection<NarPersistenceInfo> narInfos = getExistingNarInfos();
|
||||||
restoreState(narInfos);
|
restoreNarNodes(narInfos);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -139,12 +141,13 @@ public class StandardNarManager implements NarManager, InitializingBean, Closeab
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void updateState(final BundleCoordinate coordinate, final NarState narState) {
|
public synchronized void updateState(final BundleCoordinate coordinate, final NarState narState, final String failureMessage) {
|
||||||
final NarNode narNode = narNodesById.values().stream()
|
final NarNode narNode = narNodesById.values().stream()
|
||||||
.filter(n -> n.getManifest().getCoordinate().equals(coordinate))
|
.filter(n -> n.getManifest().getCoordinate().equals(coordinate))
|
||||||
.findFirst()
|
.findFirst()
|
||||||
.orElseThrow(() -> new NarNotFoundException(coordinate));
|
.orElseThrow(() -> new NarNotFoundException(coordinate));
|
||||||
narNode.setState(narState);
|
narNode.setState(narState);
|
||||||
|
narNode.setFailureMessage(failureMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -344,49 +347,75 @@ public class StandardNarManager implements NarManager, InitializingBean, Closeab
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Collection<NarPersistenceInfo> loadExistingNars() throws IOException {
|
private Collection<NarPersistenceInfo> getExistingNarInfos() throws IOException {
|
||||||
final Collection<NarPersistenceInfo> narInfos = persistenceProvider.getAllNarInfo();
|
final Collection<NarPersistenceInfo> narInfos = persistenceProvider.getAllNarInfo();
|
||||||
logger.info("Loading stored NAR files [{}]", narInfos.size());
|
logger.info("Loading stored NAR files [{}]", narInfos.size());
|
||||||
|
if (narInfos.isEmpty()) {
|
||||||
|
return narInfos;
|
||||||
|
}
|
||||||
|
|
||||||
final Collection<File> narFiles = narInfos.stream()
|
logger.debug("NAR Infos before reordering: {}", narInfos);
|
||||||
.map(NarPersistenceInfo::getNarFile)
|
|
||||||
.collect(Collectors.toSet());
|
// If any NARs being loaded are parents of other NARs being loaded, we need to ensure parents load before children, otherwise a child NAR may
|
||||||
narLoader.load(narFiles);
|
// select a compatible parent NAR from the other NARs provided by NiFi, rather than selecting the parent NAR from within the NAR Manager
|
||||||
return narInfos;
|
final int numNarInfos = narInfos.size();
|
||||||
|
final Stack<NarPersistenceInfo> narHierarchy = new Stack<>();
|
||||||
|
createNarHierarchy(narInfos, narHierarchy);
|
||||||
|
|
||||||
|
// Create a new list and add the layers of the Stack from top to bottom which gets parents ahead of children
|
||||||
|
final List<NarPersistenceInfo> orderedNarInfos = new ArrayList<>(numNarInfos);
|
||||||
|
while (!narHierarchy.isEmpty()) {
|
||||||
|
final NarPersistenceInfo narInfoToAdd = narHierarchy.pop();
|
||||||
|
orderedNarInfos.add(narInfoToAdd);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug("NAR Infos after reordering: {}", orderedNarInfos);
|
||||||
|
return orderedNarInfos;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void restoreState(final Collection<NarPersistenceInfo> narInfos) {
|
private void createNarHierarchy(final Collection<NarPersistenceInfo> narInfos, final Stack<NarPersistenceInfo> narHierarchy) {
|
||||||
|
if (narInfos == null || narInfos.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create lookup from coordinate to NAR info
|
||||||
|
final Map<BundleCoordinate, NarPersistenceInfo> narInfosByBundleCoordinate = new HashMap<>();
|
||||||
|
narInfos.forEach(narInfo -> narInfosByBundleCoordinate.put(narInfo.getNarProperties().getCoordinate(), narInfo));
|
||||||
|
|
||||||
|
// Determine all the NARs that are parents of other NARs
|
||||||
|
final Set<NarPersistenceInfo> parentNarInfos = new HashSet<>();
|
||||||
|
for (final NarPersistenceInfo narInfo : narInfos) {
|
||||||
|
narInfo.getNarProperties().getDependencyCoordinate().ifPresent(parentCoordinate -> {
|
||||||
|
final NarPersistenceInfo parentNarInfo = narInfosByBundleCoordinate.get(parentCoordinate);
|
||||||
|
if (parentNarInfo != null) {
|
||||||
|
parentNarInfos.add(parentNarInfo);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the parent NARs from the current collection, and recurse on the parents
|
||||||
|
narInfos.removeAll(parentNarInfos);
|
||||||
|
|
||||||
|
// Add the remaining non-parent NARs to the hierarchy
|
||||||
|
narInfos.forEach(narHierarchy::push);
|
||||||
|
|
||||||
|
// Recurse on the parents NARs to further re-order them
|
||||||
|
createNarHierarchy(parentNarInfos, narHierarchy);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void restoreNarNodes(final Collection<NarPersistenceInfo> narInfos) {
|
||||||
for (final NarPersistenceInfo narInfo : narInfos) {
|
for (final NarPersistenceInfo narInfo : narInfos) {
|
||||||
try {
|
try {
|
||||||
final NarNode narNode = restoreNarNode(narInfo);
|
final File narFile = narInfo.getNarFile();
|
||||||
narNodesById.put(narNode.getIdentifier(), narNode);
|
final NarManifest manifest = NarManifest.fromNarFile(narFile);
|
||||||
logger.debug("Restored NAR [{}] with state [{}] and identifier [{}]",
|
final NarNode narNode = installNar(narInfo, manifest, false);
|
||||||
narNode.getManifest().getCoordinate(), narNode.getState(), narNode.getIdentifier());
|
logger.debug("Restored NAR [{}] with state [{}] and identifier [{}]", narNode.getManifest().getCoordinate(), narNode.getState(), narNode.getIdentifier());
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
logger.warn("Failed to restore NAR for [{}]", narInfo.getNarFile().getAbsolutePath(), e);
|
logger.warn("Failed to restore NAR for [{}]", narInfo.getNarFile().getAbsolutePath(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private NarNode restoreNarNode(final NarPersistenceInfo narInfo) throws IOException {
|
|
||||||
final File narFile = narInfo.getNarFile();
|
|
||||||
final NarManifest manifest = NarManifest.fromNarFile(narFile);
|
|
||||||
final BundleCoordinate coordinate = manifest.getCoordinate();
|
|
||||||
final String identifier = createIdentifier(coordinate);
|
|
||||||
final NarState state = determineNarState(manifest);
|
|
||||||
final String narDigest = computeNarDigest(narFile);
|
|
||||||
|
|
||||||
return NarNode.builder()
|
|
||||||
.identifier(identifier)
|
|
||||||
.narFile(narFile)
|
|
||||||
.narFileDigest(narDigest)
|
|
||||||
.manifest(manifest)
|
|
||||||
.source(NarSource.valueOf(narInfo.getNarProperties().getSourceType()))
|
|
||||||
.sourceIdentifier(narInfo.getNarProperties().getSourceId())
|
|
||||||
.state(state)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
private NarNode installNar(final NarInstallRequest installRequest, final boolean async) throws IOException {
|
private NarNode installNar(final NarInstallRequest installRequest, final boolean async) throws IOException {
|
||||||
final InputStream inputStream = installRequest.getInputStream();
|
final InputStream inputStream = installRequest.getInputStream();
|
||||||
final File tempNarFile = persistenceProvider.createTempFile(inputStream);
|
final File tempNarFile = persistenceProvider.createTempFile(inputStream);
|
||||||
|
@ -410,11 +439,6 @@ public class StandardNarManager implements NarManager, InitializingBean, Closeab
|
||||||
throw new IllegalStateException("A NAR is already registered with the same coordinate [%s], and can not be replaced because it is not part of the NAR Manager".formatted(coordinate));
|
throw new IllegalStateException("A NAR is already registered with the same coordinate [%s], and can not be replaced because it is not part of the NAR Manager".formatted(coordinate));
|
||||||
}
|
}
|
||||||
|
|
||||||
final Set<Bundle> bundlesWithMatchingDependency = extensionManager.getDependentBundles(coordinate);
|
|
||||||
if (!bundlesWithMatchingDependency.isEmpty()) {
|
|
||||||
throw new IllegalStateException("Unable to replace NAR [%s] because it is a dependency of other NARs".formatted(coordinate));
|
|
||||||
}
|
|
||||||
|
|
||||||
final NarPersistenceContext persistenceContext = NarPersistenceContext.builder()
|
final NarPersistenceContext persistenceContext = NarPersistenceContext.builder()
|
||||||
.manifest(manifest)
|
.manifest(manifest)
|
||||||
.source(installRequest.getSource())
|
.source(installRequest.getSource())
|
||||||
|
@ -423,8 +447,12 @@ public class StandardNarManager implements NarManager, InitializingBean, Closeab
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
final NarPersistenceInfo narPersistenceInfo = persistenceProvider.saveNar(persistenceContext, tempNarFile);
|
final NarPersistenceInfo narPersistenceInfo = persistenceProvider.saveNar(persistenceContext, tempNarFile);
|
||||||
|
return installNar(narPersistenceInfo, manifest, async);
|
||||||
|
}
|
||||||
|
|
||||||
|
private NarNode installNar(final NarPersistenceInfo narPersistenceInfo, final NarManifest manifest, final boolean async) throws IOException {
|
||||||
final File narFile = narPersistenceInfo.getNarFile();
|
final File narFile = narPersistenceInfo.getNarFile();
|
||||||
|
final BundleCoordinate coordinate = narPersistenceInfo.getNarProperties().getCoordinate();
|
||||||
final String identifier = createIdentifier(coordinate);
|
final String identifier = createIdentifier(coordinate);
|
||||||
final String narDigest = computeNarDigest(narFile);
|
final String narDigest = computeNarDigest(narFile);
|
||||||
|
|
||||||
|
@ -433,8 +461,8 @@ public class StandardNarManager implements NarManager, InitializingBean, Closeab
|
||||||
.narFile(narFile)
|
.narFile(narFile)
|
||||||
.narFileDigest(narDigest)
|
.narFileDigest(narDigest)
|
||||||
.manifest(manifest)
|
.manifest(manifest)
|
||||||
.source(installRequest.getSource())
|
.source(NarSource.valueOf(narPersistenceInfo.getNarProperties().getSourceType()))
|
||||||
.sourceIdentifier(installRequest.getSourceIdentifier())
|
.sourceIdentifier(narPersistenceInfo.getNarProperties().getSourceId())
|
||||||
.state(NarState.WAITING_TO_INSTALL)
|
.state(NarState.WAITING_TO_INSTALL)
|
||||||
.build();
|
.build();
|
||||||
narNodesById.put(identifier, narNode);
|
narNodesById.put(identifier, narNode);
|
||||||
|
@ -474,20 +502,6 @@ public class StandardNarManager implements NarManager, InitializingBean, Closeab
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private NarState determineNarState(final NarManifest manifest) {
|
|
||||||
final BundleCoordinate coordinate = manifest.getCoordinate();
|
|
||||||
if (extensionManager.getBundle(coordinate) != null) {
|
|
||||||
return NarState.INSTALLED;
|
|
||||||
}
|
|
||||||
|
|
||||||
final BundleCoordinate dependencyCoordinate = manifest.getDependencyCoordinate();
|
|
||||||
if (dependencyCoordinate != null && extensionManager.getBundle(dependencyCoordinate) == null) {
|
|
||||||
return NarState.MISSING_DEPENDENCY;
|
|
||||||
}
|
|
||||||
|
|
||||||
return NarState.FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
private String createIdentifier(final BundleCoordinate coordinate) {
|
private String createIdentifier(final BundleCoordinate coordinate) {
|
||||||
return UUID.nameUUIDFromBytes(coordinate.getCoordinate().getBytes(StandardCharsets.UTF_8)).toString();
|
return UUID.nameUUIDFromBytes(coordinate.getCoordinate().getBytes(StandardCharsets.UTF_8)).toString();
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.nifi.nar;
|
||||||
|
|
||||||
import org.apache.nifi.bundle.Bundle;
|
import org.apache.nifi.bundle.Bundle;
|
||||||
|
|
||||||
import java.util.Set;
|
import java.util.Collection;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Loads WARs from extensions and makes them available to the running application.
|
* Loads WARs from extensions and makes them available to the running application.
|
||||||
|
@ -28,10 +28,10 @@ public interface ExtensionUiLoader {
|
||||||
/**
|
/**
|
||||||
* @param bundles the set of bundles to load WARs from
|
* @param bundles the set of bundles to load WARs from
|
||||||
*/
|
*/
|
||||||
void loadExtensionUis(Set<Bundle> bundles);
|
void loadExtensionUis(Collection<Bundle> bundles);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param bundles the set of bundles to unload WARs for
|
* @param bundles the set of bundles to unload WARs for
|
||||||
*/
|
*/
|
||||||
void unloadExtensionUis(Set<Bundle> bundles);
|
void unloadExtensionUis(Collection<Bundle> bundles);
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,7 @@ public interface NarLoader {
|
||||||
*
|
*
|
||||||
* @param bundles the NARs to unload
|
* @param bundles the NARs to unload
|
||||||
*/
|
*/
|
||||||
void unload(Set<Bundle> bundles);
|
void unload(Collection<Bundle> bundles);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unloads the given NAR.
|
* Unloads the given NAR.
|
||||||
|
|
|
@ -35,7 +35,6 @@ import java.util.Set;
|
||||||
import java.util.jar.Attributes;
|
import java.util.jar.Attributes;
|
||||||
import java.util.jar.JarFile;
|
import java.util.jar.JarFile;
|
||||||
import java.util.jar.Manifest;
|
import java.util.jar.Manifest;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Loads a set of NARs from the file system into the running application.
|
* Loads a set of NARs from the file system into the running application.
|
||||||
|
@ -142,15 +141,15 @@ public class StandardNarLoader implements NarLoader {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void unload(final Set<Bundle> bundles) {
|
public synchronized void unload(final Collection<Bundle> bundles) {
|
||||||
if (extensionUiLoader != null) {
|
if (extensionUiLoader != null) {
|
||||||
extensionUiLoader.unloadExtensionUis(bundles);
|
extensionUiLoader.unloadExtensionUis(bundles);
|
||||||
}
|
}
|
||||||
|
|
||||||
final Set<BundleCoordinate> bundleCoordinates = bundles.stream()
|
final List<BundleCoordinate> bundleCoordinates = bundles.stream()
|
||||||
.map(Bundle::getBundleDetails)
|
.map(Bundle::getBundleDetails)
|
||||||
.map(BundleDetails::getCoordinate)
|
.map(BundleDetails::getCoordinate)
|
||||||
.collect(Collectors.toSet());
|
.toList();
|
||||||
|
|
||||||
for (final BundleCoordinate bundleCoordinate : bundleCoordinates) {
|
for (final BundleCoordinate bundleCoordinate : bundleCoordinates) {
|
||||||
LOGGER.info("Unloading bundle [{}]", bundleCoordinate);
|
LOGGER.info("Unloading bundle [{}]", bundleCoordinate);
|
||||||
|
|
|
@ -32,8 +32,8 @@ import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.nio.file.SimpleFileVisitor;
|
import java.nio.file.SimpleFileVisitor;
|
||||||
import java.nio.file.attribute.BasicFileAttributes;
|
import java.nio.file.attribute.BasicFileAttributes;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
@ -92,12 +92,12 @@ public abstract class AbstractTestNarLoader {
|
||||||
extensionMapping,
|
extensionMapping,
|
||||||
new ExtensionUiLoader() {
|
new ExtensionUiLoader() {
|
||||||
@Override
|
@Override
|
||||||
public void loadExtensionUis(final Set<Bundle> bundles) {
|
public void loadExtensionUis(final Collection<Bundle> bundles) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void unloadExtensionUis(final Set<Bundle> bundles) {
|
public void unloadExtensionUis(final Collection<Bundle> bundles) {
|
||||||
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.nifi.components.ConfigurableComponent;
|
||||||
import org.apache.nifi.python.PythonProcessorDetails;
|
import org.apache.nifi.python.PythonProcessorDetails;
|
||||||
|
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
@ -116,7 +117,7 @@ public interface ExtensionManager {
|
||||||
* @param bundleCoordinates the coordinates
|
* @param bundleCoordinates the coordinates
|
||||||
* @return the removed bundles, or empty if none exists
|
* @return the removed bundles, or empty if none exists
|
||||||
*/
|
*/
|
||||||
Set<Bundle> removeBundles(Set<BundleCoordinate> bundleCoordinates);
|
Set<Bundle> removeBundles(Collection<BundleCoordinate> bundleCoordinates);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieves the bundles that have a dependency on the bundle with the given coordinate.
|
* Retrieves the bundles that have a dependency on the bundle with the given coordinate.
|
||||||
|
|
|
@ -722,8 +722,8 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized Set<Bundle> removeBundles(final Set<BundleCoordinate> bundleCoordinates) {
|
public synchronized Set<Bundle> removeBundles(final Collection<BundleCoordinate> bundleCoordinates) {
|
||||||
final Set<Bundle> removedBundles = new HashSet<>();
|
final Set<Bundle> removedBundles = new LinkedHashSet<>();
|
||||||
for (final BundleCoordinate bundleCoordinate : bundleCoordinates) {
|
for (final BundleCoordinate bundleCoordinate : bundleCoordinates) {
|
||||||
final Bundle removedBundle = removeBundle(bundleCoordinate);
|
final Bundle removedBundle = removeBundle(bundleCoordinate);
|
||||||
if (removedBundle != null) {
|
if (removedBundle != null) {
|
||||||
|
|
|
@ -337,7 +337,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void loadExtensionUis(final Set<Bundle> bundles) {
|
public synchronized void loadExtensionUis(final Collection<Bundle> bundles) {
|
||||||
extensionUisToLoad.addAll(bundles);
|
extensionUisToLoad.addAll(bundles);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -393,7 +393,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void unloadExtensionUis(final Set<Bundle> bundles) {
|
public synchronized void unloadExtensionUis(final Collection<Bundle> bundles) {
|
||||||
bundles.forEach(this::unloadExtensionUis);
|
bundles.forEach(this::unloadExtensionUis);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -130,8 +130,9 @@ public class NarUploadStandaloneIT extends NiFiSystemIT {
|
||||||
assertNotNull(customProcessor.getComponent().getExtensionMissing());
|
assertNotNull(customProcessor.getComponent().getExtensionMissing());
|
||||||
assertFalse(customProcessor.getComponent().getExtensionMissing());
|
assertFalse(customProcessor.getComponent().getExtensionMissing());
|
||||||
|
|
||||||
// Verify service API NAR can't be replaced while other NARs depend on it
|
// Verify service API NAR can be replaced while other NARs depend on it
|
||||||
assertThrows(NiFiClientException.class, () -> narUploadUtil.uploadNar(narsLocation, CONTROLLER_SERVICE_API_NAR_ID));
|
final NarSummaryDTO replacedServiceApiNar = narUploadUtil.uploadNar(narsLocation, CONTROLLER_SERVICE_API_NAR_ID);
|
||||||
|
waitFor(narUploadUtil.getWaitForNarStateSupplier(replacedServiceApiNar.getIdentifier(), NarState.INSTALLED));
|
||||||
|
|
||||||
// Verify processors NAR can be replaced
|
// Verify processors NAR can be replaced
|
||||||
final NarSummaryDTO replacedProcessorsNar = narUploadUtil.uploadNar(narsLocation, PROCESSORS_NAR_ID);
|
final NarSummaryDTO replacedProcessorsNar = narUploadUtil.uploadNar(narsLocation, PROCESSORS_NAR_ID);
|
||||||
|
|
Loading…
Reference in New Issue