NIFI-9390: Updates to MergeContent / MergeRecord so that they play nicely within Stateless

NIFI-9390: Addressed an underlying condition in the Stateless framework that caused Merge-related processors (and similar ones) not to be triggered when necessary. Added several system tests to verify different configurations.

NIFI-9390: Simplified the logic for iterating over the components of a Stateless flow that are ready to be triggered

This closes #5634.

Co-authored-by: Peter Turcsanyi <turcsanyi@apache.org>
Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Mark Payne 2022-01-04 14:27:22 -05:00 committed by Peter Turcsanyi
parent 990285ba1c
commit 72e54f4fab
19 changed files with 478 additions and 62 deletions

View File

@@ -188,7 +188,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
return;
}
final int binsMigrated = migrateBins(context);
final int binsMigrated = migrateBins(context, flowFilesBinned == 0);
final int binsProcessed = processBins(context);
//If we accomplished nothing then let's yield
if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
@@ -196,9 +196,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
}
}
private int migrateBins(final ProcessContext context) {
private int migrateBins(final ProcessContext context, final boolean relaxFullnessConstraint) {
int added = 0;
for (final Bin bin : binManager.removeReadyBins(true)) {
for (final Bin bin : binManager.removeReadyBins(relaxFullnessConstraint)) {
this.readyBins.add(bin);
added++;
}
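The new flag lets BinFiles relax its bin-fullness check, but only on invocations where no new FlowFiles were binned. As a rough illustration, the per-bin readiness test that removeReadyBins is presumed to apply looks something like the following (the actual BinManager logic, such as bin-age handling, may differ):

    // Illustrative sketch only; assumed shape of the per-bin readiness test.
    private boolean isBinReady(final Bin bin, final boolean relaxFullnessConstraint) {
        if (relaxFullnessConstraint) {
            return bin.isFullEnough(); // minimum entry/size thresholds satisfied
        }
        return bin.isFull();           // maximum entry/size thresholds reached
    }

This keeps a Stateless trigger from flushing half-filled bins while upstream data is still arriving, yet still guarantees progress once the queue is drained.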

View File

@@ -323,12 +323,15 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
}
}
boolean flowFilePolled = false;
while (isScheduled()) {
final ProcessSession session = sessionFactory.createSession();
final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
if (flowFiles.isEmpty()) {
break;
}
flowFilePolled = true;
if (getLogger().isDebugEnabled()) {
final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
@@ -373,15 +376,20 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e, e);
}
// Complete any bins that meet their minimum size requirements
try {
manager.completeFullEnoughBins();
if (flowFilePolled) {
// At least one new FlowFile was pulled in. Only complete the bins that are entirely full
manager.completeFullBins();
} else {
// No FlowFiles available. Complete any bins that meet their minimum size requirements
manager.completeFullEnoughBins();
getLogger().debug("No more FlowFiles to bin; will yield");
context.yield();
}
} catch (final Exception e) {
getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e, e);
}
getLogger().debug("No more FlowFiles to bin; will yield");
context.yield();
}
}
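Condensed, the completion logic that results from this change reads as follows (simplified from the diff above, with debug logging trimmed):

    try {
        if (flowFilePolled) {
            // New FlowFiles were pulled this trigger: flush only bins that are
            // completely full, so partially filled bins can keep accumulating.
            manager.completeFullBins();
        } else {
            // The queue is drained: flush bins that meet their minimums, then yield.
            manager.completeFullEnoughBins();
            context.yield();
        }
    } catch (final Exception e) {
        getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e, e);
    }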

View File

@@ -245,6 +245,10 @@ public class RecordBinManager {
return handleCompletedBins(RecordBin::isFullEnough, "Bin is full enough");
}
public int completeFullBins() throws IOException {
return handleCompletedBins(RecordBin::isFull, "Bin is completely full");
}
private int handleCompletedBins(final Predicate<RecordBin> completionTest, final String completionReason) throws IOException {
final Map<String, List<RecordBin>> completedBinMap = new HashMap<>();
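The new completeFullBins() complements the existing completeFullEnoughBins(); both delegate to handleCompletedBins with a different predicate. A hypothetical sketch of the distinction between the two RecordBin predicates (the real fields and threshold handling may differ):

    // Hypothetical illustration; RecordBin's actual implementation may vary.
    boolean isFullEnough() {
        return recordCount >= minRecords && byteCount >= minBytes; // minimums met
    }

    boolean isFull() {
        return recordCount >= maxRecords || byteCount >= maxBytes; // a maximum reached
    }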

View File

@@ -485,7 +485,7 @@ public class TestMergeContent {
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
createFlowFiles(runner);
runner.run();
runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -531,7 +531,7 @@ public class TestMergeContent {
runner.setProperty(MergeContent.FOOTER, "$");
createFlowFiles(runner);
runner.run();
runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -544,7 +544,7 @@ public class TestMergeContent {
}
@Test
public void testSimpleBinaryConcatWithTextDelimitersHeaderOnly() throws IOException, InterruptedException {
public void testSimpleBinaryConcatWithTextDelimitersHeaderOnly() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
@@ -552,7 +552,7 @@ public class TestMergeContent {
runner.setProperty(MergeContent.HEADER, "@");
createFlowFiles(runner);
runner.run();
runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -584,7 +584,7 @@ public class TestMergeContent {
runner.enqueue("Hello".getBytes("UTF-8"), attributes);
runner.enqueue(", ".getBytes("UTF-8"), attributes);
runner.enqueue("World!".getBytes("UTF-8"), attributes);
runner.run();
runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -654,7 +654,7 @@ public class TestMergeContent {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/zip");
runner.enqueue(new byte[0], attributes);
runner.run();
runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -733,7 +733,7 @@ public class TestMergeContent {
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP);
createFlowFiles(runner);
runner.run();
runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -770,7 +770,7 @@ public class TestMergeContent {
runner.enqueue("Hello".getBytes("UTF-8"), attributes);
runner.enqueue(", ".getBytes("UTF-8"), attributes);
runner.enqueue("World!".getBytes("UTF-8"), attributes);
runner.run();
runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -793,7 +793,7 @@ public class TestMergeContent {
runner.enqueue(", ".getBytes("UTF-8"), attributes);
attributes.put(CoreAttributes.FILENAME.key(), "AReallyLongggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggFileName");
runner.enqueue("World!".getBytes("UTF-8"), attributes);
runner.run();
runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -1108,7 +1108,7 @@ public class TestMergeContent {
attributes.put("attr", "b");
runner.enqueue("Panama".getBytes("UTF-8"), attributes);
runner.run(1);
runner.run(2);
runner.assertTransferCount(MergeContent.REL_MERGED, 2);
@@ -1231,7 +1231,7 @@ public class TestMergeContent {
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
createFlowFiles(runner);
runner.run();
runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
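The run() to run(2) updates throughout this test class follow from the BinFiles change above: a trigger that bins new FlowFiles no longer flushes bins that merely meet their minimums, so a second trigger, which finds nothing new to bin, is needed to relax the fullness constraint and complete the bin:

    // First trigger: FlowFiles are binned, so only completely full bins flush.
    // Second trigger: nothing new to bin, so the fullness constraint is relaxed
    // and the "full enough" bin is completed.
    runner.run(2);
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);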

View File

@@ -81,7 +81,7 @@ public class TestMergeRecord {
runner.enqueue("Name, Age\nJohn, 35");
runner.enqueue("Name, Age\nJane, 34");
runner.run(1);
runner.run(2);
runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
@@ -110,6 +110,7 @@ public class TestMergeRecord {
runner.enqueue("Name, Age\nJane, 34");
runner.enqueue("Name, Color\nJohn, Blue");
runner.run(1, false, false);
runner.run(1, true, false);
runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
@@ -339,7 +340,7 @@ public class TestMergeRecord {
}
runner.enqueue(sb.toString());
runner.run();
runner.run(2);
runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 3);
@@ -377,7 +378,7 @@ public class TestMergeRecord {
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
runner.enqueue("Name, Age\nJohn, 35");
runner.run();
runner.run(2);
runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
}
@@ -511,6 +512,7 @@ public class TestMergeRecord {
runner.run(1, false);
Thread.sleep(50L);
runner.run(1, false, false);
runner.run(1, true, false);
runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
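The MergeRecord tests follow the same two-trigger pattern, using the three-argument run overload so the processor is not re-initialized between triggers (parameter meanings assumed from typical NiFi TestRunner usage):

    // Assumed semantics of run(iterations, stopOnFinish, initialize):
    runner.run(1, false, false); // trigger once; skip the stop lifecycle and re-initialization
    runner.run(1, true, false);  // trigger again, then run the stop lifecycle methods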

View File

@@ -38,7 +38,7 @@ public class LogComponentStatuses implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(LogComponentStatuses.class);
private static final int METRIC_CACHE_SECONDS = 300; // FlowFileEvent Repository holds 300 seconds' worth of metrics/events
private static final String PROCESSOR_LINE_FORMAT = "| %1$-30.30s | %2$-36.36s | %3$-30.30s | %4$28.28s | %5$30.30s | %6$14.14s | %714.14s | %8$28.28s |\n";
private static final String PROCESSOR_LINE_FORMAT = "| %1$-30.30s | %2$-36.36s | %3$-30.30s | %4$28.28s | %5$30.30s | %6$14.14s | %7$14.14s | %8$28.28s |\n";
private static final String COUNTER_LINE_FORMAT = "| %1$-36.36s | %2$-36.36s | %3$28.28s | %4$28.28s |\n";
private final FlowFileEventRepository flowFileEventRepository;

View File

@@ -109,7 +109,7 @@ public class StandardExecutionProgress implements ExecutionProgress {
@Override
public boolean isDataQueued() {
for (final FlowFileQueue queue : internalFlowFileQueues) {
if (!queue.isActiveQueueEmpty()) {
if (!queue.isActiveQueueEmpty() || queue.isUnacknowledgedFlowFile()) {
return true;
}
}
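The additional check matters in Stateless flows because a processor may have pulled FlowFiles into a session that has not yet been committed. A small sketch of the assumed semantics of the two queue methods used above (illustrative helper, not part of the change):

    // isActiveQueueEmpty(): no FlowFiles are waiting in the queue to be pulled.
    // isUnacknowledgedFlowFile(): FlowFiles were pulled into a session that has
    // not yet been committed, so data is still "in flight" for this queue.
    private boolean hasDataInOrInFlight(final FlowFileQueue queue) {
        return !queue.isActiveQueueEmpty() || queue.isUnacknowledgedFlowFile();
    }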

View File

@@ -139,7 +139,6 @@ public class StandardStatelessFlow implements StatelessDataflow {
internalFlowFileQueues = discoverInternalFlowFileQueues(rootGroup);
}
private List<FlowFileQueue> discoverInternalFlowFileQueues(final ProcessGroup group) {
final Set<Port> rootGroupInputPorts = rootGroup.getInputPorts();
final Set<Port> rootGroupOutputPorts = rootGroup.getOutputPorts();

View File

@@ -19,6 +19,7 @@ package org.apache.nifi.stateless.flow;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.processor.ProcessContext;
@@ -32,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -57,10 +57,6 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
this.processContextFactory = builder.processContextFactory;
}
public Connectable getCurrentComponent() {
return currentComponent;
}
@Override
public void triggerFlow() {
try {
@@ -68,30 +64,31 @@
while (!completionReached) {
triggerRootConnectables();
NextConnectable nextConnectable = NextConnectable.NEXT_READY;
while (tracker.isAnyReady() && nextConnectable == NextConnectable.NEXT_READY) {
final List<Connectable> next = tracker.getReady();
logger.debug("The following {} components are ready to be triggered: {}", next.size(), next);
while (tracker.isAnyReady()) {
final Connectable connectable = tracker.getNextReady();
logger.debug("The next ready component to be triggered: {}", connectable);
for (final Connectable connectable : next) {
nextConnectable = triggerWhileReady(connectable);
// Continually trigger the given component as long as it is ready to be triggered
final NextConnectable nextConnectable = triggerWhileReady(connectable);
// If there's nothing left to do, return
if (nextConnectable == NextConnectable.NONE) {
return;
}
// If next connectable is whatever is ready, just continue loop
if (nextConnectable == NextConnectable.NEXT_READY) {
continue;
}
// Otherwise, we need to break out of this loop so that we can trigger root connectables or complete dataflow
break;
// If there's nothing left to do, return
if (nextConnectable == NextConnectable.NONE) {
return;
}
// If next connectable is whatever is ready, just continue loop
if (nextConnectable == NextConnectable.NEXT_READY) {
continue;
}
// Otherwise, we need to break out of this loop so that we can trigger root connectables or complete dataflow
break;
}
completionReached = !tracker.isAnyReady();
// We have reached completion if the tracker does not know of any components ready to be triggered AND
// we have no data queued in the flow (with the exception of Output Ports).
completionReached = !tracker.isAnyReady() && isFlowQueueEmpty();
}
} catch (final Throwable t) {
if (t instanceof TerminatedTaskException) {
@@ -106,6 +103,29 @@
}
}
/**
* Returns <code>true</code> if all data in the flow has been fully processed. This considers both the 'internal queues'
* that are available via the executionProgress and any data that has been consumed from those queues by
* the 'rootConnectables' but has not yet completed processing
*
* @return <code>true</code> if all FlowFiles have completed processing and no data is available, <code>false</code> otherwise
*/
private boolean isFlowQueueEmpty() {
if (executionProgress.isDataQueued()) {
return false;
}
for (final Connectable rootConnectable : rootConnectables) {
for (final Connection connection : rootConnectable.getIncomingConnections()) {
if (connection.getFlowFileQueue().isUnacknowledgedFlowFile()) {
return false;
}
}
}
return true;
}
private void triggerRootConnectables() {
for (final Connectable connectable : rootConnectables) {
currentComponent = connectable;
@@ -202,7 +222,7 @@
SOURCE_CONNECTABLE,
NONE;
NONE
}
public static class Builder {
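Put together, the reworked triggerFlow() loop has the following shape (condensed from the diff; exception handling and the TerminatedTaskException path omitted):

    boolean completionReached = false;
    while (!completionReached) {
        triggerRootConnectables();

        while (tracker.isAnyReady()) {
            final Connectable connectable = tracker.getNextReady();

            // Continually trigger the component as long as it is ready.
            final NextConnectable next = triggerWhileReady(connectable);
            if (next == NextConnectable.NONE) {
                return; // nothing left to do
            }
            if (next != NextConnectable.NEXT_READY) {
                break;  // go back to trigger root connectables or complete
            }
        }

        // Completion requires no ready components AND no data queued or held.
        completionReached = !tracker.isAnyReady() && isFlowQueueEmpty();
    }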

View File

@@ -17,6 +17,14 @@
package org.apache.nifi.stateless.flow;
/**
* The StatelessFlowCurrent is responsible for facilitating the flow of data. Just as a current of air or water
* is the body that moves, the StatelessFlowCurrent is the piece of the Stateless framework that deals with the movement of data
*/
public interface StatelessFlowCurrent {
/**
* Triggers the dataflow, starting from 'root' or 'source' components all the way through the end of the dataflow until either
* the source components provide no data or all data that is provided is processed.
*/
void triggerFlow();
}

View File

@@ -63,19 +63,44 @@ public class AsynchronousCommitTracker {
}
}
public List<Connectable> getReady() {
final List<Connectable> connectables = new ArrayList<>(ready);
Collections.reverse(connectables);
return connectables;
public Connectable getNextReady() {
if (ready.isEmpty()) {
return null;
}
Connectable last = null;
for (final Connectable connectable : ready) {
last = connectable;
}
return last;
}
public List<Connectable> getReady() {
final List<Connectable> reversed = new ArrayList<>(ready);
Collections.reverse(reversed);
return reversed;
}
/**
* Determines if there are any components that may be ready to be triggered. Note that a value of <code>true</code> may be returned, even if there are no components
* that currently are ready according to {@link #isReady(Connectable)}.
*
* @return <code>true</code> if any component is expected to be ready to trigger, <code>false</code> otherwise
*/
public boolean isAnyReady() {
final boolean anyReady = !ready.isEmpty();
logger.debug("{} Any components ready = {}, list={}", this, anyReady, ready);
return anyReady;
}
/**
* Checks if the given component is ready to be triggered and, if not, removes it from the internal list of ready components
*
* @param connectable the component to check
* @return <code>true</code> if the component is ready to be triggered, <code>false</code> otherwise
*/
public boolean isReady(final Connectable connectable) {
if (!ready.contains(connectable)) {
logger.debug("{} {} is not ready because it's not in the list of ready components", this, connectable);
@@ -94,7 +119,12 @@
return true;
}
logger.debug("{} {} is not ready because it has no data queued", this, connectable);
if (connectable.isTriggerWhenEmpty() && isDataHeld(connectable)) {
logger.debug("{} {} is ready because it is triggered when its input queue is empty and has unacknowledged data", this, connectable);
return true;
}
logger.debug("{} {} is not ready because it has no data queued or held (or has no data queued and is not to be triggered when input queue is empty)", this, connectable);
ready.remove(connectable);
return false;
}
@@ -111,7 +141,22 @@
private boolean isDataQueued(final Connectable connectable) {
for (final Connection incoming : connectable.getIncomingConnections()) {
if (!incoming.getFlowFileQueue().isEmpty()) {
if (!incoming.getFlowFileQueue().isActiveQueueEmpty()) {
return true;
}
}
return false;
}
/**
* Determines if data is currently being held by the given connectable (i.e., it has at least one incoming Connection with unacknowledged FlowFiles)
* @param connectable the connectable to check
* @return <code>true</code> if the Connectable is holding onto data, <code>false</code> otherwise
*/
private boolean isDataHeld(final Connectable connectable) {
for (final Connection incoming : connectable.getIncomingConnections()) {
if (incoming.getFlowFileQueue().isUnacknowledgedFlowFile()) {
return true;
}
}
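getNextReady() walks the ready collection and returns the last element it encounters, and the tests below confirm that re-adding a component moves it to the front of getReady()'s reversed view. A backing structure consistent with that behavior might look like this (the actual field and add logic are assumptions):

    // Assumed: insertion-ordered set where re-adds are moved to the end, so the
    // most recently added component is the one getNextReady() returns.
    private final Set<Connectable> ready = new LinkedHashSet<>();

    public void addConnectable(final Connectable connectable) {
        // LinkedHashSet does not reorder on re-add, so remove first to move
        // the component to the end of the iteration order.
        ready.remove(connectable);
        ready.add(connectable);
    }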

View File

@@ -44,20 +44,25 @@ public class TestAsynchronousCommitTracker {
tracker.addConnectable(connectable1);
assertEquals(Collections.singletonList(connectable1), tracker.getReady());
assertEquals(connectable1, tracker.getNextReady());
tracker.addConnectable(connectable2);
assertEquals(Arrays.asList(connectable2, connectable1), tracker.getReady());
assertEquals(connectable2, tracker.getNextReady());
tracker.addConnectable(connectable3);
assertEquals(Arrays.asList(connectable3, connectable2, connectable1), tracker.getReady());
assertEquals(connectable3, tracker.getNextReady());
// connectable1 should now be moved to the start of the List
tracker.addConnectable(connectable1);
assertEquals(Arrays.asList(connectable1, connectable3, connectable2), tracker.getReady());
assertEquals(connectable1, tracker.getNextReady());
// Adding connectable1 again should have no effect since it is already first
tracker.addConnectable(connectable1);
assertEquals(Arrays.asList(connectable1, connectable3, connectable2), tracker.getReady());
assertEquals(connectable1, tracker.getNextReady());
}
@Test
@@ -86,7 +91,7 @@
final Connection connection = Mockito.mock(Connection.class);
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
Mockito.when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
Mockito.when(flowFileQueue.isEmpty()).thenReturn(false);
Mockito.when(flowFileQueue.isActiveQueueEmpty()).thenReturn(false);
Mockito.when(connectable2.getIncomingConnections()).thenReturn(Collections.singletonList(connection));
assertTrue(tracker.isReady(connectable2));
@@ -95,7 +100,7 @@
// If we then indicate that the FlowFileQueue is empty, we should see that Connectable2 is no longer ready and it should be evicted from the collection of ready components.
// This should then also result in isAnyReady() being false.
Mockito.when(flowFileQueue.isEmpty()).thenReturn(true);
Mockito.when(flowFileQueue.isActiveQueueEmpty()).thenReturn(true);
assertFalse(tracker.isReady(connectable2));
assertFalse(tracker.isAnyReady());
assertEquals(Collections.emptyList(), tracker.getReady());

View File

@@ -17,9 +17,9 @@
package org.apache.nifi.stateless.basics;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.StatelessConfigurationException;

View File

@@ -17,10 +17,11 @@
package org.apache.nifi.stateless.basics;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
@@ -28,11 +29,15 @@ import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TransactionThresholds;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -45,6 +50,130 @@ import static org.junit.Assert.assertTrue;
public class RequiresAdditionalInputIT extends StatelessSystemIT {
@Test
public void testMergeAsFirstProcessor() throws IOException, StatelessConfigurationException, InterruptedException {
final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
final VersionedPort inPort = flowBuilder.createInputPort("In");
final VersionedPort outPort = flowBuilder.createOutputPort("Out");
final VersionedProcessor merge = flowBuilder.createSimpleProcessor("ConcatenateRangeOfFlowFiles");
merge.setAutoTerminatedRelationships(new HashSet<>(Arrays.asList("original", "failure")));
flowBuilder.createConnection(inPort, merge, Relationship.ANONYMOUS.getName());
flowBuilder.createConnection(merge, outPort, "merged");
// Start up the dataflow
final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList(), Collections.emptySet(), createTransactionThresholds(1000));
// Enqueue data and trigger
for (int i=1; i <= 3; i++) {
dataflow.enqueue(String.valueOf(i).getBytes(StandardCharsets.UTF_8), Collections.emptyMap(), "In");
}
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
assertTrue(result.isSuccessful());
final List<FlowFile> flowFiles = result.getOutputFlowFiles("Out");
Assert.assertEquals(1, flowFiles.size());
final FlowFile first = flowFiles.get(0);
final String outputContent = new String(result.readContentAsByteArray(first));
Assert.assertEquals("123", outputContent);
result.acknowledge();
}
@Test
public void testMergeAsFirstProcessorWithoutEnoughData() throws IOException, StatelessConfigurationException, InterruptedException {
final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
final VersionedPort inPort = flowBuilder.createInputPort("In");
final VersionedPort outPort = flowBuilder.createOutputPort("Out");
final VersionedProcessor merge = flowBuilder.createSimpleProcessor("ConcatenateRangeOfFlowFiles");
merge.setProperties(Collections.singletonMap("Minimum Number of Entries", "100"));
merge.setAutoTerminatedRelationships(new HashSet<>(Arrays.asList("original", "failure")));
flowBuilder.createConnection(inPort, merge, Relationship.ANONYMOUS.getName());
flowBuilder.createConnection(merge, outPort, "merged");
// Start up the dataflow
final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList(), Collections.emptySet(), createTransactionThresholds(1000));
// Enqueue data and trigger
for (int i=1; i <= 3; i++) {
dataflow.enqueue(String.valueOf(i).getBytes(StandardCharsets.UTF_8), Collections.emptyMap(), "In");
}
final DataflowTrigger trigger = dataflow.trigger();
final Optional<TriggerResult> resultOption = trigger.getResult(2, TimeUnit.SECONDS);
// We expect this to time out
assertFalse(resultOption.isPresent());
trigger.cancel();
}
@Test
public void testMergeDownstream() throws IOException, StatelessConfigurationException, InterruptedException {
final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
final VersionedPort inPort = flowBuilder.createInputPort("In");
final VersionedPort outPort = flowBuilder.createOutputPort("Out");
final VersionedProcessor firstUpdate = flowBuilder.createSimpleProcessor("UpdateContent");
final Map<String, String> firstUpdateProperties = new HashMap<>();
firstUpdateProperties.put("Content", "\n1");
firstUpdateProperties.put("Update Strategy", "Append");
firstUpdate.setProperties(firstUpdateProperties);
final VersionedProcessor secondUpdate = flowBuilder.createSimpleProcessor("UpdateContent");
final Map<String, String> secondUpdateProperties = new HashMap<>();
secondUpdateProperties.put("Content", "\n2");
secondUpdateProperties.put("Update Strategy", "Append");
secondUpdate.setProperties(secondUpdateProperties);
final VersionedProcessor thirdUpdate = flowBuilder.createSimpleProcessor("UpdateContent");
final Map<String, String> thirdUpdateProperties = new HashMap<>();
thirdUpdateProperties.put("Content", "\n3");
thirdUpdateProperties.put("Update Strategy", "Append");
thirdUpdate.setProperties(thirdUpdateProperties);
final VersionedProcessor merge = flowBuilder.createSimpleProcessor("ConcatenateRangeOfFlowFiles");
merge.setAutoTerminatedRelationships(new HashSet<>(Arrays.asList("original", "failure")));
flowBuilder.createConnection(inPort, firstUpdate, Relationship.ANONYMOUS.getName());
flowBuilder.createConnection(firstUpdate, secondUpdate, "success");
flowBuilder.createConnection(secondUpdate, thirdUpdate, "success");
flowBuilder.createConnection(thirdUpdate, merge, "success");
flowBuilder.createConnection(merge, outPort, "merged");
// Start up the dataflow
final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList(), Collections.emptySet(), createTransactionThresholds(1000));
// Enqueue data and trigger
for (int i=1; i <= 3; i++) {
dataflow.enqueue(("hello " + i).getBytes(StandardCharsets.UTF_8), Collections.emptyMap(), "In");
}
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
assertTrue(result.isSuccessful());
final List<FlowFile> out = result.getOutputFlowFiles("Out");
assertEquals(1, out.size());
final byte[] outputContent = result.readContentAsByteArray(out.get(0));
final String outputText = new String(outputContent, StandardCharsets.UTF_8);
final StringBuilder expectedContentBuilder = new StringBuilder();
for (int i=1; i <= 3; i++) {
expectedContentBuilder.append("hello ").append(i).append("\n1\n2\n3");
}
final String expectedContent = expectedContentBuilder.toString();
assertEquals(expectedContent, outputText);
}
@Test
public void testSourceProcessorsTriggeredAsOftenAsRequired() throws IOException, StatelessConfigurationException, InterruptedException {
// Build the flow

View File

@@ -39,6 +39,11 @@
<artifactId>nifi-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@@ -17,6 +17,7 @@
package org.apache.nifi.processors.tests.system;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.flowfile.FlowFile;
@@ -36,6 +37,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
@TriggerWhenEmpty
public class ConcatenateFlowFiles extends AbstractProcessor {
static final PropertyDescriptor FLOWFILE_COUNT = new Builder()
.name("FlowFile Count")

View File

@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tests.system;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.bin.Bin;
import org.apache.nifi.processor.util.bin.BinFiles;
import org.apache.nifi.processor.util.bin.BinManager;
import org.apache.nifi.processor.util.bin.BinProcessingResult;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@TriggerSerially
@TriggerWhenEmpty
public class ConcatenateRangeOfFlowFiles extends BinFiles {
public static final Relationship REL_MERGED = new Relationship.Builder()
.name("merged")
.description("The FlowFile containing the merged content")
.build();
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_ORIGINAL);
relationships.add(REL_FAILURE);
relationships.add(REL_MERGED);
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(MIN_ENTRIES);
descriptors.add(MAX_ENTRIES);
descriptors.add(MIN_SIZE);
descriptors.add(MAX_SIZE);
descriptors.add(MAX_BIN_AGE);
descriptors.add(MAX_BIN_COUNT);
return descriptors;
}
@Override
protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
return flowFile;
}
@Override
protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session) {
return null;
}
@Override
protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
}
@Override
protected BinProcessingResult processBin(final Bin bin, final ProcessContext context) throws ProcessException {
final ProcessSession session = bin.getSession();
final List<FlowFile> flowFiles = bin.getContents();
FlowFile merged = session.create(flowFiles);
merged = session.merge(flowFiles, merged);
session.transfer(merged, REL_MERGED);
getLogger().info("Concatenated {} FlowFiles into {}", flowFiles.size(), merged);
final BinProcessingResult binProcessingResult = new BinProcessingResult(true);
binProcessingResult.setCommitted(false);
return binProcessingResult;
}
}

View File

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tests.system;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
public class UpdateContent extends AbstractProcessor {
static final PropertyDescriptor CONTENT = new Builder()
.name("Content")
.displayName("Content")
.description("Content to set")
.required(true)
.addValidator(Validator.VALID)
.defaultValue("Default Content")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor UPDATE_STRATEGY = new Builder()
.name("Update Strategy")
.displayName("Update Strategy")
.description("How to update the contents")
.required(true)
.allowableValues("Replace", "Append")
.defaultValue("Replace")
.build();
private final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Arrays.asList(CONTENT, UPDATE_STRATEGY);
}
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String content = context.getProperty(CONTENT).evaluateAttributeExpressions(flowFile).getValue();
final byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
final String strategy = context.getProperty(UPDATE_STRATEGY).getValue();
if (strategy.equalsIgnoreCase("Replace")) {
session.write(flowFile, out -> out.write(contentBytes));
} else {
session.append(flowFile, out -> out.write(contentBytes));
}
session.transfer(flowFile, REL_SUCCESS);
}
}

View File

@@ -16,6 +16,7 @@
org.apache.nifi.processors.tests.system.CountEvents
org.apache.nifi.processors.tests.system.CountFlowFiles
org.apache.nifi.processors.tests.system.ConcatenateFlowFiles
org.apache.nifi.processors.tests.system.ConcatenateRangeOfFlowFiles
org.apache.nifi.processors.tests.system.DependOnProperties
org.apache.nifi.processors.tests.system.DoNotTransferFlowFile
org.apache.nifi.processors.tests.system.Duplicate
@@ -38,6 +39,7 @@ org.apache.nifi.processors.tests.system.SplitByLine
org.apache.nifi.processors.tests.system.TerminateFlowFile
org.apache.nifi.processors.tests.system.TransferBatch
org.apache.nifi.processors.tests.system.ThrowProcessException
org.apache.nifi.processors.tests.system.UpdateContent
org.apache.nifi.processors.tests.system.ValidateFileExists
org.apache.nifi.processors.tests.system.VerifyContents
org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile