NIFI-1423 Allow to penalize FlowFiles that are routed to No Retry relationship

This closes #183

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
ledor473 2016-01-21 17:15:34 -05:00 committed by jpercivall
parent 2673370cba
commit eb6f9f0fec
6 changed files with 141 additions and 8 deletions

View File

@ -61,6 +61,7 @@ public class MockProcessSession implements ProcessSession {
private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>();
private final MockFlowFileQueue processorQueue;
private final Set<Long> beingProcessed = new HashSet<>();
private final List<MockFlowFile> penalized = new ArrayList<>();
private final Map<Long, MockFlowFile> currentVersions = new HashMap<>();
private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
@ -429,11 +430,22 @@ public class MockProcessSession implements ProcessSession {
@Override
public void remove(final FlowFile flowFile) {
validateState(flowFile);
final Iterator<Long> itr = beingProcessed.iterator();
while (itr.hasNext()) {
final Long ffId = itr.next();
final Iterator<MockFlowFile> penalizedItr = penalized.iterator();
while (penalizedItr.hasNext()) {
final MockFlowFile ff = penalizedItr.next();
if (Objects.equals(ff.getId(), flowFile.getId())) {
penalizedItr.remove();
penalized.remove(ff);
break;
}
}
final Iterator<Long> processedItr = beingProcessed.iterator();
while (processedItr.hasNext()) {
final Long ffId = processedItr.next();
if (ffId != null && ffId.equals(flowFile.getId())) {
itr.remove();
processedItr.remove();
beingProcessed.remove(ffId);
removedCount++;
currentVersions.remove(ffId);
@ -522,6 +534,9 @@ public class MockProcessSession implements ProcessSession {
for (final List<MockFlowFile> list : transferMap.values()) {
for (final MockFlowFile flowFile : list) {
processorQueue.offer(flowFile);
if (penalize) {
penalized.add(flowFile);
}
}
}
@ -529,6 +544,9 @@ public class MockProcessSession implements ProcessSession {
final MockFlowFile flowFile = originalVersions.get(flowFileId);
if (flowFile != null) {
processorQueue.offer(flowFile);
if (penalize) {
penalized.add(flowFile);
}
}
}
@ -538,6 +556,9 @@ public class MockProcessSession implements ProcessSession {
originalVersions.clear();
transferMap.clear();
clearTransferState();
if (!penalize) {
penalized.clear();
}
}
@Override
@ -696,6 +717,10 @@ public class MockProcessSession implements ProcessSession {
return list;
}
public List<MockFlowFile> getPenalizedFlowFiles() {
return penalized;
}
/**
* @param relationship to get flowfiles for
* @return a List of FlowFiles in the order in which they were transferred
@ -1013,6 +1038,7 @@ public class MockProcessSession implements ProcessSession {
final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile);
newFlowFile.setPenalized();
penalized.add(newFlowFile);
return newFlowFile;
}

View File

@ -335,6 +335,11 @@ public class StandardProcessorTestRunner implements TestRunner {
Assert.assertEquals(count, getFlowFilesForRelationship(relationship).size());
}
@Override
public void assertPenalizeCount(final int count) {
Assert.assertEquals(count, getPenalizedFlowFiles().size());
}
@Override
public void assertValid() {
context.assertValid();
@ -453,6 +458,23 @@ public class StandardProcessorTestRunner implements TestRunner {
return flowFiles;
}
@Override
public List<MockFlowFile> getPenalizedFlowFiles() {
final List<MockFlowFile> flowFiles = new ArrayList<>();
for (final MockProcessSession session : sessionFactory.getCreatedSessions()) {
flowFiles.addAll(session.getPenalizedFlowFiles());
}
Collections.sort(flowFiles, new Comparator<MockFlowFile>() {
@Override
public int compare(final MockFlowFile o1, final MockFlowFile o2) {
return Long.compare(o1.getCreationTime(), o2.getCreationTime());
}
});
return flowFiles;
}
/**
* @deprecated The ProvenanceReporter should not be accessed through the test runner, as it does not expose the events that were emitted.
*/

View File

@ -290,6 +290,14 @@ public interface TestRunner {
*/
void assertTransferCount(String relationship, int count);
/**
* Assert that the number of FlowFiles that were penalized is equal to the given count
*
* @param count
* number of expected penalized
*/
void assertPenalizeCount(int count);
/**
* Assert that there are no FlowFiles left on the input queue.
*/
@ -437,6 +445,13 @@ public interface TestRunner {
*/
List<MockFlowFile> getFlowFilesForRelationship(Relationship relationship);
/**
* Returns a List of FlowFiles in the order in which they were transferred that were penalized
*
* @return flowfiles that were penalized
*/
List<MockFlowFile> getPenalizedFlowFiles();
/**
* @return the {@link ProvenanceReporter} that will be used by the
* configured {@link Processor} for reporting Provenance Events

View File

@ -319,6 +319,14 @@ public final class InvokeHTTP extends AbstractProcessor {
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor PROP_PENALIZE_NO_RETRY = new PropertyDescriptor.Builder()
.name("Penalize on \"No Retry\"")
.description("Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
.build();
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
PROP_METHOD,
PROP_URL,
@ -339,7 +347,8 @@ public final class InvokeHTTP extends AbstractProcessor {
PROP_TRUSTED_HOSTNAME,
PROP_ADD_HEADERS_TO_REQUEST,
PROP_CONTENT_TYPE,
PROP_USE_CHUNKED_ENCODING));
PROP_USE_CHUNKED_ENCODING,
PROP_PENALIZE_NO_RETRY));
// relationships
public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
@ -844,6 +853,9 @@ public final class InvokeHTTP extends AbstractProcessor {
// 1xx, 3xx, 4xx -> NO RETRY
} else {
if (request != null) {
if (context.getProperty(PROP_PENALIZE_NO_RETRY).asBoolean()) {
request = session.penalize(request);
}
session.transfer(request, REL_NO_RETRY);
}
}

View File

@ -107,6 +107,7 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@ -153,6 +154,7 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
//expected in request status.code and status.message
//original flow file (+attributes)

View File

@ -114,6 +114,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@ -166,6 +167,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@ -204,6 +206,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@ -244,6 +247,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY,0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@ -285,6 +289,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@ -325,6 +330,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@ -362,6 +368,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+all attributes from response)
@ -401,6 +408,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in request status.code, status.message and body of response in attribute
// original flow file (+attributes)
@ -428,6 +436,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in response
// status code, status message, all headers from server response --> ff attributes
@ -455,6 +464,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
runner.setProperty(InvokeHTTP.PROP_METHOD,"OPTION");
@ -465,6 +475,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
}
@Test
@ -483,6 +494,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in request
// status code, status message, no ff content
@ -516,6 +528,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@ -572,6 +585,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@ -607,6 +621,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
//expected in request status.code and status.message
//original flow file (+attributes)
@ -646,6 +661,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
//expected in request status.code and status.message
//original flow file (+attributes)
@ -686,6 +702,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(1);
// expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RETRY).get(0);
@ -714,7 +731,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
// getMyFlowFiles();
runner.assertPenalizeCount(0);
// expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
@ -741,7 +758,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
// getMyFlowFiles();
runner.assertPenalizeCount(0);
// expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
@ -768,7 +785,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
// getMyFlowFiles();
runner.assertPenalizeCount(0);
// expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
@ -782,6 +799,34 @@ public abstract class TestInvokeHttpCommon {
}
@Test
public void test400WithPenalizeNoRetry() throws Exception {
addHandler(new GetOrHeadHandler());
runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/400");
runner.setProperty(InvokeHTTP.PROP_PENALIZE_NO_RETRY, "true");
createFlowFiles(runner);
runner.run();
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(1);
// expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "400");
bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Bad Request");
bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/400");
final String expected = "Hello";
Assert.assertEquals(expected, actual);
bundle.assertAttributeEquals("Foo", "Bar");
}
@Test
public void test412() throws Exception {
addHandler(new GetOrHeadHandler());
@ -797,6 +842,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
@ -826,6 +872,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@ -859,6 +906,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@ -997,6 +1045,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@ -1031,6 +1080,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@ -1063,6 +1113,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@ -1094,6 +1145,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
//expected in request status.code and status.message
//original flow file (+attributes)
@ -1130,6 +1182,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
runner.assertPenalizeCount(1);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
@ -1155,6 +1208,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
runner.assertPenalizeCount(1);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
@ -1179,6 +1233,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
runner.assertPenalizeCount(1);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
@ -1204,6 +1259,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
//expected in request status.code and status.message
//original flow file (+attributes)