[NIFI-12067] mock process session keeps track of flowfiles created during the session and removes them on rollback rather than putting them on the input queue (#7827)

rename method to be more descriptive, fix checkstyle error for trailing whitespace in TestMockProcessSession.java
added session.transfer call to unit tests so that they fail without the fixes.

Co-authored-by: Eric Secules <eric.secules@macrohealth.com>
This commit is contained in:
Eric Secules 2023-10-04 07:48:50 -07:00 committed by GitHub
parent da4c6f6e25
commit 746dad7f46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 16 deletions

View File

@ -68,6 +68,7 @@ public class MockProcessSession implements ProcessSession {
private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>(); private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>();
private final MockFlowFileQueue processorQueue; private final MockFlowFileQueue processorQueue;
private final Set<Long> beingProcessed = new HashSet<>(); private final Set<Long> beingProcessed = new HashSet<>();
private final Set<Long> created = new HashSet<>();
private final List<MockFlowFile> penalized = new ArrayList<>(); private final List<MockFlowFile> penalized = new ArrayList<>();
private final Processor processor; private final Processor processor;
@ -212,6 +213,10 @@ public class MockProcessSession implements ProcessSession {
if (removedFlowFiles.remove(flowFile.getId())) { if (removedFlowFiles.remove(flowFile.getId())) {
newOwner.removedFlowFiles.add(flowFile.getId()); newOwner.removedFlowFiles.add(flowFile.getId());
} }
if (created.remove(flowFile.getId())) {
newOwner.created.add(flowFile.getId());
}
} }
final Set<String> flowFileIds = flowFiles.stream() final Set<String> flowFileIds = flowFiles.stream()
@ -225,8 +230,7 @@ public class MockProcessSession implements ProcessSession {
public MockFlowFile clone(FlowFile flowFile) { public MockFlowFile clone(FlowFile flowFile) {
flowFile = validateState(flowFile); flowFile = validateState(flowFile);
final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile); final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile); updateStateWithNewFlowFile(newFlowFile);
beingProcessed.add(newFlowFile.getId());
return newFlowFile; return newFlowFile;
} }
@ -241,8 +245,7 @@ public class MockProcessSession implements ProcessSession {
final byte[] newContent = Arrays.copyOfRange(((MockFlowFile) flowFile).getData(), (int) offset, (int) (offset + size)); final byte[] newContent = Arrays.copyOfRange(((MockFlowFile) flowFile).getData(), (int) offset, (int) (offset + size));
newFlowFile.setData(newContent); newFlowFile.setData(newContent);
currentVersions.put(newFlowFile.getId(), newFlowFile); updateStateWithNewFlowFile(newFlowFile);
beingProcessed.add(newFlowFile.getId());
return newFlowFile; return newFlowFile;
} }
@ -289,6 +292,7 @@ public class MockProcessSession implements ProcessSession {
beingProcessed.clear(); beingProcessed.clear();
currentVersions.clear(); currentVersions.clear();
originalVersions.clear(); originalVersions.clear();
created.clear();
for (final Map.Entry<String, Long> entry : counterMap.entrySet()) { for (final Map.Entry<String, Long> entry : counterMap.entrySet()) {
sharedState.adjustCounter(entry.getKey(), entry.getValue()); sharedState.adjustCounter(entry.getKey(), entry.getValue());
@ -338,8 +342,7 @@ public class MockProcessSession implements ProcessSession {
@Override @Override
public MockFlowFile create() { public MockFlowFile create() {
final MockFlowFile flowFile = new MockFlowFile(sharedState.nextFlowFileId()); final MockFlowFile flowFile = new MockFlowFile(sharedState.nextFlowFileId());
currentVersions.put(flowFile.getId(), flowFile); updateStateWithNewFlowFile(flowFile);
beingProcessed.add(flowFile.getId());
return flowFile; return flowFile;
} }
@ -347,8 +350,7 @@ public class MockProcessSession implements ProcessSession {
public MockFlowFile create(final FlowFile flowFile) { public MockFlowFile create(final FlowFile flowFile) {
MockFlowFile newFlowFile = create(); MockFlowFile newFlowFile = create();
newFlowFile = (MockFlowFile) inheritAttributes(flowFile, newFlowFile); newFlowFile = (MockFlowFile) inheritAttributes(flowFile, newFlowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile); updateStateWithNewFlowFile(newFlowFile);
beingProcessed.add(newFlowFile.getId());
return newFlowFile; return newFlowFile;
} }
@ -356,8 +358,7 @@ public class MockProcessSession implements ProcessSession {
public MockFlowFile create(final Collection<FlowFile> flowFiles) { public MockFlowFile create(final Collection<FlowFile> flowFiles) {
MockFlowFile newFlowFile = create(); MockFlowFile newFlowFile = create();
newFlowFile = (MockFlowFile) inheritAttributes(flowFiles, newFlowFile); newFlowFile = (MockFlowFile) inheritAttributes(flowFiles, newFlowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile); updateStateWithNewFlowFile(newFlowFile);
beingProcessed.add(newFlowFile.getId());
return newFlowFile; return newFlowFile;
} }
@ -794,9 +795,11 @@ public class MockProcessSession implements ProcessSession {
for (final List<MockFlowFile> list : transferMap.values()) { for (final List<MockFlowFile> list : transferMap.values()) {
for (final MockFlowFile flowFile : list) { for (final MockFlowFile flowFile : list) {
processorQueue.offer(flowFile); if (!created.contains(flowFile.getId())) {
if (penalize) { processorQueue.offer(flowFile);
penalized.add(flowFile); if (penalize) {
penalized.add(flowFile);
}
} }
} }
} }
@ -804,9 +807,11 @@ public class MockProcessSession implements ProcessSession {
for (final Long flowFileId : beingProcessed) { for (final Long flowFileId : beingProcessed) {
final MockFlowFile flowFile = originalVersions.get(flowFileId); final MockFlowFile flowFile = originalVersions.get(flowFileId);
if (flowFile != null) { if (flowFile != null) {
processorQueue.offer(flowFile); if (!created.contains(flowFile.getId())) {
if (penalize) { processorQueue.offer(flowFile);
penalized.add(flowFile); if (penalize) {
penalized.add(flowFile);
}
} }
} }
} }
@ -816,6 +821,7 @@ public class MockProcessSession implements ProcessSession {
currentVersions.clear(); currentVersions.clear();
originalVersions.clear(); originalVersions.clear();
transferMap.clear(); transferMap.clear();
created.clear();
clearTransferState(); clearTransferState();
if (!penalize) { if (!penalize) {
penalized.clear(); penalized.clear();
@ -850,6 +856,15 @@ public class MockProcessSession implements ProcessSession {
mockFlowFile.setEnqueuedIndex(enqueuedIndex.incrementAndGet()); mockFlowFile.setEnqueuedIndex(enqueuedIndex.incrementAndGet());
} }
private void updateStateWithNewFlowFile(MockFlowFile newFlowFile) {
if (newFlowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
currentVersions.put(newFlowFile.getId(), newFlowFile);
beingProcessed.add(newFlowFile.getId());
created.add(newFlowFile.getId());
}
@Override @Override
public void transfer(final Collection<FlowFile> flowFiles) { public void transfer(final Collection<FlowFile> flowFiles) {
for (final FlowFile flowFile : flowFiles) { for (final FlowFile flowFile : flowFiles) {
@ -1064,6 +1079,7 @@ public class MockProcessSession implements ProcessSession {
final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination); final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination);
newFlowFile.setData(baos.toByteArray()); newFlowFile.setData(baos.toByteArray());
currentVersions.put(newFlowFile.getId(), newFlowFile); currentVersions.put(newFlowFile.getId(), newFlowFile);
created.add(newFlowFile.getId());
return newFlowFile; return newFlowFile;
} }

View File

@ -134,6 +134,40 @@ public class TestMockProcessSession {
assertFalse(ff1.isPenalized()); assertFalse(ff1.isPenalized());
} }
@Test
public void testRollbackWithCreatedFlowFile() {
final Processor processor = new PoorlyBehavedProcessor();
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
final FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
session.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE);
session.rollback();
session.assertQueueEmpty();
}
@Test
public void testRollbackWithClonedFlowFile() {
final Processor processor = new PoorlyBehavedProcessor();
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
final FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
session.clone(ff1);
session.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE);
session.rollback();
session.assertQueueEmpty();
}
@Test
public void testRollbackWithMigratedFlowFile() {
final Processor processor = new PoorlyBehavedProcessor();
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
final MockProcessSession newSession = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
final FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
session.migrate(newSession);
newSession.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE);
newSession.rollback();
session.assertQueueEmpty();
newSession.assertQueueEmpty();
}
@Test @Test
public void testAttributePreservedAfterWrite() throws IOException { public void testAttributePreservedAfterWrite() throws IOException {
final Processor processor = new PoorlyBehavedProcessor(); final Processor processor = new PoorlyBehavedProcessor();