NIFI-12158 MockProcessSession write methods preserves attributes (#7828)

Co-authored-by: Eric Secules <eric.secules@macrohealth.com>
This commit is contained in:
Eric Secules 2023-10-03 11:40:31 -07:00 committed by GitHub
parent 3f7b1de6b8
commit 721628eb95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 7 deletions

View File

@ -919,16 +919,15 @@ public class MockProcessSession implements ProcessSession {
if (!(flowFile instanceof MockFlowFile)) { if (!(flowFile instanceof MockFlowFile)) {
throw new IllegalArgumentException("Cannot export a flow file that I did not create"); throw new IllegalArgumentException("Cannot export a flow file that I did not create");
} }
final MockFlowFile mockFlowFile = validateState(flowFile); final MockFlowFile mockFlowFile = validateState(flowFile);
writeRecursionSet.add(flowFile); writeRecursionSet.add(mockFlowFile);
final ByteArrayOutputStream baos = new ByteArrayOutputStream() { final ByteArrayOutputStream baos = new ByteArrayOutputStream() {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
super.close(); super.close();
writeRecursionSet.remove(mockFlowFile); writeRecursionSet.remove(mockFlowFile);
final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile); final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), mockFlowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile); currentVersions.put(newFlowFile.getId(), newFlowFile);
newFlowFile.setData(toByteArray()); newFlowFile.setData(toByteArray());
@ -961,12 +960,12 @@ public class MockProcessSession implements ProcessSession {
} }
@Override @Override
public MockFlowFile write(final FlowFile flowFile, final StreamCallback callback) { public MockFlowFile write(FlowFile flowFile, final StreamCallback callback) {
flowFile = validateState(flowFile);
if (callback == null || flowFile == null) { if (callback == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null"); throw new IllegalArgumentException("argument cannot be null");
} }
final MockFlowFile mock = validateState(flowFile); final MockFlowFile mock = (MockFlowFile) flowFile;
final ByteArrayInputStream in = new ByteArrayInputStream(mock.getData()); final ByteArrayInputStream in = new ByteArrayInputStream(mock.getData());
final ByteArrayOutputStream out = new ByteArrayOutputStream(); final ByteArrayOutputStream out = new ByteArrayOutputStream();
@ -979,7 +978,7 @@ public class MockProcessSession implements ProcessSession {
writeRecursionSet.remove(flowFile); writeRecursionSet.remove(flowFile);
} }
final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); final MockFlowFile newFlowFile = new MockFlowFile(flowFile.getId(), flowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile); currentVersions.put(newFlowFile.getId(), newFlowFile);
newFlowFile.setData(out.toByteArray()); newFlowFile.setData(out.toByteArray());

View File

@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -133,6 +134,20 @@ public class TestMockProcessSession {
assertFalse(ff1.isPenalized()); assertFalse(ff1.isPenalized());
} }
@Test
public void testAttributePreservedAfterWrite() throws IOException {
final Processor processor = new PoorlyBehavedProcessor();
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
session.putAttribute(ff1, "key1", "val1");
session.write(ff1).close();
session.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE);
session.commitAsync();
List<MockFlowFile> output = session.getFlowFilesForRelationship(PoorlyBehavedProcessor.REL_FAILURE);
assertEquals(1, output.size());
output.get(0).assertAttributeEquals("key1", "val1");
}
protected static class PoorlyBehavedProcessor extends AbstractProcessor { protected static class PoorlyBehavedProcessor extends AbstractProcessor {
private static final Relationship REL_FAILURE = new Relationship.Builder() private static final Relationship REL_FAILURE = new Relationship.Builder()