diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 09cc1d3a5f..c04bab70fd 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -68,6 +68,7 @@ public class MockProcessSession implements ProcessSession { private final Map> transferMap = new ConcurrentHashMap<>(); private final MockFlowFileQueue processorQueue; private final Set beingProcessed = new HashSet<>(); + private final Set created = new HashSet<>(); private final List penalized = new ArrayList<>(); private final Processor processor; @@ -212,6 +213,10 @@ public class MockProcessSession implements ProcessSession { if (removedFlowFiles.remove(flowFile.getId())) { newOwner.removedFlowFiles.add(flowFile.getId()); } + + if (created.remove(flowFile.getId())) { + newOwner.created.add(flowFile.getId()); + } } final Set flowFileIds = flowFiles.stream() @@ -225,8 +230,7 @@ public class MockProcessSession implements ProcessSession { public MockFlowFile clone(FlowFile flowFile) { flowFile = validateState(flowFile); final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile); - currentVersions.put(newFlowFile.getId(), newFlowFile); - beingProcessed.add(newFlowFile.getId()); + updateStateWithNewFlowFile(newFlowFile); return newFlowFile; } @@ -241,8 +245,7 @@ public class MockProcessSession implements ProcessSession { final byte[] newContent = Arrays.copyOfRange(((MockFlowFile) flowFile).getData(), (int) offset, (int) (offset + size)); newFlowFile.setData(newContent); - currentVersions.put(newFlowFile.getId(), newFlowFile); - beingProcessed.add(newFlowFile.getId()); + updateStateWithNewFlowFile(newFlowFile); return newFlowFile; } @@ -289,6 +292,7 @@ public class MockProcessSession implements ProcessSession { beingProcessed.clear(); currentVersions.clear(); originalVersions.clear(); + created.clear(); for (final Map.Entry entry : counterMap.entrySet()) { sharedState.adjustCounter(entry.getKey(), entry.getValue()); @@ -338,8 +342,7 @@ public class MockProcessSession implements ProcessSession { @Override public MockFlowFile create() { final MockFlowFile flowFile = new MockFlowFile(sharedState.nextFlowFileId()); - currentVersions.put(flowFile.getId(), flowFile); - beingProcessed.add(flowFile.getId()); + updateStateWithNewFlowFile(flowFile); return flowFile; } @@ -347,8 +350,7 @@ public class MockProcessSession implements ProcessSession { public MockFlowFile create(final FlowFile flowFile) { MockFlowFile newFlowFile = create(); newFlowFile = (MockFlowFile) inheritAttributes(flowFile, newFlowFile); - currentVersions.put(newFlowFile.getId(), newFlowFile); - beingProcessed.add(newFlowFile.getId()); + updateStateWithNewFlowFile(newFlowFile); return newFlowFile; } @@ -356,8 +358,7 @@ public class MockProcessSession implements ProcessSession { public MockFlowFile create(final Collection flowFiles) { MockFlowFile newFlowFile = create(); newFlowFile = (MockFlowFile) inheritAttributes(flowFiles, newFlowFile); - currentVersions.put(newFlowFile.getId(), newFlowFile); - beingProcessed.add(newFlowFile.getId()); + updateStateWithNewFlowFile(newFlowFile); return newFlowFile; } @@ -794,9 +795,11 @@ public class MockProcessSession implements ProcessSession { for (final List list : transferMap.values()) { for (final MockFlowFile flowFile : list) { - processorQueue.offer(flowFile); - if (penalize) { - penalized.add(flowFile); + if (!created.contains(flowFile.getId())) { + processorQueue.offer(flowFile); + if (penalize) { + penalized.add(flowFile); + } } } } @@ -804,9 +807,11 @@ public class MockProcessSession implements ProcessSession { for (final Long flowFileId : beingProcessed) { final MockFlowFile flowFile = originalVersions.get(flowFileId); if (flowFile != null) { - processorQueue.offer(flowFile); - if (penalize) { - penalized.add(flowFile); + if (!created.contains(flowFile.getId())) { + processorQueue.offer(flowFile); + if (penalize) { + penalized.add(flowFile); + } } } } @@ -816,6 +821,7 @@ public class MockProcessSession implements ProcessSession { currentVersions.clear(); originalVersions.clear(); transferMap.clear(); + created.clear(); clearTransferState(); if (!penalize) { penalized.clear(); @@ -850,6 +856,15 @@ public class MockProcessSession implements ProcessSession { mockFlowFile.setEnqueuedIndex(enqueuedIndex.incrementAndGet()); } + private void updateStateWithNewFlowFile(MockFlowFile newFlowFile) { + if (newFlowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + currentVersions.put(newFlowFile.getId(), newFlowFile); + beingProcessed.add(newFlowFile.getId()); + created.add(newFlowFile.getId()); + } + @Override public void transfer(final Collection flowFiles) { for (final FlowFile flowFile : flowFiles) { @@ -1064,6 +1079,7 @@ public class MockProcessSession implements ProcessSession { final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination); newFlowFile.setData(baos.toByteArray()); currentVersions.put(newFlowFile.getId(), newFlowFile); + created.add(newFlowFile.getId()); return newFlowFile; } diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java index 775bc2f5ed..700d0d29b2 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java @@ -134,6 +134,40 @@ public class TestMockProcessSession { assertFalse(ff1.isPenalized()); } + @Test + public void testRollbackWithCreatedFlowFile() { + final Processor processor = new PoorlyBehavedProcessor(); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor)); + final FlowFile ff1 = session.createFlowFile("hello, world".getBytes()); + session.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE); + session.rollback(); + session.assertQueueEmpty(); + } + + @Test + public void testRollbackWithClonedFlowFile() { + final Processor processor = new PoorlyBehavedProcessor(); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor)); + final FlowFile ff1 = session.createFlowFile("hello, world".getBytes()); + session.clone(ff1); + session.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE); + session.rollback(); + session.assertQueueEmpty(); + } + + @Test + public void testRollbackWithMigratedFlowFile() { + final Processor processor = new PoorlyBehavedProcessor(); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor)); + final MockProcessSession newSession = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor)); + final FlowFile ff1 = session.createFlowFile("hello, world".getBytes()); + session.migrate(newSession); + newSession.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE); + newSession.rollback(); + session.assertQueueEmpty(); + newSession.assertQueueEmpty(); + } + @Test public void testAttributePreservedAfterWrite() throws IOException { final Processor processor = new PoorlyBehavedProcessor();