NIFI-8041: Refactored API for stateless nifi so that calling StatelessDataflow.trigger() returns an object that allows the caller to wait for the result, cancel the result, etc. and then optionally acknowledge the completion. This allows the caller to block the completion of ProcessSession.commit() until it has handled the output of the dataflow execution.

NIFI-8038: Fixed deadlock that can occur when updating Parameter Context

This closes #4684.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2020-11-23 14:15:23 -05:00 committed by Bryan Bende
parent 3c9d8a7007
commit 7ad9520079
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
21 changed files with 965 additions and 187 deletions

View File

@ -103,13 +103,14 @@ public class StandardParameterContext implements ParameterContext {
}
public void setParameters(final Map<String, Parameter> updatedParameters) {
final Map<String, ParameterUpdate> parameterUpdates = new HashMap<>();
boolean changeAffectingComponents = false;
writeLock.lock();
try {
this.version++;
verifyCanSetParameters(updatedParameters);
final Map<String, ParameterUpdate> parameterUpdates = new HashMap<>();
boolean changeAffectingComponents = false;
for (final Map.Entry<String, Parameter> entry : updatedParameters.entrySet()) {
final String parameterName = entry.getKey();
final Parameter parameter = entry.getValue();
@ -133,23 +134,23 @@ public class StandardParameterContext implements ParameterContext {
}
}
}
if (changeAffectingComponents) {
logger.debug("Parameter Context {} was updated. {} parameters changed ({}). Notifying all affected components.", this, parameterUpdates.size(), parameterUpdates);
for (final ProcessGroup processGroup : parameterReferenceManager.getProcessGroupsBound(this)) {
try {
processGroup.onParameterContextUpdated(parameterUpdates);
} catch (final Exception e) {
logger.error("Failed to notify {} that Parameter Context was updated", processGroup, e);
}
}
} else {
logger.debug("Parameter Context {} was updated. {} parameters changed ({}). No existing components are affected.", this, parameterUpdates.size(), parameterUpdates);
}
} finally {
writeLock.unlock();
}
if (changeAffectingComponents) {
logger.debug("Parameter Context {} was updated. {} parameters changed ({}). Notifying all affected components.", this, parameterUpdates.size(), parameterUpdates);
for (final ProcessGroup processGroup : parameterReferenceManager.getProcessGroupsBound(this)) {
try {
processGroup.onParameterContextUpdated(parameterUpdates);
} catch (final Exception e) {
logger.error("Failed to notify {} that Parameter Context was updated", processGroup, e);
}
}
} else {
logger.debug("Parameter Context {} was updated. {} parameters changed ({}). No existing components are affected.", this, parameterUpdates.size(), parameterUpdates);
}
}
/**

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.engine;
import org.apache.nifi.processor.exception.ProcessException;
public class DataflowAbortedException extends ProcessException {
public DataflowAbortedException() {
super();
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.flow;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
public interface DataflowTrigger {
/**
* Cancels the triggering of the dataflow. If the dataflow has not yet completed, any actions left to perform
* will not be completed. The session will be rolled back instead of committed.
*/
void cancel();
/**
* Returns the results of triggering the dataflow immediately, if they are available, else returns an empty Optional
* @return the results of triggering the dataflow immediately, if they are available, else returns an empty Optional
*/
Optional<TriggerResult> getResultNow();
/**
* Waits up to the specified amount of time for the result to be come available. If, after that time, the result is not
* available, returns an empty Optional. Otherwise, returns the results as soon as they become available.
*
* @param maxWaitTime the maximum amount of time to wait
* @param timeUnit the time unit that the max wait time is associated with
* @return the results of triggering the dataflow, or an empty Optional if the results are not available within the given amount of time
* @throws InterruptedException if interrupted while waiting for the results
*/
Optional<TriggerResult> getResult(long maxWaitTime, TimeUnit timeUnit) throws InterruptedException;
/**
* Returns the results of triggering the dataflow, waiting as long as necessary for the results to become available.
* @return the results of triggering the dataflow
* @throws InterruptedException if interrupted while waiting for the results
*/
TriggerResult getResult() throws InterruptedException;
}

View File

