NIFI-8380: Allow for an extensions.directory property to specify where to place downloaded files. Also fixed an issue that was encountered, when a Source Processor is scheduled for Primary Node Only but more than 1 task is set. In that case, even though only a single task will should be scheduled, an Exception was getting thrown because @OnScheduled methods of Processors were still called. To avoid this, moved the initialization of the dataflow outside of the creation of the dataflow so that initialization can be triggered only when appropriate.

NIFI-8380: Removed requirement in validation for working directory and extensions directory to exist; removed auto-creation of directories in validation

NIFI-8380: Fixed a few thrading bugs, so that if we have multiple threads trying to download/unpack extensions, we properly synchronize the unpacking and unpack into the correct sub-directory under the working directory

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4950.
This commit is contained in:
Mark Payne 2021-03-29 12:23:08 -04:00 committed by Pierre Villard
parent f4ad658fae
commit b79987918a
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
17 changed files with 220 additions and 80 deletions

View File

@ -117,7 +117,8 @@ as it includes annotations (1), (2), etc. for illustrative purposes):
(12) "header.attribute.regex": "syslog.*",
(13) "krb5.file": "/etc/krb5.conf",
(14) "dataflow.timeout": "30 sec",
(15) "parameter.Syslog Port": "19944"
(15) "parameter.Syslog Port": "19944",
(16) "extensions.directory": "/tmp/stateless-extensions"
}
}
```
@ -191,6 +192,9 @@ Process Groups have their own Parameter Contexts, this value will be used for an
should be applied only to a specific Parameter Context, the name of the Parameter Context may be supplied and separated from the Parameter Name with a colon. For example,
`parameter.Syslog Context:Syslog Port`. In this case, the only Parameter Context whose `Syslog Port` parameter would be set would be the Parameter Context whose name is `Syslog Context`.
`(16) extensions.directory` : Specifies the directory to add any downloaded extensions to. If not specified, the extensions will be written to the same directory that the
connector lives in. Because this directory may not be writable, and to aid in upgrade scenarios, it is highly recommended that this property be configured.
### Transactional sources
@ -252,7 +256,8 @@ as it includes annotations (1), (2), etc. for illustrative purposes):
(12) "headers.as.attributes.regex": "syslog.*",
(13) "krb5.file": "/etc/krb5.conf",
(14) "dataflow.timeout": "30 sec",
(15) "parameter.Directory": "/syslog"
(15) "parameter.Directory": "/syslog",
(16) "extensions.directory": "/tmp/stateless-extensions"
}
}
```
@ -323,6 +328,9 @@ Process Groups have their own Parameter Contexts, this value will be used for an
should be applied only to a specific Parameter Context, the name of the Parameter Context may be supplied and separated from the Parameter Name with a colon. For example,
`parameter.HDFS:Directory`. In this case, the only Parameter Context whose `Directory` parameter would be set would be the Parameter Context whose name is `HDFS`.
`(16) extensions.directory` : Specifies the directory to add any downloaded extensions to. If not specified, the extensions will be written to the same directory that the
connector lives in. Because this directory may not be writable, and to aid in upgrade scenarios, it is highly recommended that this property be configured.
<a name="merging"></a>
### Merging
@ -441,8 +449,10 @@ The Connector will then examine its own set of downloaded extensions and determi
and begin downloading them.
In order to do this, the connect configuration must specify where to download the extensions. This is the reason for the "nexus.url" property that is described
in both the Source Connector and the Sink Connector. Once downloaded, the extensions are placed in the same directory as existing NiFi Archive (NAR) files.
Unless explicitly specified in the connector configuration (via the `nar.directory` configuration element), this is auto-detected to be the same directory
in both the Source Connector and the Sink Connector. Once downloaded, the extensions are placed in the configured extensions directory (configured via the
`extensions.directory` configuration element).
If the `extensions.directory` is not explicitly specified in the connector configuration, extensions will be added to the NAR Directory
(configured via the `nar.directory` configuration element). If this is not specified, it is is auto-detected to be the same directory
that the NiFi Kafka Connector was installed in.
@ -467,7 +477,9 @@ Kafka Connect does not allow for state to be stored for Sink Tasks.
NiFi provides several processors that are expected to run only on a single node in the cluster. This is accomplished by setting the Execution Node to
"Primary Node Only" in the scheduling tab when configuring a NiFi Processor. When using the Source Connector, if any source processor in the configured
dataflow is set to run on Primary Node Only, only a single task will ever run, even if the "tasks" configuration element is set to a large value. In this
case, a warning will be logged if attempting to use multiple tasks for a dataflow that has a source processor configured for Primary Node Only.
case, a warning will be logged if attempting to use multiple tasks for a dataflow that has a source processor configured for Primary Node Only. Because Processors
should only be scheduled on Primary Node Only if they are sources of data, this is ignored for all Sink Tasks and for any Processor in a Source Task that has
incoming connections.
#### Processor Yielding

