NIFI-5055 added ability to unpenalize MockFlowFile directly or from MockProcessSession

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Mark Bean 2018-04-08 15:03:09 -04:00 committed by Mike Thomsen
parent eb3391f1d2
commit 274ed46d92
3 changed files with 27 additions and 5 deletions

View File

@ -98,8 +98,8 @@ public class MockFlowFile implements FlowFileRecord {
this.penalized = toCopy.isPenalized(); this.penalized = toCopy.isPenalized();
} }
void setPenalized() { void setPenalized(boolean penalized) {
this.penalized = true; this.penalized = penalized;
} }
public long getCreationTime() { public long getCreationTime() {

View File

@ -1298,11 +1298,20 @@ public class MockProcessSession implements ProcessSession {
final MockFlowFile mockFlowFile = (MockFlowFile) flowFile; final MockFlowFile mockFlowFile = (MockFlowFile) flowFile;
final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile); final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile); currentVersions.put(newFlowFile.getId(), newFlowFile);
newFlowFile.setPenalized(); newFlowFile.setPenalized(true);
penalized.add(newFlowFile); penalized.add(newFlowFile);
return newFlowFile; return newFlowFile;
} }
public MockFlowFile unpenalize(FlowFile flowFile) {
flowFile = validateState(flowFile);
final MockFlowFile newFlowFile = new MockFlowFile(flowFile.getId(), flowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile);
newFlowFile.setPenalized(false);
penalized.remove(newFlowFile);
return newFlowFile;
}
public byte[] getContentAsByteArray(MockFlowFile flowFile) { public byte[] getContentAsByteArray(MockFlowFile flowFile) {
flowFile = validateState(flowFile); flowFile = validateState(flowFile);
return flowFile.getData(); return flowFile.getData();

View File

@ -17,6 +17,8 @@
package org.apache.nifi.util; package org.apache.nifi.util;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -95,10 +97,21 @@ public class TestMockProcessSession {
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor); final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor);
FlowFile ff1 = session.createFlowFile("hello, world".getBytes()); FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
ff1 = session.penalize(ff1); ff1 = session.penalize(ff1);
assertEquals(true, ff1.isPenalized()); assertTrue(ff1.isPenalized());
ff1 = session.putAttribute(ff1, "hello", "world"); ff1 = session.putAttribute(ff1, "hello", "world");
// adding attribute to flow file should not override the original penalized status // adding attribute to flow file should not override the original penalized status
assertEquals(true, ff1.isPenalized()); assertTrue(ff1.isPenalized());
}
@Test
public void testUnpenalizeFlowFile() {
final Processor processor = new PoorlyBehavedProcessor();
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor);
FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
ff1 = session.penalize(ff1);
assertTrue(ff1.isPenalized());
ff1 = session.unpenalize(ff1);
assertFalse(ff1.isPenalized());
} }
protected static class PoorlyBehavedProcessor extends AbstractProcessor { protected static class PoorlyBehavedProcessor extends AbstractProcessor {