NIFI-6385 Added signal.id penalization

- Add additional doc about best practices.

This closes #3540.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Koji Kawamura 2019-06-20 10:38:22 +09:00 committed by Mark Payne
parent f851ed4887
commit 31097c96d6
3 changed files with 361 additions and 0 deletions

View File

@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -43,6 +44,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@ -212,6 +214,24 @@ public class Wait extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor WAIT_PENALTY_DURATION = new PropertyDescriptor.Builder()
.name("wait-penalty-duration")
.displayName("Wait Penalty Duration")
.description("If configured, after a signal identifier got processed but did not meet the release criteria," +
" the signal identifier is penalized and FlowFiles having the signal identifier" +
" will not be processed again for the specified period of time," +
" so that the signal identifier will not block others to be processed." +
" This can be useful for use cases where a Wait processor is expected to process multiple signal identifiers," +
" and each signal identifier has multiple FlowFiles," +
" and also the order of releasing FlowFiles is important within a signal identifier." +
" The FlowFile order can be configured with Prioritizers." +
" IMPORTANT: There is a limitation of number of queued signals can be processed," +
" and Wait processor may not be able to check all queued signal ids. See additional details for the best practice.")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile with a matching release signal in the cache will be routed to this relationship")
@ -234,6 +254,8 @@ public class Wait extends AbstractProcessor {
private final Set<Relationship> relationships;
private final Map<String, Long> signalIdPenalties = new HashMap<>();
public Wait() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
@ -255,6 +277,7 @@ public class Wait extends AbstractProcessor {
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_COPY_MODE);
descriptors.add(WAIT_MODE);
descriptors.add(WAIT_PENALTY_DURATION);
return descriptors;
}
@ -280,6 +303,19 @@ public class Wait extends AbstractProcessor {
final List<FlowFile> failedFilteringFlowFiles = new ArrayList<>();
final Supplier<FlowFileFilter.FlowFileFilterResult> acceptResultSupplier =
() -> bufferedCount.incrementAndGet() == bufferCount ? ACCEPT_AND_TERMINATE : ACCEPT_AND_CONTINUE;
// Clear expired penalties.
if (!signalIdPenalties.isEmpty()) {
final Iterator<Entry<String, Long>> penaltyIterator = signalIdPenalties.entrySet().iterator();
final long now = System.currentTimeMillis();
while (penaltyIterator.hasNext()) {
final Entry<String, Long> penalty = penaltyIterator.next();
if (penalty.getValue() < now) {
penaltyIterator.remove();
}
}
}
final List<FlowFile> flowFiles = session.get(f -> {
final String fSignalId = signalIdProperty.evaluateAttributeExpressions(f).getValue();
@ -292,6 +328,11 @@ public class Wait extends AbstractProcessor {
return ACCEPT_AND_CONTINUE;
}
if (signalIdPenalties.containsKey(fSignalId)) {
// This id is penalized.
return REJECT_AND_CONTINUE;
}
final String targetSignalIdStr = targetSignalId.get();
if (targetSignalIdStr == null) {
// This is the first one.
@ -468,6 +509,12 @@ public class Wait extends AbstractProcessor {
// Transfer FlowFiles.
processedFlowFiles.entrySet().forEach(transferFlowFiles);
// Penalize signal id if no FlowFile transferred to success.
final PropertyValue waitPenaltyDuration = context.getProperty(WAIT_PENALTY_DURATION);
if (waitPenaltyDuration.isSet() && getFlowFilesFor.apply(REL_SUCCESS).isEmpty()) {
signalIdPenalties.put(signalId, System.currentTimeMillis() + waitPenaltyDuration.asTimePeriod(TimeUnit.MILLISECONDS));
}
// Update signal if needed.
try {
if (waitCompleted) {
@ -515,4 +562,12 @@ public class Wait extends AbstractProcessor {
return session.putAllAttributes(flowFile, attributesToCopy);
}
@OnStopped
public void onStopped(final ProcessContext context) {
signalIdPenalties.clear();
}
Map<String, Long> getSignalIdPenalties() {
return signalIdPenalties;
}
}

View File

@ -0,0 +1,277 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>ValidateCsv</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
<style>
table td:first-child {text-align: center;}
</style>
</head>
<body>
<h2>Best practices to handle multiple signal ids at a Wait processor</h2>
When a Wait processor is expected to process multiple signal ids, by configuring 'Release Signal Identifier' with a FlowFile attribute Expression Language, there are few things to consider in order to get the expected result. Processor configuration can vary based on your requirement.
Also, you will need to have high level understanding on how Wait processor works:
<ul>
<li>Wait processor only process a single signal id at a time</li>
<li>How frequent Wait processor runs is defined at 'Run Schedule'</li>
<li>Which FlowFile is processed is determined by Prioritizer</li>
<li>Not limited to Wait processor, but for all processors, the order of queued FlowFiles in a connection is undefined if no Prioritizer is set</li>
</ul>
See following sections for common patterns
<ul>
<li><a href="#asap">Release any FlowFile as soon as its signal is notified</a></li>
<li><a href="#higher-priority">Release higher priority FlowFiles in each signal id</a></li>
</ul>
<h3 id="asap">Release any FlowFile as soon as its signal is notified</h3>
This is the most common use case.
FlowFiles are independent and can be released in any order.
<h4>Important configurations:</h4>
<ul>
<li>Use FirstInFirstOutPrioritizer (FIFO) at 'wait' relationship (or the incoming connection if 'Wait Mode' is 'Keep in the upstream connection)</li>
</ul>
The following table illustrates the notified signal ids, queued FlowFiles and what will happen at each Wait run cycle.
<table>
<tbody>
<tr>
<th># of Wait run</th>
<th>Notified Signals</th>
<th>Queue Index (FIFO)</th>
<th>FlowFile UUID</th>
<th>Signal ID</th>
<th>&nbsp;</th>
</tr>
<tr>
<td rowspan="3">1</td>
<td rowspan="3">B</td>
<td>1</td>
<td>a</td>
<td>A</td>
<td>This FlowFile is processed. But its signal is not found, and will be re-queued at the end of the queue.</td>
</tr>
<tr>
<td>2</td>
<td>b</td>
<td>B</td>
<td>&nbsp;</td>
</tr>
<tr>
<td>3</td>
<td>c</td>
<td>C</td>
<td>&nbsp;</td>
</tr>
<tr>
<td rowspan="3">2</td>
<td rowspan="3">B</td>
<td>1</td>
<td>b</td>
<td>B</td>
<td>This FlowFile is processed and since its signal is notified, this one will be released to 'success'.</td>
</tr>
<tr>
<td>2</td>
<td>c</td>
<td>C</td>
<td>&nbsp;</td>
</tr>
<tr>
<td>3</td>
<td>a</td>
<td>A</td>
<td>&nbsp;</td>
</tr>
<tr>
<td rowspan="3">3</td>
<td rowspan="3">&nbsp;</td>
<td>1</td>
<td>c</td>
<td>C</td>
<td>This FlowFile will be processed at the next run.</td>
</tr>
<tr>
<td>2</td>
<td>a</td>
<td>A</td>
<td>&nbsp;</td>
</tr>
</tbody>
</table>
<h3 id="higher-priority">Release higher priority FlowFiles in each signal id</h3>
Multiple FlowFiles share the same signal id, and the order of releasing a FlowFile is important.
<h4>Important configurations:</h4>
<ul>
<li>Use a (or set of a) Prioritizer(s) suites your need other than FIFO, at 'wait' relationship (or the incoming connection if 'Wait Mode' is 'Keep in the upstream connection), e.g. PriorityPrioritizer</li>
<li>Specify adequate 'Wait Penalty Duration', e.g. "3 sec", </li>
<li>'Wait Penalty Duration' should be grater than 'Run Schedule', e.g "3 sec" > "1 sec"</li>
<li>Increase 'Run Duration' to avoid the limitation of number of signal ids (see the <a href="#run-duration">note</a> below)</li>
</ul>
The following table illustrates the notified signal ids, queued FlowFiles and what will happen at each Wait run cycle.
The example uses PriorityPrioritizer to control the order of processing FlowFiles within a signal id.
If 'Wait Penalty Duration' is configured, Wait processor tracks unreleased signal ids and their penalty representing when they will be checked again.
<table>
<tbody>
<tr>
<th># of Wait run</th>
<th>Notified Signals</th>
<th>Signal Penalties</th>
<th>Queue Index (via 'priority' attribute)</th>
<th>FlowFile UUID</th>
<th>Signal ID</th>
<th>'priority' attr</th>
<th>&nbsp;</th>
</tr>
<tr>
<td rowspan="3">1 (00:01)</td>
<td rowspan="3">B</td>
<td rowspan="3">&nbsp;</td>
<td>1</td>
<td>a-1</td>
<td>A</td>
<td>1</td>
<td>This FlowFile is processed. But its signal is not found. Penalized.</td>
</tr>
<tr>
<td>2</td>
<td>b-1</td>
<td>B</td>
<td>1</td>
<td>Since a-1 and b-1 have the same priority '1', b-1 may be processed before a-1. You can add another Prioritizer to define more specific ordering.</td>
</tr>
<tr>
<td>3</td>
<td>b-2</td>
<td>B</td>
<td>2</td>
<td>&nbsp;</td>
</tr>
<tr>
<td rowspan="3">2 (00:02)</td>
<td rowspan="3">B</td>
<td rowspan="3">A (00:04)</td>
<td>1</td>
<td>a-1</td>
<td>A</td>
<td>1</td>
<td>This FlowFile is the first one according to the configured Prioritizer, but the signal id is penalized. So, this FlowFile is skipped at this execution.</td>
</tr>
<tr>
<td>2</td>
<td>b-1</td>
<td>B</td>
<td>1</td>
<td>This FlowFile is processed.</td>
</tr>
<tr>
<td>3</td>
<td>b-2</td>
<td>B</td>
<td>2</td>
<td>&nbsp;</td>
</tr>
<tr>
<td rowspan="2">3 (00:03)</td>
<td rowspan="2">&nbsp;</td>
<td rowspan="2">A (00:04)</td>
<td>1</td>
<td>a-1</td>
<td>A</td>
<td>1</td>
<td>This FlowFile is the first one but is still penalized.</td>
</tr>
<tr>
<td>2</td>
<td>b-2</td>
<td>B</td>
<td>2</td>
<td>This FlowFile is processed, but its signal is not notified yet, thus will be penalized.</td>
</tr>
<tr>
<td rowspan="2">4 (00:04)</td>
<td rowspan="2">&nbsp;</td>
<td rowspan="2">B (00:06)</td>
<td>1</td>
<td>a-1</td>
<td>A</td>
<td>1</td>
<td>This FlowFile is no longer penalized, and get processed. But its signal is not notified yet, thus will be penalized again.</td>
</tr>
<tr>
<td>2</td>
<td>b-2</td>
<td>B</td>
<td>2</td>
<td>&nbsp;</td>
</tr>
</tbody>
</table>
<h4 id="run-duration">The importance of 'Run Duration' when 'Wait Penalty Duration' is used</h4>
<p>
There are limitation of number of signals can be checked based on the combination of 'Run Schedule' and 'Wait Penalize Duration'.
If this limitation is engaged, some FlowFiles may not be processed and remain in the 'wait' relationship even if their signal ids are notified.
Let's say Wait is configured with:
</p>
<ul>
<li>Run Schedule = 1 sec</li>
<li>Wait Penalize Duration = 3 sec</li>
<li>Release Signal Identifier = ${uuid}</li>
</ul>
<p>
And there are 5 FlowFiles F1, F2 ... F5 in the 'wait' relationship.
Then the signal for F5 is notified.
Wait will work as follows:
</p>
<ul>
<li>At 00:00 Wait checks the signal for F1, not found, and penalize F1 (till 00:03)</li>
<li>At 00:01 Wait checks the signal for F2, not found, and penalize F2 (till 00:04)</li>
<li>At 00:02 Wait checks the signal for F3, not found, and penalize F3 (till 00:05)</li>
<li>At 00:03 Wait checks the signal for F4, not found, and penalize F4 (till 00:06)</li>
<li>At 00:04 Wait checks the signal for F1 again, because it's not penalized any longer</li>
</ul>
Repeat above cycle, thus F5 will not be released until one of F1 ... F4 is released.
<p>
To mitigate such limitation, increasing 'Run Duration' is recommended. By increasing 'Run Duration', Wait processor can keep being scheduled for that duration. For example, with 'Run Duration' 500 ms, Wait should be able to loop through all 5 queued FlowFiles at a single run.
</p>
</body>
</html>

View File

@ -205,6 +205,35 @@ public class TestWait {
}
}
@Test
public void testWaitPenaltyDuration() throws InitializationException {
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.WAIT_PENALTY_DURATION, "1 hour");
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "1");
runner.enqueue(new byte[]{}, props);
runner.run(1, false);
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
runner.clearTransferState();
// The signal id should be penalized
final Wait processor = (Wait) runner.getProcessor();
final Map<String, Long> signalIdPenalties = processor.getSignalIdPenalties();
assertEquals(1, signalIdPenalties.size());
assertTrue(signalIdPenalties.containsKey("1"));
// FlowFile with the penalized id shouldn't be processed
runner.enqueue(new byte[]{}, props);
runner.run(1, false);
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 0);
runner.clearTransferState();
}
@Test
public void testReplaceAttributes() throws InitializationException, IOException {
Map<String, String> cachedAttributes = new HashMap<>();