NIFI-9421: Running NiFi Stateless with local NARs only

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5557.
This commit is contained in:
Peter Turcsanyi 2021-11-30 14:15:38 +01:00 committed by Joe Gresock
parent bf288f3ba1
commit dafa03a21a
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
4 changed files with 36 additions and 22 deletions

View File

@ -156,6 +156,7 @@ a result, Connect can be configured with the URL of a Nexus server. The example
extensions. When a connector is started, it will first identify which extensions are necessary to run the dataflow, determine which extensions are available,
and then automatically download any necessary extensions that it currently does not have available. If configuring a Nexus instance that has multiple repositories,
the name of the repository should be included in the URL. For example: `https://nexus-private.myorganization.org/nexus/repository/my-repository/`.
If the property is not specified, the necessary extensions (used by the flow) must be provided in the `extensions.directory` before deploying the connector.
`(9) flow.snapshot`: Specifies the dataflow to run. This is the file that was downloaded by right-clicking on the Process Group in NiFi and
clicking "Download flow". The dataflow can be stored external to the configured and the location can be represented as an HTTP (or HTTPS URL), or a filename.
@ -292,6 +293,7 @@ a result, Connect can be configured with the URL of a Nexus server. The example
extensions. When a connector is started, it will first identify which extensions are necessary to run the dataflow, determine which extensions are available,
and then automatically download any necessary extensions that it currently does not have available. If configuring a Nexus instance that has multiple repositories,
the name of the repository should be included in the URL. For example: `https://nexus-private.myorganization.org/nexus/repository/my-repository/`.
If the property is not specified, the necessary extensions (used by the flow) must be provided in the `extensions.directory` before deploying the connector.
`(9) flow.snapshot`: Specifies the dataflow to run. This is the file that was downloaded by right-clicking on the Process Group in NiFi and
clicking "Download flow". The dataflow can be stored external to the configured and the location can be represented as an HTTP (or HTTPS URL), or a filename.

View File

@ -221,8 +221,11 @@ The following properties may be used to indicate where extensions are to be loca
When Stateless NiFi is started, it parses the provided dataflow and determines which bundles/extensions are necessary
to run the dataflow. If an extension is not available, or the version referenced by the flow is not available, Stateless
may attempt to download the extensions automatically. To do this, one or more Extension Clients must be configured.
Each client is configured using several properties, which are all tied together using a 'key'. For example, if we have
may attempt to download the extensions automatically. To do this, one or more Extension Clients need to be configured. If no
Extension Clients are configured, only those extensions can be used that are already available (e.g. manually downloaded and copied offline)
in the directories specified by the `nifi.stateless.extensions.directory` and `nifi.stateless.readonly.extensions.directory.<suffix>` properties described above.
Each Extension Client is configured using several properties, which are all tied together using a 'key'. For example, if we have
4 properties, `nifi.stateless.extension.client.ABC.type`, `nifi.stateless.extension.client.ABC.baseUrl`,
`nifi.stateless.extension.client.XYZ.type`, and `nifi.stateless.extension.client.XYZ.baseUrl`, then we know that
the first `type` property refers to the same client as the first `baseUrl` property because they both have the 'key'

View File

@ -25,6 +25,7 @@ import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.nar.NarLoadResult;
import org.apache.nifi.nar.NarUnpacker;
import org.apache.nifi.stateless.engine.NarUnpackLock;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -46,40 +47,49 @@ public class FileSystemExtensionRepository implements ExtensionRepository {
private final ExtensionDiscoveringManager extensionManager;
private final NarClassLoaders narClassLoaders;
private final File writableLibDirectory;
private final File narDirectory;
private final File writableExtensionDirectory;
private final Set<File> readOnlyExtensionDirectories;
private final File workingDirectory;
private final List<ExtensionClient> clients;
public FileSystemExtensionRepository(final ExtensionDiscoveringManager extensionManager, final File writableLibDirectory, final Collection<File> readOnlyExtensionDirectories,
final File workingDirectory, final NarClassLoaders narClassLoaders, final List<ExtensionClient> clients) {
public FileSystemExtensionRepository(final ExtensionDiscoveringManager extensionManager, final StatelessEngineConfiguration engineConfiguration, final NarClassLoaders narClassLoaders,
final List<ExtensionClient> clients) {
this.extensionManager = extensionManager;
this.writableLibDirectory = writableLibDirectory;
this.readOnlyExtensionDirectories = readOnlyExtensionDirectories == null ? Collections.emptySet() : new HashSet<>(readOnlyExtensionDirectories);
this.workingDirectory = workingDirectory;
this.narDirectory = engineConfiguration.getNarDirectory();
this.writableExtensionDirectory = engineConfiguration.getExtensionsDirectory();
this.readOnlyExtensionDirectories = engineConfiguration.getReadOnlyExtensionsDirectories() == null
? Collections.emptySet()
: new HashSet<>(engineConfiguration.getReadOnlyExtensionsDirectories());
this.workingDirectory = engineConfiguration.getWorkingDirectory();
this.narClassLoaders = narClassLoaders;
this.clients = clients;
}
@Override
public void initialize() throws IOException {
if (readOnlyExtensionDirectories.isEmpty()) {
return;
final Set<File> narFiles = new HashSet<>();
// if nar.directory and extensions.directory are the same, StatelessBootstrap has already loaded the nars
if (writableExtensionDirectory != null && !writableExtensionDirectory.equals(narDirectory)) {
narFiles.addAll(listNarFiles(writableExtensionDirectory));
}
final Set<File> readOnlyNars = new HashSet<>();
for (final File extensionDir : readOnlyExtensionDirectories) {
final File[] narFiles = extensionDir.listFiles(file -> file.getName().endsWith(".nar"));
if (narFiles == null) {
logger.warn("Failed to perform listing of read-only extensions directory {}. Will not load extensions from this directory.", extensionDir.getAbsolutePath());
continue;
}
readOnlyNars.addAll(Arrays.asList(narFiles));
narFiles.addAll(listNarFiles(extensionDir));
}
loadExtensions(readOnlyNars);
loadExtensions(narFiles);
}
private Collection<File> listNarFiles(File extensionDir) {
final File[] narFiles = extensionDir.listFiles(file -> file.getName().endsWith(".nar"));
if (narFiles == null) {
logger.warn("Failed to perform listing of extensions directory {}. Will not preload extensions from this directory.", extensionDir.getAbsolutePath());
return Collections.emptyList();
}
return Arrays.asList(narFiles);
}
@Override
@ -111,7 +121,7 @@ public class FileSystemExtensionRepository implements ExtensionRepository {
return CompletableFuture.completedFuture(Collections.emptySet());
}
final DownloadQueue downloadQueue = new DownloadQueue(extensionManager, executorService, concurrentDownloads, bundleCoordinates, writableLibDirectory, clients);
final DownloadQueue downloadQueue = new DownloadQueue(extensionManager, executorService, concurrentDownloads, bundleCoordinates, writableExtensionDirectory, clients);
final CompletableFuture<Void> downloadFuture = downloadQueue.download();
logger.info("Beginning download of extensions {}", bundleCoordinates);

View File

@ -142,8 +142,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
extensionClients.add(extensionClient);
}
final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration.getExtensionsDirectory(),
engineConfiguration.getReadOnlyExtensionsDirectories(), engineConfiguration.getWorkingDirectory(), narClassLoaders, extensionClients);
final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration, narClassLoaders, extensionClients);
extensionRepository.initialize();
final VariableRegistry variableRegistry = VariableRegistry.EMPTY_REGISTRY;