diff --git a/nifi-external/nifi-kafka-connect/README.md b/nifi-external/nifi-kafka-connect/README.md index d25e524f90..fbdef237fd 100644 --- a/nifi-external/nifi-kafka-connect/README.md +++ b/nifi-external/nifi-kafka-connect/README.md @@ -117,7 +117,8 @@ as it includes annotations (1), (2), etc. for illustrative purposes): (12) "header.attribute.regex": "syslog.*", (13) "krb5.file": "/etc/krb5.conf", (14) "dataflow.timeout": "30 sec", -(15) "parameter.Syslog Port": "19944" +(15) "parameter.Syslog Port": "19944", +(16) "extensions.directory": "/tmp/stateless-extensions" } } ``` @@ -191,6 +192,9 @@ Process Groups have their own Parameter Contexts, this value will be used for an should be applied only to a specific Parameter Context, the name of the Parameter Context may be supplied and separated from the Parameter Name with a colon. For example, `parameter.Syslog Context:Syslog Port`. In this case, the only Parameter Context whose `Syslog Port` parameter would be set would be the Parameter Context whose name is `Syslog Context`. +`(16) extensions.directory` : Specifies the directory to add any downloaded extensions to. If not specified, the extensions will be written to the same directory that the +connector lives in. Because this directory may not be writable, and to aid in upgrade scenarios, it is highly recommended that this property be configured. + ### Transactional sources @@ -252,7 +256,8 @@ as it includes annotations (1), (2), etc. 
for illustrative purposes): (12) "headers.as.attributes.regex": "syslog.*", (13) "krb5.file": "/etc/krb5.conf", (14) "dataflow.timeout": "30 sec", -(15) "parameter.Directory": "/syslog" +(15) "parameter.Directory": "/syslog", +(16) "extensions.directory": "/tmp/stateless-extensions" } } ``` @@ -323,6 +328,9 @@ Process Groups have their own Parameter Contexts, this value will be used for an should be applied only to a specific Parameter Context, the name of the Parameter Context may be supplied and separated from the Parameter Name with a colon. For example, `parameter.HDFS:Directory`. In this case, the only Parameter Context whose `Directory` parameter would be set would be the Parameter Context whose name is `HDFS`. +`(16) extensions.directory` : Specifies the directory to add any downloaded extensions to. If not specified, the extensions will be written to the same directory that the +connector lives in. Because this directory may not be writable, and to aid in upgrade scenarios, it is highly recommended that this property be configured. + ### Merging @@ -441,8 +449,10 @@ The Connector will then examine its own set of downloaded extensions and determi and begin downloading them. In order to do this, the connect configuration must specify where to download the extensions. This is the reason for the "nexus.url" property that is described -in both the Source Connector and the Sink Connector. Once downloaded, the extensions are placed in the same directory as existing NiFi Archive (NAR) files. -Unless explicitly specified in the connector configuration (via the `nar.directory` configuration element), this is auto-detected to be the same directory +in both the Source Connector and the Sink Connector. Once downloaded, the extensions are placed in the configured extensions directory (configured via the +`extensions.directory` configuration element). 
+If the `extensions.directory` is not explicitly specified in the connector configuration, extensions will be added to the NAR Directory +(configured via the `nar.directory` configuration element). If this is not specified, it is auto-detected to be the same directory that the NiFi Kafka Connector was installed in. @@ -467,7 +477,9 @@ Kafka Connect does not allow for state to be stored for Sink Tasks. NiFi provides several processors that are expected to run only on a single node in the cluster. This is accomplished by setting the Execution Node to "Primary Node Only" in the scheduling tab when configuring a NiFi Processor. When using the Source Connector, if any source processor in the configured dataflow is set to run on Primary Node Only, only a single task will ever run, even if the "tasks" configuration element is set to a large value. In this -case, a warning will be logged if attempting to use multiple tasks for a dataflow that has a source processor configured for Primary Node Only. +case, a warning will be logged if attempting to use multiple tasks for a dataflow that has a source processor configured for Primary Node Only. Because Processors +should only be scheduled on Primary Node Only if they are sources of data, this is ignored for all Sink Tasks and for any Processor in a Source Task that has +incoming connections. 
#### Processor Yielding diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java index 36e20010b3..ebc4c5fd78 100644 --- a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java +++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java @@ -55,6 +55,7 @@ public class StatelessKafkaConnectorUtil { private static final Lock unpackNarLock = new ReentrantLock(); static final String NAR_DIRECTORY = "nar.directory"; + static final String EXTENSIONS_DIRECTORY = "extensions.directory"; static final String WORKING_DIRECTORY = "working.directory"; static final String FLOW_SNAPSHOT = "flow.snapshot"; static final String KRB5_FILE = "krb5.file"; @@ -79,6 +80,7 @@ public class StatelessKafkaConnectorUtil { static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf"; static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec"; static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working"); + static final File DEFAULT_EXTENSIONS_DIRECTORY = new File("/tmp/nifi-stateless-extensions"); static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless"; private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = Pattern.compile("nifi-stateless-bootstrap-(.*).jar"); @@ -88,7 +90,9 @@ public class StatelessKafkaConnectorUtil { public static void addCommonConfigElements(final ConfigDef configDef) { configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH, "Specifies the directory that stores the NiFi Archives (NARs)"); - configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), 
ConfigDef.Importance.HIGH, + configDef.define(EXTENSIONS_DIRECTORY, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, + "Specifies the directory that stores the extensions that will be downloaded (if any) from the configured Extension Client"); + configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Specifies the temporary working directory for expanding NiFi Archives (NARs)"); configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new FlowSnapshotValidator(), ConfigDef.Importance.HIGH, "Specifies the dataflow to run. This may be a file containing the dataflow, a URL that points to a dataflow, or a String containing the entire dataflow as an escaped JSON."); @@ -229,12 +233,23 @@ public class StatelessKafkaConnectorUtil { narDirectory = new File(narDirectoryFilename); } - final File workingDirectory; + final String dataflowName = properties.get(DATAFLOW_NAME); + + final File baseWorkingDirectory; final String workingDirectoryFilename = properties.get(WORKING_DIRECTORY); if (workingDirectoryFilename == null) { - workingDirectory = DEFAULT_WORKING_DIRECTORY; + baseWorkingDirectory = DEFAULT_WORKING_DIRECTORY; } else { - workingDirectory = new File(workingDirectoryFilename); + baseWorkingDirectory = new File(workingDirectoryFilename); + } + final File workingDirectory = new File(baseWorkingDirectory, dataflowName); + + final File extensionsDirectory; + final String extensionsDirectoryFilename = properties.get(EXTENSIONS_DIRECTORY); + if (extensionsDirectoryFilename == null) { + extensionsDirectory = DEFAULT_EXTENSIONS_DIRECTORY; + } else { + extensionsDirectory = new File(extensionsDirectoryFilename); } final SslContextDefinition sslContextDefinition = createSslContextDefinition(properties); @@ -250,6 +265,11 @@ public class StatelessKafkaConnectorUtil { return narDirectory; } + @Override + public File getExtensionsDirectory() { + return extensionsDirectory; + } + @Override public File getKrb5File() { return new 
File(properties.getOrDefault(KRB5_FILE, DEFAULT_KRB5_FILE)); diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java index e99d526a19..ab153b4f28 100644 --- a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java +++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java @@ -78,6 +78,7 @@ public class StatelessNiFiSinkTask extends SinkTask { headerNamePrefix = properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX, ""); dataflow = StatelessKafkaConnectorUtil.createDataflow(properties); + dataflow.initialize(); // Determine input port name. If input port is explicitly set, use the value given. Otherwise, if only one port exists, use that. Otherwise, throw ConfigException. final String dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME); diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java index 8a4986f4d6..1b7cc12daf 100644 --- a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java +++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java @@ -88,6 +88,7 @@ public class StatelessNiFiSourceTask extends SourceTask { headerAttributeNamePattern = headerRegex == null ? 
null : Pattern.compile(headerRegex); dataflow = StatelessKafkaConnectorUtil.createDataflow(properties); + dataflow.initialize(); // Determine the name of the Output Port to retrieve data from dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java index 8459c4fecd..6a7d86c783 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java @@ -39,6 +39,7 @@ public class PropertiesFileEngineConfigurationParser { private static final String PREFIX = "nifi.stateless."; private static final String NAR_DIRECTORY = PREFIX + "nar.directory"; + private static final String EXTENSIONS_DIRECTORY = PREFIX + "extensions.directory"; private static final String WORKING_DIRECTORY = PREFIX + "working.directory"; private static final String TRUSTSTORE_FILE = PREFIX + "security.truststore"; @@ -78,6 +79,12 @@ public class PropertiesFileEngineConfigurationParser { throw new StatelessConfigurationException("Working Directory " + workingDirectory.getAbsolutePath() + " specified in properties file does not exist and could not be created"); } + final String extensionsDirectoryFilename = properties.getProperty(EXTENSIONS_DIRECTORY); + final File extensionsDirectory = extensionsDirectoryFilename == null ? 
narDirectory : new File(extensionsDirectoryFilename); + if (!extensionsDirectory.exists() && !extensionsDirectory.mkdirs()) { + throw new StatelessConfigurationException("Extensions Directory " + extensionsDirectory.getAbsolutePath() + " specified in properties file does not exist and could not be created"); + } + final String krb5Filename = properties.getProperty(KRB5_FILE, DEFAULT_KRB5_FILENAME); final File krb5File = new File(krb5Filename); @@ -97,6 +104,11 @@ public class PropertiesFileEngineConfigurationParser { return narDirectory; } + @Override + public File getExtensionsDirectory() { + return extensionsDirectory; + } + @Override public File getKrb5File() { return krb5File; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/NarUnpackLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/NarUnpackLock.java new file mode 100644 index 0000000000..20c706c403 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/NarUnpackLock.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.stateless.engine; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * If multiple Stateless dataflows are loaded concurrently within the same JVM, we need to ensure that the dataflows + * do not stomp on one another when unpacking NARs. To do that, we need a mechanism by which a single lock can be shared + * across multiple classes, as the Extension Repository as well as the bootstrap logic may attempt to unpack NARs. + * Because these classes exist across multiple modules, and because statically defined locks at that level may not be enough + * (due to multiple classloaders being used for the 'stateless nar'), we define a singleton Lock within the nifi-stateless-api module. + * This lock should always be obtained before attempting to unpack NARs. 
+ */ +public class NarUnpackLock { + private static final Logger logger = LoggerFactory.getLogger(NarUnpackLock.class); + + private static final Lock lock = new ReentrantLock(); + + public static void lock() { + lock.lock(); + logger.debug("Lock obtained by thread {}: {}", Thread.currentThread().getId(), Thread.currentThread().getName()); + } + + public static void unlock() { + lock.unlock(); + logger.debug("Lock released by thread {}: {}", Thread.currentThread().getId(), Thread.currentThread().getName()); + } + + public static boolean tryLock() { + final boolean obtained = lock.tryLock(); + + if (obtained) { + logger.debug("Lock obtained by thread {}: {}", Thread.currentThread().getId(), Thread.currentThread().getName()); + } + + return obtained; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java index a21e703d24..79e3d6a89b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java @@ -28,6 +28,8 @@ public interface StatelessEngineConfiguration { File getNarDirectory(); + File getExtensionsDirectory(); + File getKrb5File(); SslContextDefinition getSslContext(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java index 89402acc7b..be6e62c018 
100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java @@ -24,8 +24,33 @@ import java.util.Map; import java.util.Set; public interface StatelessDataflow { + /** + * Triggers the dataflow to run, returning a DataflowTrigger that can be used to wait for the result + * @return a DataflowTrigger that can be used to wait for the result + * + * @throws IllegalStateException if called before {@link #initialize()} is called. + */ DataflowTrigger trigger(); + /** + *

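+ * <p> + * Performs initialization necessary for triggering dataflows. These activities include, but are not limited to: + * </p>

+ * + * <ul> + * <li>Validating the components of the dataflow</li> + * <li>Enabling Controller Services</li> + * <li>Starting Processors</li> + * </ul> + * + * <p>

+ * This method MUST be called prior to calling {@link #trigger()}. + * </p>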

+ */ + void initialize(); + void shutdown(); StatelessDataflowValidation performValidation(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java index 95af05274a..3146cbf7d9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java @@ -102,6 +102,8 @@ public class RunStatelessFlow { final DataflowDefinition dataflowDefinition = bootstrap.parseDataflowDefinition(flowDefinitionFile); final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition, parameterOverrides); + dataflow.initialize(); + final StatelessDataflowValidation validation = dataflow.performValidation(); if (!validation.isValid()) { logger.error(validation.toString()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java index 415e1dbd9e..6a29fee28a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java @@ -23,6 +23,7 @@ import org.apache.nifi.nar.NarUnpacker; import 
org.apache.nifi.nar.SystemBundle; import org.apache.nifi.stateless.config.ParameterOverride; import org.apache.nifi.stateless.config.StatelessConfigurationException; +import org.apache.nifi.stateless.engine.NarUnpackLock; import org.apache.nifi.stateless.engine.StatelessEngineConfiguration; import org.apache.nifi.stateless.flow.DataflowDefinition; import org.apache.nifi.stateless.flow.DataflowDefinitionParser; @@ -98,7 +99,12 @@ public class StatelessBootstrap { // Unpack NARs final long unpackStart = System.currentTimeMillis(); final Predicate narFilter = coordinate -> true; - NarUnpacker.unpackNars(systemBundle, frameworkWorkingDir, extensionsWorkingDir, null, narDirectories, false, false, false, narFilter); + NarUnpackLock.lock(); + try { + NarUnpacker.unpackNars(systemBundle, frameworkWorkingDir, extensionsWorkingDir, null, narDirectories, false, false, false, narFilter); + } finally { + NarUnpackLock.unlock(); + } final long unpackMillis = System.currentTimeMillis() - unpackStart; logger.info("Unpacked NAR files in {} millis", unpackMillis); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/DownloadQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/DownloadQueue.java index c8153f014a..18fccf1a88 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/DownloadQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/DownloadQueue.java @@ -64,7 +64,7 @@ public class DownloadQueue { private final List clients; private final BlockingQueue toDownload = new LinkedBlockingQueue<>(); - private final Set allDownloads = new HashSet<>(); + private final Set allDownloads = Collections.synchronizedSet(new HashSet<>()); public 
DownloadQueue(final ExtensionManager extensionManager, final ExecutorService executorService, final int concurrentDownloads, final Collection bundles, final File narLibDirectory, final List clients) { @@ -74,18 +74,23 @@ public class DownloadQueue { this.narLibDirectory = narLibDirectory; this.clients = clients; + if (!narLibDirectory.exists()) { + final boolean created = narLibDirectory.mkdirs() || narLibDirectory.exists(); + if (!created) { + logger.error("Extensions directory {} did not exist and could not be created.", narLibDirectory.getAbsolutePath()); + } + } + toDownload.addAll(bundles); allDownloads.addAll(bundles); } @SuppressWarnings("rawtypes") public CompletableFuture download() { - final Set downloaded = Collections.synchronizedSet(new HashSet<>()); - final CompletableFuture[] futures = new CompletableFuture[concurrentDownloads]; for (int i=0; i < concurrentDownloads; i++) { final CompletableFuture completableFuture = new CompletableFuture<>(); - executorService.submit(new DownloadTask(toDownload, completableFuture, downloaded)); + executorService.submit(new DownloadTask(toDownload, completableFuture, allDownloads)); futures[i] = completableFuture; } @@ -110,29 +115,6 @@ public class DownloadQueue { && !NarClassLoaders.FRAMEWORK_NAR_ID.equals(coordinate.getId()); } - private synchronized void queueParents(final BundleCoordinate parentCoordinates) { - if (parentCoordinates == null) { - return; - } - - final Bundle existingBundle = extensionManager.getBundle(parentCoordinates); - if (existingBundle == null) { - if (allDownloads.contains(parentCoordinates)) { - // Already queued for download. - return; - } - - // We don't have have the parent yet. Queue it for download. - logger.debug("Enqueuing parent bundle {} to be downloaded", parentCoordinates); - allDownloads.add(parentCoordinates); - toDownload.add(parentCoordinates); - return; - } - - // Check/queue anything needed for download, recursively. 
- queueParents(existingBundle.getBundleDetails().getDependencyCoordinate()); - } - private File getBundleFile(final BundleCoordinate coordinate) { final String filename = coordinate.getId() + "-" + coordinate.getVersion() + ".nar"; return new File(narLibDirectory, filename); @@ -142,12 +124,12 @@ public class DownloadQueue { private class DownloadTask implements Runnable { private final BlockingQueue downloadQueue; private final CompletableFuture completableFuture; - private final Set filesDownloaded; + private final Set downloads; - public DownloadTask(final BlockingQueue downloadQueue, final CompletableFuture completableFuture, final Set filesDownloaded) { + public DownloadTask(final BlockingQueue downloadQueue, final CompletableFuture completableFuture, final Set filesDownloaded) { this.downloadQueue = downloadQueue; this.completableFuture = completableFuture; - this.filesDownloaded = filesDownloaded; + this.downloads = filesDownloaded; } @Override @@ -155,19 +137,7 @@ public class DownloadQueue { BundleCoordinate coordinate; while ((coordinate = downloadQueue.poll()) != null) { try { - final File downloaded = download(coordinate); - if (downloaded != null) { - filesDownloaded.add(downloaded); - - final BundleCoordinate parentCoordinate = getParentCoordinate(downloaded); - queueParents(parentCoordinate); - } - - final Bundle existingBundle = extensionManager.getBundle(coordinate); - if (existingBundle != null) { - final BundleCoordinate parentCoordinate = existingBundle.getBundleDetails().getDependencyCoordinate(); - queueParents(parentCoordinate); - } + downloadBundleAndParents(coordinate); } catch (final Exception e) { logger.error("Failed to download {}", coordinate, e); completableFuture.completeExceptionally(e); @@ -177,6 +147,26 @@ public class DownloadQueue { completableFuture.complete(null); } + private void downloadBundleAndParents(final BundleCoordinate coordinate) throws IOException { + if (coordinate == null) { + return; + } + + 
downloads.add(coordinate); + + final File downloaded = download(coordinate); + if (downloaded != null) { + final BundleCoordinate parentCoordinate = getParentCoordinate(downloaded); + downloadBundleAndParents(parentCoordinate); + } + + final Bundle existingBundle = extensionManager.getBundle(coordinate); + if (existingBundle != null) { + final BundleCoordinate parentCoordinate = existingBundle.getBundleDetails().getDependencyCoordinate(); + downloadBundleAndParents(parentCoordinate); + } + } + private BundleCoordinate getParentCoordinate(final File narFile) throws IOException { try (final JarFile nar = new JarFile(narFile)) { final Manifest manifest = nar.getManifest(); @@ -198,18 +188,14 @@ public class DownloadQueue { final List suppressed = new ArrayList<>(); final File destinationFile = getBundleFile(coordinate); - if (NarClassLoaders.JETTY_NAR_ID.equals(coordinate.getId())) { - logger.debug("Requested to download {} but only a single Jetty NAR is allowed to exist so will not download.", coordinate); - return null; - } - if (NarClassLoaders.FRAMEWORK_NAR_ID.equals(coordinate.getId())) { - logger.debug("Requested to download {} but only a single NiFi Framework NAR is allowed to exist so will not download.", coordinate); + if (!isDownloadable(coordinate)) { + logger.debug("Requested to download {} but only a single NAR of this type is allowed to exist so will not download.", coordinate); return null; } if (destinationFile.exists()) { logger.debug("Requested to download {} but destination file {} already exists. 
Will not download.", coordinate, destinationFile); - return null; + return destinationFile; } for (final ExtensionClient extensionClient : clients) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java index 1f9ddbf56e..173d7d9056 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java @@ -24,6 +24,7 @@ import org.apache.nifi.nar.ExtensionDiscoveringManager; import org.apache.nifi.nar.NarClassLoaders; import org.apache.nifi.nar.NarLoadResult; import org.apache.nifi.nar.NarUnpacker; +import org.apache.nifi.stateless.engine.NarUnpackLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,13 +37,10 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; public class FileSystemExtensionRepository implements ExtensionRepository { private static final Logger logger = LoggerFactory.getLogger(FileSystemExtensionRepository.class); - private static final Lock unpackLock = new ReentrantLock(); private final ExtensionDiscoveringManager extensionManager; private final NarClassLoaders narClassLoaders; @@ -91,6 +89,7 @@ public class FileSystemExtensionRepository implements ExtensionRepository { final DownloadQueue downloadQueue = new DownloadQueue(extensionManager, executorService, 
concurrentDownloads, bundleCoordinates, narLibDirectory, clients); final CompletableFuture downloadFuture = downloadQueue.download(); + logger.info("Beginning download of extensions {}", bundleCoordinates); final CompletableFuture> loadFuture = downloadFuture.thenApply(new Function>() { @Override @@ -116,12 +115,14 @@ public class FileSystemExtensionRepository implements ExtensionRepository { for (final File downloadedFile : downloadedFiles) { // Use a statically defined Lock to prevent multiple threads from unpacking their downloaded nars at the same time, // even if they use a different ExtensionRepository. - unpackLock.lock(); + NarUnpackLock.lock(); try { - final File unpackedDir = NarUnpacker.unpackNar(downloadedFile, workingDirectory, false); + logger.info("Unpacking {}", downloadedFile); + final File extensionsWorkingDirectory = new File(workingDirectory, "extensions"); + final File unpackedDir = NarUnpacker.unpackNar(downloadedFile, extensionsWorkingDirectory, false); unpackedDirs.add(unpackedDir); } finally { - unpackLock.unlock(); + NarUnpackLock.unlock(); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java index 19fc7b9d42..74673eaef3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java @@ -169,8 +169,7 @@ public class StandardStatelessEngine implements StatelessEngine reportingTaskNodes = createReportingTasks(dataflowDefinition); final StandardStatelessFlow dataflow = new 
StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory, - repositoryContextFactory, dataflowDefinition, stateManagerProvider); - dataflow.initialize(processScheduler); + repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler); return dataflow; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java index 5832896ca8..d462429030 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java @@ -144,7 +144,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor extensionClients.add(extensionClient); } - final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration.getNarDirectory(), engineConfiguration.getWorkingDirectory(), + final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration.getExtensionsDirectory(), engineConfiguration.getWorkingDirectory(), narClassLoaders, extensionClients); final VariableRegistry variableRegistry = VariableRegistry.EMPTY_REGISTRY; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java index 672a79bc0a..04ea9e6fd1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java @@ -94,14 +94,14 @@ public class StandardStatelessFlow implements StatelessDataflow { private final DataflowDefinition dataflowDefinition; private final StatelessStateManagerProvider stateManagerProvider; private final ObjectMapper objectMapper = new ObjectMapper(); + private final ProcessScheduler processScheduler; private volatile ExecutorService runDataflowExecutor; - private volatile ProcessScheduler processScheduler; private volatile boolean initialized = false; public StandardStatelessFlow(final ProcessGroup rootGroup, final List reportingTasks, final ControllerServiceProvider controllerServiceProvider, final ProcessContextFactory processContextFactory, final RepositoryContextFactory repositoryContextFactory, final DataflowDefinition dataflowDefinition, - final StatelessStateManagerProvider stateManagerProvider) { + final StatelessStateManagerProvider stateManagerProvider, final ProcessScheduler processScheduler) { this.rootGroup = rootGroup; this.allConnections = rootGroup.findAllConnections(); this.reportingTasks = reportingTasks; @@ -110,6 +110,7 @@ public class StandardStatelessFlow implements StatelessDataflow { this.repositoryContextFactory = repositoryContextFactory; this.dataflowDefinition = dataflowDefinition; this.stateManagerProvider = stateManagerProvider; + this.processScheduler = processScheduler; rootConnectables = new HashSet<>(); @@ -164,19 +165,24 @@ public class StandardStatelessFlow implements StatelessDataflow { } } - public void 
initialize(final ProcessScheduler processScheduler) { + @Override + public void initialize() { if (initialized) { - throw new IllegalStateException("Cannot initialize dataflow more than once"); + logger.debug("{} initialize() was called, but dataflow has already been initialized. Returning without doing anything.", this); + return; } initialized = true; - this.processScheduler = processScheduler; // Trigger validation to occur so that components can be enabled/started. final long validationStart = System.currentTimeMillis(); - performValidation(); + final StatelessDataflowValidation validationResult = performValidation(); final long validationMillis = System.currentTimeMillis() - validationStart; + if (!validationResult.isValid()) { + logger.warn("{} Attempting to initialize dataflow but found at least one invalid component: {}", this, validationResult); + } + // Enable Controller Services and start processors in the flow. // This is different than the calling ProcessGroup.startProcessing() because // that method triggers the behavior to happen in the background and provides no way of knowing @@ -256,7 +262,9 @@ public class StandardStatelessFlow implements StatelessDataflow { @Override public void shutdown() { - runDataflowExecutor.shutdown(); + if (runDataflowExecutor != null) { + runDataflowExecutor.shutdown(); + } rootGroup.stopProcessing(); rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::shutdown); @@ -274,10 +282,7 @@ public class StandardStatelessFlow implements StatelessDataflow { // invoke any methods annotated with @OnShutdown on Reporting Tasks reportingTasks.forEach(processScheduler::shutdownReportingTask); - if (processScheduler != null) { - processScheduler.shutdown(); - } - + processScheduler.shutdown(); repositoryContextFactory.shutdown(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java index 19c629e85b..772c87430f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java @@ -74,6 +74,11 @@ public class TestPropertiesFileFlowDefinitionParser { return null; } + @Override + public File getExtensionsDirectory() { + return null; + } + @Override public File getKrb5File() { return null; diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java index 4fe6c7688e..268061b80f 100644 --- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java +++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java @@ -81,6 +81,11 @@ public class StatelessSystemIT { return new File("target/nifi-stateless-assembly/nars"); } + @Override + public File getExtensionsDirectory() { + return new File("target/nifi-stateless-assembly/extensions"); + } + @Override public File getKrb5File() { return new File("/etc/krb5.conf");