View File

@ -55,6 +55,7 @@ public class StatelessKafkaConnectorUtil {
private static final Lock unpackNarLock = new ReentrantLock();
static final String NAR_DIRECTORY = "nar.directory";
static final String EXTENSIONS_DIRECTORY = "extensions.directory";
static final String WORKING_DIRECTORY = "working.directory";
static final String FLOW_SNAPSHOT = "flow.snapshot";
static final String KRB5_FILE = "krb5.file";
@ -79,6 +80,7 @@ public class StatelessKafkaConnectorUtil {
static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
static final File DEFAULT_EXTENSIONS_DIRECTORY = new File("/tmp/nifi-stateless-extensions");
static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";
private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
@ -88,7 +90,9 @@ public class StatelessKafkaConnectorUtil {
public static void addCommonConfigElements(final ConfigDef configDef) {
configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
"Specifies the directory that stores the NiFi Archives (NARs)");
configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
configDef.define(EXTENSIONS_DIRECTORY, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
"Specifies the directory that stores the extensions that will be downloaded (if any) from the configured Extension Client");
configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
"Specifies the temporary working directory for expanding NiFi Archives (NARs)");
configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new FlowSnapshotValidator(), ConfigDef.Importance.HIGH,
"Specifies the dataflow to run. This may be a file containing the dataflow, a URL that points to a dataflow, or a String containing the entire dataflow as an escaped JSON.");
@ -229,12 +233,23 @@ public class StatelessKafkaConnectorUtil {
narDirectory = new File(narDirectoryFilename);
}
final File workingDirectory;
final String dataflowName = properties.get(DATAFLOW_NAME);
final File baseWorkingDirectory;
final String workingDirectoryFilename = properties.get(WORKING_DIRECTORY);
if (workingDirectoryFilename == null) {
workingDirectory = DEFAULT_WORKING_DIRECTORY;
baseWorkingDirectory = DEFAULT_WORKING_DIRECTORY;
} else {
workingDirectory = new File(workingDirectoryFilename);
baseWorkingDirectory = new File(workingDirectoryFilename);
}
final File workingDirectory = new File(baseWorkingDirectory, dataflowName);
final File extensionsDirectory;
final String extensionsDirectoryFilename = properties.get(EXTENSIONS_DIRECTORY);
if (extensionsDirectoryFilename == null) {
extensionsDirectory = DEFAULT_EXTENSIONS_DIRECTORY;
} else {
extensionsDirectory = new File(extensionsDirectoryFilename);
}
final SslContextDefinition sslContextDefinition = createSslContextDefinition(properties);
@ -250,6 +265,11 @@ public class StatelessKafkaConnectorUtil {
return narDirectory;
}
@Override
public File getExtensionsDirectory() {
return extensionsDirectory;
}
@Override
public File getKrb5File() {
return new File(properties.getOrDefault(KRB5_FILE, DEFAULT_KRB5_FILE));

View File

@ -78,6 +78,7 @@ public class StatelessNiFiSinkTask extends SinkTask {
headerNamePrefix = properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX, "");
dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
dataflow.initialize();
// Determine input port name. If input port is explicitly set, use the value given. Otherwise, if only one port exists, use that. Otherwise, throw ConfigException.
final String dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);

View File

@ -88,6 +88,7 @@ public class StatelessNiFiSourceTask extends SourceTask {
headerAttributeNamePattern = headerRegex == null ? null : Pattern.compile(headerRegex);
dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
dataflow.initialize();
// Determine the name of the Output Port to retrieve data from
dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);

View File

@ -39,6 +39,7 @@ public class PropertiesFileEngineConfigurationParser {
private static final String PREFIX = "nifi.stateless.";
private static final String NAR_DIRECTORY = PREFIX + "nar.directory";
private static final String EXTENSIONS_DIRECTORY = PREFIX + "extensions.directory";
private static final String WORKING_DIRECTORY = PREFIX + "working.directory";
private static final String TRUSTSTORE_FILE = PREFIX + "security.truststore";
@ -78,6 +79,12 @@ public class PropertiesFileEngineConfigurationParser {
throw new StatelessConfigurationException("Working Directory " + workingDirectory.getAbsolutePath() + " specified in properties file does not exist and could not be created");
}
final String extensionsDirectoryFilename = properties.getProperty(EXTENSIONS_DIRECTORY);
final File extensionsDirectory = extensionsDirectoryFilename == null ? narDirectory : new File(extensionsDirectoryFilename);
if (!extensionsDirectory.exists() && !extensionsDirectory.mkdirs()) {
throw new StatelessConfigurationException("Extensions Directory " + narDirectory.getAbsolutePath() + " specified in properties file does not exist and could not be created");
}
final String krb5Filename = properties.getProperty(KRB5_FILE, DEFAULT_KRB5_FILENAME);
final File krb5File = new File(krb5Filename);
@ -97,6 +104,11 @@ public class PropertiesFileEngineConfigurationParser {
return narDirectory;
}
@Override
public File getExtensionsDirectory() {
return extensionsDirectory;
}
@Override
public File getKrb5File() {
return krb5File;

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.engine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* If multiple Stateless dataflows are loaded concurrently within the same JVM, we need to ensure that the dataflows
* do not stomp on one another when unpacking NAR's. To do that, we need a mechanism by which a single lock can be shared
* across multiple classes, as the Extension Repository as well as the bootstrap logic may attempt to unpack NARs.
* Because these classes exist across multiple modules, and because statically defined locks at that level may not be enough
* (due to multiple classloders being used for the 'stateless nar'), we define a singleton Lock within the nifi-stateless-api module.
* This lock should always be obtained before attempting to unpack nars.
*/
public class NarUnpackLock {
private static final Logger logger = LoggerFactory.getLogger(NarUnpackLock.class);
private static final Lock lock = new ReentrantLock();
public static void lock() {
lock.lock();
logger.debug("Lock obtained by thread {}: {}", Thread.currentThread().getId(), Thread.currentThread().getName());
}
public static void unlock() {
lock.unlock();
logger.debug("Lock obtained by thread {}: {}", Thread.currentThread().getId(), Thread.currentThread().getName());
}
public static boolean tryLock() {
final boolean obtained = lock.tryLock();
if (obtained) {
logger.debug("Lock obtained by thread {}: {}", Thread.currentThread().getId(), Thread.currentThread().getName());
}
return obtained;
}
}

View File

@ -28,6 +28,8 @@ public interface StatelessEngineConfiguration {
File getNarDirectory();
File getExtensionsDirectory();
File getKrb5File();
SslContextDefinition getSslContext();

View File

@ -24,8 +24,33 @@ import java.util.Map;
import java.util.Set;
public interface StatelessDataflow {
/**
* Triggers the dataflow to run, returning a DataflowTrigger that can be used to wait for the result
* @return a DataflowTrigger that can be used to wait for the result
*
* @throws IllegalStateException if called before {@link #initialize()} is called.
*/
DataflowTrigger trigger();
/**
* <p>
* Performs initialization necessary for triggering dataflows. These activities include, but are not limited to:
* </p>
*
* <ul>
* <li>Component validation</li>
* <li>Enabling Controller Services</li>
* <li>Initializing processors (i.e., invoking @OnScheduled methods, etc.), but not triggering any Processors</li>
* <li>Initializing Remote Process Groups so that they can be triggered</li>
* <li>Scheduling Reporting Tasks to run</li>
* </ul>
*
* <p>
* This method MUST be called prior to calling {@link #trigger()}.
* </p>
*/
void initialize();
void shutdown();
StatelessDataflowValidation performValidation();

View File

@ -102,6 +102,8 @@ public class RunStatelessFlow {
final DataflowDefinition<?> dataflowDefinition = bootstrap.parseDataflowDefinition(flowDefinitionFile);
final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition, parameterOverrides);
dataflow.initialize();
final StatelessDataflowValidation validation = dataflow.performValidation();
if (!validation.isValid()) {
logger.error(validation.toString());

View File

@ -23,6 +23,7 @@ import org.apache.nifi.nar.NarUnpacker;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.stateless.config.ParameterOverride;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.engine.NarUnpackLock;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.DataflowDefinitionParser;
@ -98,7 +99,12 @@ public class StatelessBootstrap {
// Unpack NARs
final long unpackStart = System.currentTimeMillis();
final Predicate<BundleCoordinate> narFilter = coordinate -> true;
NarUnpackLock.lock();
try {
NarUnpacker.unpackNars(systemBundle, frameworkWorkingDir, extensionsWorkingDir, null, narDirectories, false, false, false, narFilter);
} finally {
NarUnpackLock.unlock();
}
final long unpackMillis = System.currentTimeMillis() - unpackStart;
logger.info("Unpacked NAR files in {} millis", unpackMillis);

View File

@ -64,7 +64,7 @@ public class DownloadQueue {
private final List<ExtensionClient> clients;
private final BlockingQueue<BundleCoordinate> toDownload = new LinkedBlockingQueue<>();
private final Set<BundleCoordinate> allDownloads = new HashSet<>();
private final Set<BundleCoordinate> allDownloads = Collections.synchronizedSet(new HashSet<>());
public DownloadQueue(final ExtensionManager extensionManager, final ExecutorService executorService, final int concurrentDownloads, final Collection<BundleCoordinate> bundles,
final File narLibDirectory, final List<ExtensionClient> clients) {
@ -74,18 +74,23 @@ public class DownloadQueue {
this.narLibDirectory = narLibDirectory;
this.clients = clients;
if (!narLibDirectory.exists()) {
final boolean created = narLibDirectory.mkdirs() || narLibDirectory.exists();
if (!created) {
logger.error("Extensions directory {} did not exist and could not be created.", narLibDirectory.getAbsolutePath());
}
}
toDownload.addAll(bundles);
allDownloads.addAll(bundles);
}
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> download() {
final Set<File> downloaded = Collections.synchronizedSet(new HashSet<>());
final CompletableFuture[] futures = new CompletableFuture[concurrentDownloads];
for (int i=0; i < concurrentDownloads; i++) {
final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
executorService.submit(new DownloadTask(toDownload, completableFuture, downloaded));
executorService.submit(new DownloadTask(toDownload, completableFuture, allDownloads));
futures[i] = completableFuture;
}
@ -110,29 +115,6 @@ public class DownloadQueue {
&& !NarClassLoaders.FRAMEWORK_NAR_ID.equals(coordinate.getId());
}
private synchronized void queueParents(final BundleCoordinate parentCoordinates) {
if (parentCoordinates == null) {
return;
}
final Bundle existingBundle = extensionManager.getBundle(parentCoordinates);
if (existingBundle == null) {
if (allDownloads.contains(parentCoordinates)) {
// Already queued for download.
return;
}
// We don't have have the parent yet. Queue it for download.
logger.debug("Enqueuing parent bundle {} to be downloaded", parentCoordinates);
allDownloads.add(parentCoordinates);
toDownload.add(parentCoordinates);
return;
}
// Check/queue anything needed for download, recursively.
queueParents(existingBundle.getBundleDetails().getDependencyCoordinate());
}
private File getBundleFile(final BundleCoordinate coordinate) {
final String filename = coordinate.getId() + "-" + coordinate.getVersion() + ".nar";
return new File(narLibDirectory, filename);
@ -142,12 +124,12 @@ public class DownloadQueue {
private class DownloadTask implements Runnable {
private final BlockingQueue<BundleCoordinate> downloadQueue;
private final CompletableFuture<Void> completableFuture;
private final Set<File> filesDownloaded;
private final Set<BundleCoordinate> downloads;
public DownloadTask(final BlockingQueue<BundleCoordinate> downloadQueue, final CompletableFuture<Void> completableFuture, final Set<File> filesDownloaded) {
public DownloadTask(final BlockingQueue<BundleCoordinate> downloadQueue, final CompletableFuture<Void> completableFuture, final Set<BundleCoordinate> filesDownloaded) {
this.downloadQueue = downloadQueue;
this.completableFuture = completableFuture;
this.filesDownloaded = filesDownloaded;
this.downloads = filesDownloaded;
}
@Override
@ -155,19 +137,7 @@ public class DownloadQueue {
BundleCoordinate coordinate;
while ((coordinate = downloadQueue.poll()) != null) {
try {
final File downloaded = download(coordinate);
if (downloaded != null) {
filesDownloaded.add(downloaded);
final BundleCoordinate parentCoordinate = getParentCoordinate(downloaded);
queueParents(parentCoordinate);
}
final Bundle existingBundle = extensionManager.getBundle(coordinate);
if (existingBundle != null) {
final BundleCoordinate parentCoordinate = existingBundle.getBundleDetails().getDependencyCoordinate();
queueParents(parentCoordinate);
}
downloadBundleAndParents(coordinate);
} catch (final Exception e) {
logger.error("Failed to download {}", coordinate, e);
completableFuture.completeExceptionally(e);
@ -177,6 +147,26 @@ public class DownloadQueue {
completableFuture.complete(null);
}
private void downloadBundleAndParents(final BundleCoordinate coordinate) throws IOException {
if (coordinate == null) {
return;
}
downloads.add(coordinate);
final File downloaded = download(coordinate);
if (downloaded != null) {
final BundleCoordinate parentCoordinate = getParentCoordinate(downloaded);
downloadBundleAndParents(parentCoordinate);
}
final Bundle existingBundle = extensionManager.getBundle(coordinate);
if (existingBundle != null) {
final BundleCoordinate parentCoordinate = existingBundle.getBundleDetails().getDependencyCoordinate();
downloadBundleAndParents(parentCoordinate);
}
}
private BundleCoordinate getParentCoordinate(final File narFile) throws IOException {
try (final JarFile nar = new JarFile(narFile)) {
final Manifest manifest = nar.getManifest();
@ -198,18 +188,14 @@ public class DownloadQueue {
final List<Exception> suppressed = new ArrayList<>();
final File destinationFile = getBundleFile(coordinate);
if (NarClassLoaders.JETTY_NAR_ID.equals(coordinate.getId())) {
logger.debug("Requested to download {} but only a single Jetty NAR is allowed to exist so will not download.", coordinate);
return null;
}
if (NarClassLoaders.FRAMEWORK_NAR_ID.equals(coordinate.getId())) {
logger.debug("Requested to download {} but only a single NiFi Framework NAR is allowed to exist so will not download.", coordinate);
if (!isDownloadable(coordinate)) {
logger.debug("Requested to download {} but only a single NAR of this type is allowed to exist so will not download.", coordinate);
return null;
}
if (destinationFile.exists()) {
logger.debug("Requested to download {} but destination file {} already exists. Will not download.", coordinate, destinationFile);
return null;
return destinationFile;
}
for (final ExtensionClient extensionClient : clients) {

View File

@ -24,6 +24,7 @@ import org.apache.nifi.nar.ExtensionDiscoveringManager;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.nar.NarLoadResult;
import org.apache.nifi.nar.NarUnpacker;
import org.apache.nifi.stateless.engine.NarUnpackLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,13 +37,10 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
public class FileSystemExtensionRepository implements ExtensionRepository {
private static final Logger logger = LoggerFactory.getLogger(FileSystemExtensionRepository.class);
private static final Lock unpackLock = new ReentrantLock();
private final ExtensionDiscoveringManager extensionManager;
private final NarClassLoaders narClassLoaders;
@ -91,6 +89,7 @@ public class FileSystemExtensionRepository implements ExtensionRepository {
final DownloadQueue downloadQueue = new DownloadQueue(extensionManager, executorService, concurrentDownloads, bundleCoordinates, narLibDirectory, clients);
final CompletableFuture<Void> downloadFuture = downloadQueue.download();
logger.info("Beginning download of extensions {}", bundleCoordinates);
final CompletableFuture<Set<Bundle>> loadFuture = downloadFuture.thenApply(new Function<Void, Set<Bundle>>() {
@Override
@ -116,12 +115,14 @@ public class FileSystemExtensionRepository implements ExtensionRepository {
for (final File downloadedFile : downloadedFiles) {
// Use a statically defined Lock to prevent multiple threads from unpacking their downloaded nars at the same time,
// even if they use a different ExtensionRepository.
unpackLock.lock();
NarUnpackLock.lock();
try {
final File unpackedDir = NarUnpacker.unpackNar(downloadedFile, workingDirectory, false);
logger.info("Unpacking {}", downloadedFile);
final File extensionsWorkingDirectory = new File(workingDirectory, "extensions");
final File unpackedDir = NarUnpacker.unpackNar(downloadedFile, extensionsWorkingDirectory, false);
unpackedDirs.add(unpackedDir);
} finally {
unpackLock.unlock();
NarUnpackLock.unlock();
}
}

View File

@ -169,8 +169,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
final List<ReportingTaskNode> reportingTaskNodes = createReportingTasks(dataflowDefinition);
final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory,
repositoryContextFactory, dataflowDefinition, stateManagerProvider);
dataflow.initialize(processScheduler);
repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler);
return dataflow;
}

View File

@ -144,7 +144,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
extensionClients.add(extensionClient);
}
final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration.getNarDirectory(), engineConfiguration.getWorkingDirectory(),
final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration.getExtensionsDirectory(), engineConfiguration.getWorkingDirectory(),
narClassLoaders, extensionClients);
final VariableRegistry variableRegistry = VariableRegistry.EMPTY_REGISTRY;

View File

@ -94,14 +94,14 @@ public class StandardStatelessFlow implements StatelessDataflow {
private final DataflowDefinition<?> dataflowDefinition;
private final StatelessStateManagerProvider stateManagerProvider;
private final ObjectMapper objectMapper = new ObjectMapper();
private final ProcessScheduler processScheduler;
private volatile ExecutorService runDataflowExecutor;
private volatile ProcessScheduler processScheduler;
private volatile boolean initialized = false;
public StandardStatelessFlow(final ProcessGroup rootGroup, final List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider controllerServiceProvider,
final ProcessContextFactory processContextFactory, final RepositoryContextFactory repositoryContextFactory, final DataflowDefinition<?> dataflowDefinition,
final StatelessStateManagerProvider stateManagerProvider) {
final StatelessStateManagerProvider stateManagerProvider, final ProcessScheduler processScheduler) {
this.rootGroup = rootGroup;
this.allConnections = rootGroup.findAllConnections();
this.reportingTasks = reportingTasks;
@ -110,6 +110,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
this.repositoryContextFactory = repositoryContextFactory;
this.dataflowDefinition = dataflowDefinition;
this.stateManagerProvider = stateManagerProvider;
this.processScheduler = processScheduler;
rootConnectables = new HashSet<>();
@ -164,19 +165,24 @@ public class StandardStatelessFlow implements StatelessDataflow {
}
}
public void initialize(final ProcessScheduler processScheduler) {
@Override
public void initialize() {
if (initialized) {
throw new IllegalStateException("Cannot initialize dataflow more than once");
logger.debug("{} initialize() was called, but dataflow has already been initialized. Returning without doing anything.", this);
return;
}
initialized = true;
this.processScheduler = processScheduler;
// Trigger validation to occur so that components can be enabled/started.
final long validationStart = System.currentTimeMillis();
performValidation();
final StatelessDataflowValidation validationResult = performValidation();
final long validationMillis = System.currentTimeMillis() - validationStart;
if (!validationResult.isValid()) {
logger.warn("{} Attempting to initialize dataflow but found at least one invalid component: {}", this, validationResult);
}
// Enable Controller Services and start processors in the flow.
// This is different than the calling ProcessGroup.startProcessing() because
// that method triggers the behavior to happen in the background and provides no way of knowing
@ -256,7 +262,9 @@ public class StandardStatelessFlow implements StatelessDataflow {
@Override
public void shutdown() {
if (runDataflowExecutor != null) {
runDataflowExecutor.shutdown();
}
rootGroup.stopProcessing();
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::shutdown);
@ -274,10 +282,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
// invoke any methods annotated with @OnShutdown on Reporting Tasks
reportingTasks.forEach(processScheduler::shutdownReportingTask);
if (processScheduler != null) {
processScheduler.shutdown();
}
repositoryContextFactory.shutdown();
}

View File

@ -74,6 +74,11 @@ public class TestPropertiesFileFlowDefinitionParser {
return null;
}
@Override
public File getExtensionsDirectory() {
return null;
}
@Override
public File getKrb5File() {
return null;

View File

@ -81,6 +81,11 @@ public class StatelessSystemIT {
return new File("target/nifi-stateless-assembly/nars");
}
@Override
public File getExtensionsDirectory() {
return new File("target/nifi-stateless-assembly/extensions");
}
@Override
public File getKrb5File() {
return new File("/etc/krb5.conf");