@ -16,31 +16,23 @@
*/
package org.apache.nifi.stateless.flow;
import org.apache.nifi.flowfile.FlowFile;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface StatelessDataflow {
void trigger();
DataflowTrigger trigger();
void shutdown();
StatelessDataflowValidation performValidation();
Map<String, List<FlowFile>> drainOutputQueues();
List<FlowFile> drainOutputQueues(String portName);
Set<String> getInputPortNames();
Set<String> getOutputPortNames();
void enqueue(byte[] flowFileContents, Map<String, String> attributes, String portName);
byte[] getFlowFileContents(FlowFile flowFile);
int getFlowFilesQueued();
long getBytesQueued();

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.flow;
import org.apache.nifi.flowfile.FlowFile;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public interface TriggerResult {
/**
* @return <code>true</code> if the dataflow completed successfully, <code>false</code> if the dataflow failed to run to completion successfully
*/
boolean isSuccessful();
/**
* @return <code>true</code> if the dataflow execution was canceled, <code>false</code> otherwise
*/
boolean isCanceled();
/**
* If the dataflow failed to run to completion, returns the Exception that caused the failure
* @return the Exception that caused the dataflow to fail, or an empty Optional if there was no Exception thrown
*/
Optional<Exception> getFailureCause();
/**
* @return a mapping of Output Port Name to all FlowFiles that were transferred to that Output Port
*/
Map<String, List<FlowFile>> getOutputFlowFiles();
/**
* Returns a List of all FlowFiles that were transferred to the Output Port with the given name
* @param portName the name of the Output Port
* @return a List all FlowFiles that were transferred to the Output Port. Will return an empty list if no FlowFiles transferred.
*/
List<FlowFile> getOutputFlowFiles(String portName);
/**
* Provides the contents of a FlowFile that was obtained by calling {@link #getOutputFlowFiles()}.
* @param flowFile the FlowFile whose contents are to be read
* @return the contents of the FlowFile
*/
byte[] readContent(FlowFile flowFile);
/**
* Acknowledges the output of the dataflow and allows the session to be successfully committed.
*/
void acknowledge();
}

View File

@ -22,8 +22,10 @@ import org.apache.nifi.stateless.config.PropertiesFileEngineConfigurationParser;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.StatelessDataflowValidation;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,7 +37,7 @@ import java.util.concurrent.TimeUnit;
public class RunStatelessFlow {
private static final Logger logger = LoggerFactory.getLogger(RunStatelessFlow.class);
public static void main(final String[] args) throws IOException, StatelessConfigurationException {
public static void main(final String[] args) throws IOException, StatelessConfigurationException, InterruptedException {
if (!BootstrapConfiguration.isValid(args)) {
BootstrapConfiguration.printUsage();
return;
@ -58,25 +60,30 @@ public class RunStatelessFlow {
}
}
private static void triggerContinuously(final StatelessDataflow dataflow) {
private static void triggerContinuously(final StatelessDataflow dataflow) throws InterruptedException {
while (true) {
try {
final long triggerStart = System.nanoTime();
dataflow.trigger();
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
result.acknowledge();
final long triggerNanos = System.nanoTime() - triggerStart;
logger.debug("Ran dataflow in {} nanoseconds", triggerNanos);
dataflow.drainOutputQueues();
} catch (final InterruptedException ie) {
throw ie;
} catch (final Exception e) {
logger.error("Failed to run dataflow", e);
}
}
}
private static void triggerOnce(final StatelessDataflow dataflow) {
private static void triggerOnce(final StatelessDataflow dataflow) throws InterruptedException {
final long triggerStart = System.nanoTime();
dataflow.trigger();
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
result.acknowledge();
final long triggerNanos = System.nanoTime() - triggerStart;
final long triggerMillis = TimeUnit.NANOSECONDS.toMillis(triggerNanos);

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.engine;
/**
* The ExecutionProgress functions as a bridge between the caller of the dataflow trigger
* and the dataflow engine. It is used to allow the caller to cancel the dataflow, wait for its completion, and to convey
* to the engine that the user has canceled the dataflow, or determine whether or not all relevant data has been processed
*/
public interface ExecutionProgress {
/**
* @return <code>true</code> if the execution has been canceled, <code>false</code> otherwise
*/
boolean isCanceled();
/**
* @return <code>true</code> if there is data queued up to be processed, <code>false</code> if all data has been removed from the flow
* or queued at Terminal Output Ports
*/
boolean isDataQueued();
/**
* Returns the Completion Action that should be taken when the dataflow has completed, blocking as long as necessary for the determination to be made
* @return the CompletionAction that should be taken
*/
CompletionAction awaitCompletionAction() throws InterruptedException;
/**
* Notifies the ExecutionProgress that processing has been canceled
*/
void notifyExecutionCanceled();
/**
* Notifies the ExecutionProgress that processing has failed
*/
void notifyExecutionFailed(Throwable cause);
enum CompletionAction {
COMPLETE,
CANCEL;
}
}

View File

@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.engine;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.stateless.flow.FailurePortEncounteredException;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.apache.nifi.stateless.queue.DrainableFlowFileQueue;
import org.apache.nifi.stateless.repository.ByteArrayContentRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class StandardExecutionProgress implements ExecutionProgress {
private static final Logger logger = LoggerFactory.getLogger(StandardExecutionProgress.class);
private final ProcessGroup rootGroup;
private final List<FlowFileQueue> internalFlowFileQueues;
private final ByteArrayContentRepository contentRepository;
private final BlockingQueue<TriggerResult> resultQueue;
private final Set<String> failurePortNames;
private final BlockingQueue<CompletionAction> completionActionQueue;
private volatile boolean canceled = false;
private volatile CompletionAction completionAction = null;
public StandardExecutionProgress(final ProcessGroup rootGroup, final List<FlowFileQueue> internalFlowFileQueues, final BlockingQueue<TriggerResult> resultQueue,
final ByteArrayContentRepository contentRepository, final Set<String> failurePortNames) {
this.rootGroup = rootGroup;
this.internalFlowFileQueues = internalFlowFileQueues;
this.resultQueue = resultQueue;
this.contentRepository = contentRepository;
this.failurePortNames = failurePortNames;
completionActionQueue = new LinkedBlockingQueue<>();
}
@Override
public boolean isCanceled() {
return canceled;
}
@Override
public boolean isDataQueued() {
for (final FlowFileQueue queue : internalFlowFileQueues) {
if (!queue.isActiveQueueEmpty()) {
return true;
}
}
return false;
}
@Override
public CompletionAction awaitCompletionAction() throws InterruptedException {
if (canceled) {
return CompletionAction.CANCEL;
}
final CompletionAction existingAction = this.completionAction;
if (existingAction != null) {
return existingAction;
}
final TriggerResult triggerResult = createResult();
resultQueue.offer(triggerResult);
final CompletionAction completionAction = completionActionQueue.take();
// Hold onto the result so that the other Process Sessions that call this method can retrieve the result.
this.completionAction = completionAction;
return completionAction;
}
private TriggerResult createResult() {
final Map<String, List<FlowFile>> outputFlowFiles = drainOutputQueues();
for (final String failurePortName : failurePortNames) {
final List<FlowFile> flowFilesForPort = outputFlowFiles.get(failurePortName);
if (flowFilesForPort != null && !flowFilesForPort.isEmpty()) {
throw new FailurePortEncounteredException("FlowFile was transferred to Port " + failurePortName + ", which is marked as a Failure Port");
}
}
final boolean canceled = isCanceled();
return new TriggerResult() {
@Override
public boolean isSuccessful() {
return true;
}
@Override
public boolean isCanceled() {
return canceled;
}
@Override
public Optional<Exception> getFailureCause() {
return Optional.empty();
}
@Override
public Map<String, List<FlowFile>> getOutputFlowFiles() {
return outputFlowFiles;
}
@Override
public List<FlowFile> getOutputFlowFiles(final String portName) {
return outputFlowFiles.computeIfAbsent(portName, name -> Collections.emptyList());
}
@Override
public byte[] readContent(final FlowFile flowFile) {
if (!(flowFile instanceof FlowFileRecord)) {
throw new IllegalArgumentException("FlowFile was not created by this flow");
}
final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
final ContentClaim contentClaim = flowFileRecord.getContentClaim();
return contentRepository.getBytes(contentClaim);
}
@Override
public void acknowledge() {
completionActionQueue.offer(CompletionAction.COMPLETE);
}
};
}
@Override
public void notifyExecutionCanceled() {
canceled = true;
completionActionQueue.offer(CompletionAction.CANCEL);
}
@Override
public void notifyExecutionFailed(final Throwable cause) {
completionActionQueue.offer(CompletionAction.CANCEL);
}
public Map<String, List<FlowFile>> drainOutputQueues() {
final Map<String, List<FlowFile>> flowFileMap = new HashMap<>();
for (final Port port : rootGroup.getOutputPorts()) {
final List<FlowFile> flowFiles = drainOutputQueues(port);
flowFileMap.put(port.getName(), flowFiles);
}
return flowFileMap;
}
private List<FlowFile> drainOutputQueues(final Port port) {
final List<Connection> incomingConnections = port.getIncomingConnections();
if (incomingConnections.isEmpty()) {
return Collections.emptyList();
}
final List<FlowFile> portFlowFiles = new ArrayList<>();
for (final Connection connection : incomingConnections) {
final DrainableFlowFileQueue flowFileQueue = (DrainableFlowFileQueue) connection.getFlowFileQueue();
final List<FlowFileRecord> flowFileRecords = new ArrayList<>(flowFileQueue.size().getObjectCount());
flowFileQueue.drainTo(flowFileRecords);
portFlowFiles.addAll(flowFileRecords);
for (final FlowFileRecord flowFileRecord : flowFileRecords) {
contentRepository.decrementClaimantCount(flowFileRecord.getContentClaim());
}
}
return portFlowFiles;
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.flow;
import org.apache.nifi.flowfile.FlowFile;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class CanceledTriggerResult implements TriggerResult {
@Override
public boolean isSuccessful() {
return false;
}
@Override
public boolean isCanceled() {
return true;
}
@Override
public Optional<Exception> getFailureCause() {
return Optional.empty();
}
@Override
public Map<String, List<FlowFile>> getOutputFlowFiles() {
return Collections.emptyMap();
}
@Override
public List<FlowFile> getOutputFlowFiles(final String portName) {
return Collections.emptyList();
}
@Override
public byte[] readContent(final FlowFile flowFile) {
return new byte[0];
}
@Override
public void acknowledge() {
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.flow;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class ExceptionalTriggerResult implements TriggerResult {
private final Exception failureCause;
public ExceptionalTriggerResult(final Exception failureCause) {
this.failureCause = failureCause;
}
@Override
public boolean isSuccessful() {
return false;
}
@Override
public boolean isCanceled() {
return failureCause instanceof TerminatedTaskException;
}
@Override
public Optional<Exception> getFailureCause() {
return Optional.ofNullable(failureCause);
}
@Override
public Map<String, List<FlowFile>> getOutputFlowFiles() {
return Collections.emptyMap();
}
@Override
public List<FlowFile> getOutputFlowFiles(final String portName) {
return Collections.emptyList();
}
@Override
public byte[] readContent(final FlowFile flowFile) {
throw new IllegalArgumentException("Unknown FlowFile: " + flowFile);
}
@Override
public void acknowledge() {
}
}

View File

@ -29,11 +29,8 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
@ -44,10 +41,11 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.queue.DrainableFlowFileQueue;
import org.apache.nifi.stateless.engine.StandardExecutionProgress;
import org.apache.nifi.stateless.repository.ByteArrayContentRepository;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.apache.nifi.stateless.session.StatelessProcessSessionFactory;
@ -58,14 +56,19 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class StandardStatelessFlow implements StatelessDataflow {
@ -78,13 +81,15 @@ public class StandardStatelessFlow implements StatelessDataflow {
private final ControllerServiceProvider controllerServiceProvider;
private final ProcessContextFactory processContextFactory;
private final RepositoryContextFactory repositoryContextFactory;
private final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition;
private final List<FlowFileQueue> internalFlowFileQueues;
private final DataflowDefinition<?> dataflowDefinition;
private volatile ExecutorService runDataflowExecutor;
private volatile ProcessScheduler processScheduler;
private volatile boolean initialized = false;
public StandardStatelessFlow(final ProcessGroup rootGroup, final List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider controllerServiceProvider,
final ProcessContextFactory processContextFactory, final RepositoryContextFactory repositoryContextFactory,
final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition) {
final ProcessContextFactory processContextFactory, final RepositoryContextFactory repositoryContextFactory, final DataflowDefinition<?> dataflowDefinition) {
this.rootGroup = rootGroup;
this.reportingTasks = reportingTasks;
this.controllerServiceProvider = controllerServiceProvider;
@ -97,6 +102,21 @@ public class StandardStatelessFlow implements StatelessDataflow {
discoverRootProcessors(rootGroup, rootConnectables);
discoverRootRemoteGroupPorts(rootGroup, rootConnectables);
discoverRootInputPorts(rootGroup, rootConnectables);
internalFlowFileQueues = discoverInternalFlowFileQueues(rootGroup);
}
private List<FlowFileQueue> discoverInternalFlowFileQueues(final ProcessGroup group) {
final Set<Port> rootGroupInputPorts = rootGroup.getInputPorts();
final Set<Port> rootGroupOutputPorts = rootGroup.getOutputPorts();
//noinspection SuspiciousMethodCalls
return group.findAllConnections().stream()
.filter(connection -> !rootGroupInputPorts.contains(connection.getSource()))
.filter(connection -> !rootGroupOutputPorts.contains(connection.getDestination()))
.map(Connection::getFlowFileQueue)
.distinct()
.collect(Collectors.toCollection(ArrayList::new));
}
private void discoverRootInputPorts(final ProcessGroup processGroup, final Set<Connectable> rootComponents) {
@ -130,6 +150,11 @@ public class StandardStatelessFlow implements StatelessDataflow {
}
public void initialize(final ProcessScheduler processScheduler) {
if (initialized) {
throw new IllegalStateException("Cannot initialize dataflow more than once");
}
initialized = true;
this.processScheduler = processScheduler;
// Trigger validation to occur so that components can be enabled/started.
@ -162,6 +187,12 @@ public class StandardStatelessFlow implements StatelessDataflow {
logger.info("Successfully initialized components in {} millis ({} millis to perform validation, {} millis for services to enable)",
initializationMillis, validationMillis, serviceEnableMillis);
runDataflowExecutor = Executors.newFixedThreadPool(1, r -> {
final Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setName("Run Dataflow");
return thread;
});
} catch (final Throwable t) {
processScheduler.shutdown();
throw t;
@ -214,6 +245,8 @@ public class StandardStatelessFlow implements StatelessDataflow {
@Override
public void shutdown() {
runDataflowExecutor.shutdown();
if (processScheduler != null) {
processScheduler.shutdown();
}
@ -308,35 +341,81 @@ public class StandardStatelessFlow implements StatelessDataflow {
}
@Override
public void trigger() {
for (final Connectable connectable : rootConnectables) {
final ProcessContext processContext = processContextFactory.createProcessContext(connectable);
final StatelessProcessSessionFactory sessionFactory = new StatelessProcessSessionFactory(connectable, repositoryContextFactory,
processContextFactory, dataflowDefinition.getFailurePortNames());
public DataflowTrigger trigger() {
if (!initialized) {
throw new IllegalStateException("Must initialize dataflow before triggering it");
}
final long start = System.nanoTime();
final long processingNanos;
int invocations = 0;
final BlockingQueue<TriggerResult> resultQueue = new LinkedBlockingQueue<>();
final ExecutionProgress executionProgress = new StandardExecutionProgress(rootGroup, internalFlowFileQueues, resultQueue,
(ByteArrayContentRepository) repositoryContextFactory.getContentRepository(), dataflowDefinition.getFailurePortNames());
final AtomicReference<Future<?>> processFuture = new AtomicReference<>();
final DataflowTrigger trigger = new DataflowTrigger() {
@Override
public void cancel() {
executionProgress.notifyExecutionCanceled();
final Future<?> future = processFuture.get();
if (future != null) {
future.cancel(true);
}
}
@Override
public Optional<TriggerResult> getResultNow() {
final TriggerResult result = resultQueue.poll();
return Optional.ofNullable(result);
}
@Override
public Optional<TriggerResult> getResult(final long maxWaitTime, final TimeUnit timeUnit) throws InterruptedException {
final TriggerResult result = resultQueue.poll(maxWaitTime, timeUnit);
return Optional.ofNullable(result);
}
@Override
public TriggerResult getResult() throws InterruptedException {
final TriggerResult result = resultQueue.take();
return result;
}
};
final Future<?> future = runDataflowExecutor.submit(() -> executeDataflow(resultQueue, executionProgress));
processFuture.set(future);
return trigger;
}
private void executeDataflow(final BlockingQueue<TriggerResult> resultQueue, final ExecutionProgress executionProgress) {
try {
for (final Connectable connectable : rootConnectables) {
final ProcessContext processContext = processContextFactory.createProcessContext(connectable);
final StatelessProcessSessionFactory sessionFactory = new StatelessProcessSessionFactory(connectable, repositoryContextFactory,
processContextFactory, executionProgress);
final long start = System.nanoTime();
final long processingNanos;
int invocations = 0;
final List<Connection> incommingConnections = connectable.getIncomingConnections();
if (incommingConnections.isEmpty()) {
// If there is no incoming connection, trigger once.
logger.debug("Triggering {}", connectable);
connectable.onTrigger(processContext, sessionFactory);
invocations = 1;
} else {
// If there is an incoming connection, trigger until all incoming connections are empty.
for (final Connection incomingConnection : incommingConnections) {
while (!incomingConnection.getFlowFileQueue().isEmpty()) {
logger.debug("Triggering {}", connectable);
connectable.onTrigger(processContext, sessionFactory);
invocations++;
}
}
}
processingNanos = System.nanoTime() - start;
registerProcessEvent(connectable, invocations, processingNanos);
processingNanos = System.nanoTime() - start;
registerProcessEvent(connectable, invocations, processingNanos);
}
} catch (final TerminatedTaskException tte) {
// This occurs when the caller invokes the cancel() method of DataflowTrigger.
logger.debug("Caught a TerminatedTaskException", tte);
resultQueue.offer(new CanceledTriggerResult());
} catch (final Exception e) {
logger.error("Failed to execute dataflow", e);
resultQueue.offer(new ExceptionalTriggerResult(e));
}
}
@ -351,47 +430,6 @@ public class StandardStatelessFlow implements StatelessDataflow {
}
}
public Map<String, List<FlowFile>> drainOutputQueues() {
final Map<String, List<FlowFile>> flowFileMap = new HashMap<>();
for (final Port port : rootGroup.getOutputPorts()) {
final List<FlowFile> flowFiles = drainOutputQueues(port);
flowFileMap.put(port.getName(), flowFiles);
}
return flowFileMap;
}
@Override
public List<FlowFile> drainOutputQueues(final String portName) {
final Port port = rootGroup.getOutputPortByName(portName);
if (port == null) {
throw new IllegalArgumentException("No Output Port exists with name <" + portName + ">. Valid Port names are " + getOutputPortNames());
}
return drainOutputQueues(port);
}
private List<FlowFile> drainOutputQueues(final Port port) {
final List<Connection> incomingConnections = port.getIncomingConnections();
if (incomingConnections.isEmpty()) {
return Collections.emptyList();
}
final List<FlowFile> portFlowFiles = new ArrayList<>();
for (final Connection connection : incomingConnections) {
final DrainableFlowFileQueue flowFileQueue = (DrainableFlowFileQueue) connection.getFlowFileQueue();
final List<FlowFileRecord> flowFileRecords = new ArrayList<>(flowFileQueue.size().getObjectCount());
flowFileQueue.drainTo(flowFileRecords);
portFlowFiles.addAll(flowFileRecords);
for (final FlowFileRecord flowFileRecord : flowFileRecords) {
repositoryContextFactory.getContentRepository().decrementClaimantCount(flowFileRecord.getContentClaim());
}
}
return portFlowFiles;
}
@Override
public Set<String> getInputPortNames() {
@ -429,18 +467,6 @@ public class StandardStatelessFlow implements StatelessDataflow {
}
}
@Override
public byte[] getFlowFileContents(final FlowFile flowFile) {
if (!(flowFile instanceof FlowFileRecord)) {
throw new IllegalArgumentException("FlowFile was not created by this flow");
}
final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
final ContentClaim contentClaim = flowFileRecord.getContentClaim();
final ContentRepository contentRepository = repositoryContextFactory.getContentRepository();
return ((ByteArrayContentRepository) contentRepository).getBytes(contentClaim);
}
@Override
public int getFlowFilesQueued() {
return rootGroup.findAllConnections().stream()

View File

@ -27,8 +27,9 @@ import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.stateless.engine.DataflowAbortedException;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.flow.FailurePortEncounteredException;
import org.apache.nifi.stateless.queue.DrainableFlowFileQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class StatelessProcessSession extends StandardProcessSession {
private static final Logger logger = LoggerFactory.getLogger(StatelessProcessSession.class);
@ -45,49 +45,45 @@ public class StatelessProcessSession extends StandardProcessSession {
private final RepositoryContext context;
private final StatelessProcessSessionFactory sessionFactory;
private final ProcessContextFactory processContextFactory;
private final Set<String> failurePortNames;
private final ExecutionProgress executionProgress;
public StatelessProcessSession(final RepositoryContext context, final StatelessProcessSessionFactory sessionFactory, final ProcessContextFactory processContextFactory,
final Set<String> failurePortNames) {
super(context, () -> false);
final ExecutionProgress progress) {
super(context, progress::isCanceled);
this.context = context;
this.sessionFactory = sessionFactory;
this.processContextFactory = processContextFactory;
this.failurePortNames = failurePortNames;
this.executionProgress = progress;
}
@Override
protected void commit(final StandardProcessSession.Checkpoint checkpoint) {
// If task has been canceled, abort processing and throw an Exception, rather than committing the session.
if (executionProgress.isCanceled()) {
logger.info("Completed processing for {} but execution has been canceled. Will not commit session.", context.getConnectable());
abortProcessing(null);
throw new DataflowAbortedException();
}
// Commit the session
super.commit(checkpoint);
// Trigger each of the follow-on components.
final long followOnStart = System.nanoTime();
for (final Connection connection : context.getConnectable().getConnections()) {
// This component may have produced multiple output FlowFiles. We want to trigger the follow-on components
// until they have consumed all created FlowFiles.
while (!connection.getFlowFileQueue().isEmpty()) {
final Connectable connectable = connection.getDestination();
if (isTerminalPort(connectable)) {
if (failurePortNames.contains(connectable.getName())) {
abortProcessing();
throw new FailurePortEncounteredException("Flow failed because a FlowFile was routed from " + connection.getSource() + " to Failure Port " + connection.getDestination());
}
// If data is being transferred to a terminal port, we don't want to trigger the port,
// as it has nowhere to transfer the data. We simply leave it queued at the terminal port.
// Once the processing completes, the terminal ports' connections will be drained, when #awaitAcknowledgment is called.
break;
}
final ProcessContext connectableContext = processContextFactory.createProcessContext(connectable);
final ProcessSessionFactory connectableSessionFactory = new StatelessProcessSessionFactory(connectable, this.sessionFactory.getRepositoryContextFactory(),
processContextFactory, failurePortNames);
logger.debug("Triggering {}", connectable);
final long start = System.nanoTime();
try {
connectable.onTrigger(connectableContext, connectableSessionFactory);
} catch (final Throwable t) {
abortProcessing();
throw t;
}
final long nanos = System.nanoTime() - start;
registerProcessEvent(connectable, nanos);
// Trigger the next component
triggerNext(connectable);
}
}
@ -100,9 +96,65 @@ public class StatelessProcessSession extends StandardProcessSession {
// and it's probably the best that we can do without either introducing a very ugly hack or significantly changing the API.
final long followOnNanos = System.nanoTime() - followOnStart;
registerProcessEvent(context.getConnectable(), -followOnNanos);
// Wait for acknowledgement if necessary
awaitAcknowledgment();
}
private void abortProcessing() {
private void triggerNext(final Connectable connectable) {
if (executionProgress.isCanceled()) {
logger.info("Completed processing for {} but execution has been canceled. Will not commit session.", context.getConnectable());
abortProcessing(null);
throw new DataflowAbortedException();
}
final ProcessContext connectableContext = processContextFactory.createProcessContext(connectable);
final ProcessSessionFactory connectableSessionFactory = new StatelessProcessSessionFactory(connectable, this.sessionFactory.getRepositoryContextFactory(),
processContextFactory, executionProgress);
logger.debug("Triggering {}", connectable);
final long start = System.nanoTime();
try {
connectable.onTrigger(connectableContext, connectableSessionFactory);
} catch (final Throwable t) {
abortProcessing(t);
throw t;
}
final long nanos = System.nanoTime() - start;
registerProcessEvent(connectable, nanos);
}
private void awaitAcknowledgment() {
if (executionProgress.isDataQueued()) {
logger.debug("Completed processing for {} but data is queued for processing so will allow Process Session to complete without waiting for acknowledgment", context.getConnectable());
return;
}
logger.debug("Completed processing for {}; no data is queued for processing so will await acknowledgment of completion", context.getConnectable());
final ExecutionProgress.CompletionAction completionAction;
try {
completionAction = executionProgress.awaitCompletionAction();
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting for dataflow completion to be acknowledged. Will roll back session.");
abortProcessing(e);
throw new DataflowAbortedException();
}
if (completionAction == ExecutionProgress.CompletionAction.CANCEL) {
logger.info("Dataflow completed but action was canceled instead of being acknowledged. Will roll back session.");
abortProcessing(null);
throw new DataflowAbortedException();
}
}
private void abortProcessing(final Throwable cause) {
if (cause == null) {
executionProgress.notifyExecutionCanceled();
} else {
executionProgress.notifyExecutionFailed(cause);
}
try {
rollback(false, true);
} finally {

View File

@ -21,29 +21,28 @@ import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import java.util.Set;
public class StatelessProcessSessionFactory implements ProcessSessionFactory {
private final Connectable connectable;
private final RepositoryContextFactory contextFactory;
private final ProcessContextFactory processContextFactory;
private final Set<String> failurePortNames;
private final ExecutionProgress executionProgress;
public StatelessProcessSessionFactory(final Connectable connectable, final RepositoryContextFactory contextFactory, final ProcessContextFactory processContextFactory,
final Set<String> failurePortNames) {
final ExecutionProgress executionProgress) {
this.connectable = connectable;
this.contextFactory = contextFactory;
this.processContextFactory = processContextFactory;
this.failurePortNames = failurePortNames;
this.executionProgress = executionProgress;
}
@Override
public ProcessSession createSession() {
final RepositoryContext context = contextFactory.createRepositoryContext(connectable);
final ProcessSession session = new StatelessProcessSession(context, this, processContextFactory, failurePortNames);
final ProcessSession session = new StatelessProcessSession(context, this, processContextFactory, executionProgress);
return session;
}

View File

@ -31,6 +31,9 @@ import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import java.io.File;
import java.io.FileInputStream;
@ -40,13 +43,22 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class StatelessSystemIT {
private final List<StatelessDataflow> createdFlows = new ArrayList<>();
// We reference version 1.13.0 here, but the version isn't really relevant. Because there will only be a single artifact of name "nifi-system-test-extensions-nar" the framework will end
// up finding a "compatible bundle" and using that, regardless of the specified version.
protected static final Bundle SYSTEM_TEST_EXTENSIONS_BUNDLE = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "1.13.0");
@Rule
public TestName name = new TestName();
@Rule
public Timeout defaultTimeout = new Timeout(30, TimeUnit.SECONDS);
@Before
public void clearFlows() {
createdFlows.clear();
@ -142,4 +154,8 @@ public class StatelessSystemIT {
createdFlows.add(dataflow);
return dataflow;
}
protected String getTestName() {
return name.getMethodName();
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.basics;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.flow.VersionedPort;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class CancelFlowExecutionIT extends StatelessSystemIT {
@Test
public void testCancelExecution() throws IOException, StatelessConfigurationException, InterruptedException {
final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
final VersionedPort inPort = flowBuilder.createInputPort("In");
final VersionedPort outPort = flowBuilder.createOutputPort("Out");
final VersionedProcessor sleep = flowBuilder.createSimpleProcessor("Sleep");
sleep.setProperties(Collections.singletonMap("onTrigger Sleep Time", "5 mins"));
flowBuilder.createConnection(inPort, sleep, Relationship.ANONYMOUS.getName());
flowBuilder.createConnection(sleep, outPort, "success");
final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot());
final DataflowTrigger trigger = dataflow.trigger();
// Wait up to 250 milliseconds. At that point, the result should not be available.
final Optional<TriggerResult> optionalResult = trigger.getResult(250, TimeUnit.MILLISECONDS);
assertFalse(optionalResult.isPresent());
trigger.cancel();
final TriggerResult result = trigger.getResult();
assertFalse(result.isSuccessful());
assertTrue(result.isCanceled());
assertTrue(result.getOutputFlowFiles().isEmpty());
assertEquals(0, dataflow.getFlowFilesQueued());
}
}

View File

@ -24,7 +24,9 @@ import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.junit.Test;
import java.io.IOException;
@ -33,11 +35,12 @@ import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class CloneFlowFileIT extends StatelessSystemIT {
@Test
public void testClone() throws IOException, StatelessConfigurationException {
public void testClone() throws IOException, StatelessConfigurationException, InterruptedException {
// Build the flow
final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
final VersionedPort inPort = flowBuilder.createInputPort("In");
@ -59,9 +62,12 @@ public class CloneFlowFileIT extends StatelessSystemIT {
// Enqueue data and trigger
dataflow.enqueue("Hello".getBytes(StandardCharsets.UTF_8), Collections.singletonMap("abc", "123"), "In");
dataflow.trigger();
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
assertTrue(result.isSuccessful());
result.acknowledge();
final List<FlowFile> flowFiles = dataflow.drainOutputQueues("Out");
final List<FlowFile> flowFiles = result.getOutputFlowFiles("Out");
assertEquals(2, flowFiles.size());
final FlowFile first = flowFiles.get(0);
@ -71,11 +77,11 @@ public class CloneFlowFileIT extends StatelessSystemIT {
assertEquals("123", second.getAttribute("abc"));
final long countNormal = flowFiles.stream()
.filter(flowFile -> new String(dataflow.getFlowFileContents(flowFile), StandardCharsets.UTF_8).equals("Hello"))
.filter(flowFile -> new String(result.readContent(flowFile), StandardCharsets.UTF_8).equals("Hello"))
.count();
final long countReversed = flowFiles.stream()
.filter(flowFile -> new String(dataflow.getFlowFileContents(flowFile), StandardCharsets.UTF_8).equals("olleH"))
.filter(flowFile -> new String(result.readContent(flowFile), StandardCharsets.UTF_8).equals("olleH"))
.count();
assertEquals(1L, countNormal);

View File

@ -18,9 +18,14 @@
package org.apache.nifi.stateless.basics;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.registry.flow.VersionedPort;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.junit.Test;
import java.io.File;
@ -30,23 +35,54 @@ import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class CreatesFlowFileIT extends StatelessSystemIT {
@Test
public void testFlowFileCreated() throws IOException, StatelessConfigurationException {
public void testFlowFileCreated() throws IOException, StatelessConfigurationException, InterruptedException {
final StatelessDataflow dataflow = loadDataflow(new File("src/test/resources/flows/GenerateFlowFile.json"), Collections.emptyList());
dataflow.trigger();
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
assertTrue(result.isSuccessful());
assertEquals(Collections.singleton("Out"), dataflow.getOutputPortNames());
final List<FlowFile> flowFiles = dataflow.drainOutputQueues("Out");
final List<FlowFile> flowFiles = result.getOutputFlowFiles("Out");
assertEquals(1, flowFiles.size());
final FlowFile flowFile = flowFiles.get(0);
assertEquals("hello", flowFile.getAttribute("greeting"));
final byte[] bytes = dataflow.getFlowFileContents(flowFile);
final byte[] bytes = result.readContent(flowFile);
assertEquals("Hello", new String(bytes, StandardCharsets.UTF_8));
}
@Test
public void testMultipleFlowFilesCreated() throws IOException, StatelessConfigurationException, InterruptedException {
final VersionedFlowBuilder builder = new VersionedFlowBuilder();
final VersionedProcessor generate = builder.createSimpleProcessor("GenerateFlowFile");
generate.setProperties(Collections.singletonMap("Batch Size", "500"));
final VersionedProcessor setAttribute = builder.createSimpleProcessor("SetAttribute");
builder.createConnection(generate, setAttribute, "success");
final VersionedPort outPort = builder.createOutputPort("Out");
builder.createConnection(setAttribute, outPort, "success");
final StatelessDataflow dataflow = loadDataflow(builder.getFlowSnapshot());
for (int i=0; i < 10; i++) {
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
final List<FlowFile> output = result.getOutputFlowFiles("Out");
assertEquals(500, output.size());
result.acknowledge();
// Wait for the number of FlowFiles queued to be equal to 0. It may take a few milliseconds.
while (dataflow.getFlowFilesQueued() > 0) {
Thread.sleep(5);
}
}
}
}

View File

@ -25,7 +25,9 @@ import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.junit.Test;
import java.io.IOException;
@ -33,11 +35,12 @@ import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class InputOutputIT extends StatelessSystemIT {
@Test
public void testFlowFileInputProcessedAndOutputProvided() throws IOException, StatelessConfigurationException {
public void testFlowFileInputProcessedAndOutputProvided() throws IOException, StatelessConfigurationException, InterruptedException {
// Build flow
final VersionedFlowSnapshot versionedFlowSnapshot = createFlow();
@ -46,10 +49,12 @@ public class InputOutputIT extends StatelessSystemIT {
// Enqueue data and trigger
dataflow.enqueue(new byte[0], Collections.singletonMap("abc", "123"), "In");
dataflow.trigger();
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
assertTrue(result.isSuccessful());
// Validate results
final List<FlowFile> outputFlowFiles = dataflow.drainOutputQueues("Out");
final List<FlowFile> outputFlowFiles = result.getOutputFlowFiles("Out");
assertEquals(1, outputFlowFiles.size());
final FlowFile output = outputFlowFiles.get(0);
@ -58,7 +63,7 @@ public class InputOutputIT extends StatelessSystemIT {
}
@Test
public void testMultipleFlowFilesIn() throws IOException, StatelessConfigurationException {
public void testMultipleFlowFilesIn() throws IOException, StatelessConfigurationException, InterruptedException {
// Build flow
final VersionedFlowSnapshot versionedFlowSnapshot = createFlow();
@ -68,17 +73,38 @@ public class InputOutputIT extends StatelessSystemIT {
// Enqueue data and trigger
dataflow.enqueue(new byte[0], Collections.singletonMap("abc", "123"), "In");
dataflow.enqueue(new byte[0], Collections.singletonMap("abc", "321"), "In");
dataflow.trigger();
// Validate results
final List<FlowFile> outputFlowFiles = dataflow.drainOutputQueues("Out");
assertEquals(2, outputFlowFiles.size());
DataflowTrigger trigger = dataflow.trigger();
TriggerResult result = trigger.getResult();
assertTrue(result.isSuccessful());
// Triggering once will only process 1 of the FlowFiles and leave the other input FlowFile queued.
result.acknowledge();
// It may take a few milliseconds for the acknowledgement to result in the FlowFiles being acknowledged by the FlowFile Queue.
while (dataflow.getFlowFilesQueued() > 1) {
Thread.sleep(10L);
}
assertEquals(1, dataflow.getFlowFilesQueued());
// Validate results of first run
List<FlowFile> outputFlowFiles = result.getOutputFlowFiles("Out");
final FlowFile output1 = outputFlowFiles.get(0);
assertEquals("123", output1.getAttribute("abc"));
assertEquals("bar", output1.getAttribute("foo"));
final FlowFile output2 = outputFlowFiles.get(1);
trigger = dataflow.trigger();
result = trigger.getResult();
assertTrue(result.isSuccessful());
result.acknowledge();
// Validate results of second run
outputFlowFiles = result.getOutputFlowFiles("Out");
assertEquals(1, outputFlowFiles.size());
final FlowFile output2 = outputFlowFiles.get(0);
assertEquals("321", output2.getAttribute("abc"));
assertEquals("bar", output2.getAttribute("foo"));
}

View File

@ -23,21 +23,24 @@ import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.FailurePortEncounteredException;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.junit.Assert;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class RollbackOnExceptionIT extends StatelessSystemIT {
private static final String EXCEPTION_TEXT = "Intentional Exception to verify behavior in RollbackOnExceptionIT";
@Test
public void testFlowFileCompletelyRemovedWhenExceptionThrown() throws IOException, StatelessConfigurationException {
public void testFlowFileCompletelyRemovedWhenExceptionThrown() throws IOException, StatelessConfigurationException, InterruptedException {
final VersionedFlowBuilder builder = new VersionedFlowBuilder();
final VersionedProcessor generate = builder.createSimpleProcessor("GenerateFlowFile");
final VersionedProcessor setAttribute = builder.createSimpleProcessor("SetAttribute");
@ -49,19 +52,17 @@ public class RollbackOnExceptionIT extends StatelessSystemIT {
builder.createConnection(setAttribute, throwException, "success");
final StatelessDataflow dataflow = loadDataflow(builder.getFlowSnapshot(), Collections.emptyList());
try {
dataflow.trigger();
Assert.fail("Expected ProcessException to be thrown");
} catch (final ProcessException e) {
assertEquals(EXCEPTION_TEXT, e.getMessage());
}
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
assertFalse(result.isSuccessful());
assertTrue(result.getFailureCause().get() instanceof ProcessException);
assertEquals(0, dataflow.getFlowFilesQueued());
}
@Test
public void testFlowFileCompletelyRemovedWhenTransferredToFailurePort() throws IOException, StatelessConfigurationException {
public void testFlowFileCompletelyRemovedWhenTransferredToFailurePort() throws IOException, StatelessConfigurationException, InterruptedException {
final VersionedFlowBuilder builder = new VersionedFlowBuilder();
final VersionedProcessor generate = builder.createSimpleProcessor("GenerateFlowFile");
final VersionedProcessor setAttribute = builder.createSimpleProcessor("SetAttribute");
@ -72,12 +73,10 @@ public class RollbackOnExceptionIT extends StatelessSystemIT {
final StatelessDataflow dataflow = loadDataflow(builder.getFlowSnapshot(), Collections.emptyList(), Collections.singleton("Out"));
try {
dataflow.trigger();
Assert.fail("Expected FailurePortEncounteredException to be thrown");
} catch (final FailurePortEncounteredException e) {
// Expected
}
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
assertFalse(result.isSuccessful());
assertTrue(result.getFailureCause().get() instanceof FailurePortEncounteredException);
assertEquals(0, dataflow.getFlowFilesQueued());
}

View File

@ -26,7 +26,9 @@ import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.junit.Test;
import java.io.IOException;
@ -34,11 +36,12 @@ import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class StatelessControllerServiceSystemIT extends StatelessSystemIT {
@Test
public void testControllerServices() throws IOException, StatelessConfigurationException {
public void testControllerServices() throws IOException, StatelessConfigurationException, InterruptedException {
// Build the flow:
// Root Input Port -> Child Input Port -> CountFlowFiles -> Child Output Port -> Root Output Port.
// Controller Service at root group and at child group.
@ -69,9 +72,11 @@ public class StatelessControllerServiceSystemIT extends StatelessSystemIT {
// Enqueue FlowFile and trigger
dataflow.enqueue(new byte[0], Collections.emptyMap(), "Root In");
dataflow.trigger();
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
assertTrue(result.isSuccessful());
final List<FlowFile> flowFilesOut = dataflow.drainOutputQueues("Root Out");
final List<FlowFile> flowFilesOut = result.getOutputFlowFiles("Root Out");
assertEquals(1, flowFilesOut.size());
// Should be 2 because both the child Count Service and the Root-level Count Service got triggered

View File

@ -30,7 +30,9 @@ import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.ParameterContextDefinition;
import org.apache.nifi.stateless.config.ParameterDefinition;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.junit.Test;
import java.io.IOException;
@ -41,11 +43,12 @@ import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ParameterContextIT extends StatelessSystemIT {
@Test
public void testMultipleParameterContexts() throws IOException, StatelessConfigurationException {
public void testMultipleParameterContexts() throws IOException, StatelessConfigurationException, InterruptedException {
// Build dataflow
final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
final VersionedPort inPort = flowBuilder.createInputPort("In");
@ -87,10 +90,12 @@ public class ParameterContextIT extends StatelessSystemIT {
// Enqueue data and trigger
dataflow.enqueue(new byte[0], Collections.singletonMap("abc", "123"), "In");
dataflow.trigger();
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
assertTrue(result.isSuccessful());
// Validate results
final List<FlowFile> outputFlowFiles = dataflow.drainOutputQueues("Out");
final List<FlowFile> outputFlowFiles = result.getOutputFlowFiles("Out");
assertEquals(1, outputFlowFiles.size());
final FlowFile output = outputFlowFiles.get(0);