mirror of https://github.com/apache/nifi.git
NIFI-3414: Added EnforceOrder processor
Use it with FirstInFirstOutPrioritizer, it can enforce original ordering of 'out-of-order' FlowFiles. nifi-mock is modified to support FlowFile assertion using Prioritizer. Signed-off-by: Matt Burgess <mattyb149@apache.org> NIFI-3414: Added EnforceOrder processor Incorporated review comments, added displayNames. Signed-off-by: Matt Burgess <mattyb149@apache.org> NIFI-3414: Added EnforceOrder processor Incorporate review comments: - Moved nifi-standard-prioritizers dependency to top level nifi/pom.xml. - Changed default initial order from 1 to 0. - Fixed typos. - Use session.get(batchCount). Signed-off-by: Matt Burgess <mattyb149@apache.org> NIFI-3414: Added EnforceOrder processor When a FlowFile is transferred to success, remove attributes previously set when it was transferred to wait or failure. Signed-off-by: Matt Burgess <mattyb149@apache.org> This closes #1496
This commit is contained in:
parent
c614a74437
commit
9583ca99c1
|
@ -51,10 +51,14 @@ public class MockFlowFile implements FlowFileRecord {
|
|||
|
||||
private byte[] data = new byte[0];
|
||||
|
||||
private long lastEnqueuedDate = 0;
|
||||
private long enqueuedIndex = 0;
|
||||
|
||||
public MockFlowFile(final long id) {
|
||||
this.creationTime = System.nanoTime();
|
||||
this.id = id;
|
||||
entryDate = System.currentTimeMillis();
|
||||
lastEnqueuedDate = entryDate;
|
||||
attributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()) + ".mockFlowFile");
|
||||
attributes.put(CoreAttributes.PATH.key(), "target");
|
||||
|
||||
|
@ -290,7 +294,11 @@ public class MockFlowFile implements FlowFileRecord {
|
|||
|
||||
@Override
|
||||
public Long getLastQueueDate() {
|
||||
return entryDate;
|
||||
return lastEnqueuedDate;
|
||||
}
|
||||
|
||||
public void setLastEnqueuedDate(long lastEnqueuedDate) {
|
||||
this.lastEnqueuedDate = lastEnqueuedDate;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -315,7 +323,11 @@ public class MockFlowFile implements FlowFileRecord {
|
|||
|
||||
@Override
|
||||
public long getQueueDateIndex() {
|
||||
return 0;
|
||||
return enqueuedIndex;
|
||||
}
|
||||
|
||||
public void setEnqueuedIndex(long enqueuedIndex) {
|
||||
this.enqueuedIndex = enqueuedIndex;
|
||||
}
|
||||
|
||||
public boolean isAttributeEqual(final String attributeName, final String expectedValue) {
|
||||
|
|
|
@ -38,6 +38,7 @@ import java.util.Map;
|
|||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -79,6 +80,8 @@ public class MockProcessSession implements ProcessSession {
|
|||
private boolean rolledback = false;
|
||||
private final Set<Long> removedFlowFiles = new HashSet<>();
|
||||
|
||||
private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
|
||||
|
||||
public MockProcessSession(final SharedSessionState sharedState, final Processor processor) {
|
||||
this.processor = processor;
|
||||
this.sharedState = sharedState;
|
||||
|
@ -715,8 +718,18 @@ public class MockProcessSession implements ProcessSession {
|
|||
throw new IllegalArgumentException("I only accept MockFlowFile");
|
||||
}
|
||||
|
||||
final MockFlowFile mockFlowFile = (MockFlowFile) flowFile;
|
||||
beingProcessed.remove(flowFile.getId());
|
||||
processorQueue.offer((MockFlowFile) flowFile);
|
||||
processorQueue.offer(mockFlowFile);
|
||||
updateLastQueuedDate(mockFlowFile);
|
||||
|
||||
}
|
||||
|
||||
private void updateLastQueuedDate(MockFlowFile mockFlowFile) {
|
||||
// Simulate StandardProcessSession.updateLastQueuedDate,
|
||||
// which is called when a flow file is transferred to a relationship.
|
||||
mockFlowFile.setLastEnqueuedDate(System.currentTimeMillis());
|
||||
mockFlowFile.setEnqueuedIndex(enqueuedIndex.incrementAndGet());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -737,14 +750,11 @@ public class MockProcessSession implements ProcessSession {
|
|||
}
|
||||
|
||||
validateState(flowFile);
|
||||
List<MockFlowFile> list = transferMap.get(relationship);
|
||||
if (list == null) {
|
||||
list = new ArrayList<>();
|
||||
transferMap.put(relationship, list);
|
||||
}
|
||||
List<MockFlowFile> list = transferMap.computeIfAbsent(relationship, r -> new ArrayList<>());
|
||||
|
||||
beingProcessed.remove(flowFile.getId());
|
||||
list.add((MockFlowFile) flowFile);
|
||||
updateLastQueuedDate((MockFlowFile) flowFile);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -753,23 +763,8 @@ public class MockProcessSession implements ProcessSession {
|
|||
transfer(flowFiles);
|
||||
return;
|
||||
}
|
||||
if(!processor.getRelationships().contains(relationship)){
|
||||
throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known");
|
||||
}
|
||||
|
||||
for (final FlowFile flowFile : flowFiles) {
|
||||
validateState(flowFile);
|
||||
}
|
||||
|
||||
List<MockFlowFile> list = transferMap.get(relationship);
|
||||
if (list == null) {
|
||||
list = new ArrayList<>();
|
||||
transferMap.put(relationship, list);
|
||||
}
|
||||
|
||||
for (final FlowFile flowFile : flowFiles) {
|
||||
beingProcessed.remove(flowFile.getId());
|
||||
list.add((MockFlowFile) flowFile);
|
||||
transfer(flowFile, relationship);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -128,11 +128,6 @@
|
|||
<artifactId>nifi-authorizer</artifactId>
|
||||
<version>1.2.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-prioritizers</artifactId>
|
||||
<version>1.2.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
</project>
|
||||
|
|
|
@ -214,6 +214,11 @@ language governing permissions and limitations under the License. -->
|
|||
<artifactId>nifi-ssl-context-service</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-prioritizers</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.derby</groupId>
|
||||
<artifactId>derby</artifactId>
|
||||
|
|
|
@ -0,0 +1,551 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.Stateful;
|
||||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.state.Scope;
|
||||
import org.apache.nifi.components.state.StateMap;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isBlank;
|
||||
|
||||
@EventDriven
|
||||
@Tags({"sort", "order"})
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@TriggerSerially
|
||||
@CapabilityDescription("Enforces expected ordering of FlowFiles those belong to the same data group. " +
|
||||
" Although PriorityAttributePrioritizer can be used on a connection to ensure that flow files going through that connection are in priority order," +
|
||||
" depending on error-handling, branching, and other flow designs, it is possible for FlowFiles to get out-of-order." +
|
||||
" EnforceOrder can be used to enforce original ordering for those FlowFiles." +
|
||||
" [IMPORTANT] In order to take effect of EnforceOrder, FirstInFirstOutPrioritizer should be used at EVERY downstream relationship" +
|
||||
" UNTIL the order of FlowFiles physically get FIXED by operation such as MergeContent or being stored to the final destination.")
|
||||
@Stateful(scopes = Scope.LOCAL, description = "EnforceOrder uses following states per ordering group:" +
|
||||
" '<groupId>.target' is a order number which is being waited to arrive next." +
|
||||
" When a FlowFile with a matching order arrives, or a FlowFile overtakes the FlowFile being waited for because of wait timeout," +
|
||||
" target order will be updated to (FlowFile.order + 1)." +
|
||||
" '<groupId>.max is the maximum order number for a group." +
|
||||
" '<groupId>.updatedAt' is a timestamp when the order of a group was updated last time." +
|
||||
" These managed states will be removed automatically once a group is determined as inactive, see 'Inactive Timeout' for detail.")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = EnforceOrder.ATTR_STARTED_AT,
|
||||
description = "All FlowFiles going through this processor will have this attribute. This value is used to determine wait timeout."),
|
||||
@WritesAttribute(attribute = EnforceOrder.ATTR_RESULT,
|
||||
description = "All FlowFiles going through this processor will have this attribute denoting which relationship it was routed to."),
|
||||
@WritesAttribute(attribute = EnforceOrder.ATTR_DETAIL,
|
||||
description = "FlowFiles routed to 'failure' or 'skipped' relationship will have this attribute describing details."),
|
||||
@WritesAttribute(attribute = EnforceOrder.ATTR_EXPECTED_ORDER,
|
||||
description = "FlowFiles routed to 'wait' or 'skipped' relationship will have this attribute denoting expected order when the FlowFile was processed.")
|
||||
})
|
||||
public class EnforceOrder extends AbstractProcessor {
|
||||
|
||||
public static final String ATTR_STARTED_AT = "EnforceOrder.startedAt";
|
||||
public static final String ATTR_EXPECTED_ORDER = "EnforceOrder.expectedOrder";
|
||||
public static final String ATTR_RESULT = "EnforceOrder.result";
|
||||
public static final String ATTR_DETAIL = "EnforceOrder.detail";
|
||||
private static final Function<String, String> STATE_TARGET_ORDER = groupId -> groupId + ".target";
|
||||
private static final String STATE_SUFFIX_UPDATED_AT = ".updatedAt";
|
||||
private static final Function<String, String> STATE_UPDATED_AT = groupId -> groupId + STATE_SUFFIX_UPDATED_AT;
|
||||
private static final Function<String, String> STATE_MAX_ORDER = groupId -> groupId + ".max";
|
||||
|
||||
public static final PropertyDescriptor GROUP_IDENTIFIER = new PropertyDescriptor.Builder()
|
||||
.name("group-id")
|
||||
.displayName("Group Identifier")
|
||||
.description("EnforceOrder is capable of multiple ordering groups." +
|
||||
" 'Group Identifier' is used to determine which group a FlowFile belongs to." +
|
||||
" This property will be evaluated with each incoming FlowFile." +
|
||||
" If evaluated result is empty, the FlowFile will be routed to failure.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.defaultValue("${filename}")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ORDER_ATTRIBUTE = new PropertyDescriptor.Builder()
|
||||
.name("order-attribute")
|
||||
.displayName("Order Attribute")
|
||||
.description("A name of FlowFile attribute whose value will be used to enforce order of FlowFiles within a group." +
|
||||
" If a FlowFile does not have this attribute, or its value is not an integer, the FlowFile will be routed to failure.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||
.expressionLanguageSupported(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor INITIAL_ORDER = new PropertyDescriptor.Builder()
|
||||
.name("initial-order")
|
||||
.displayName("Initial Order")
|
||||
.description("When the first FlowFile of a group arrives, initial target order will be computed and stored in the managed state." +
|
||||
" After that, target order will start being tracked by EnforceOrder and stored in the state management store." +
|
||||
" If Expression Language is used but evaluated result was not an integer, then the FlowFile will be routed to failure," +
|
||||
" and initial order will be left unknown until consecutive FlowFiles provide a valid initial order.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.defaultValue("0")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_ORDER = new PropertyDescriptor.Builder()
|
||||
.name("maximum-order")
|
||||
.displayName("Maximum Order")
|
||||
.description("If specified, any FlowFiles that have larger order will be routed to failure." +
|
||||
" This property is computed only once for a given group." +
|
||||
" After a maximum order is computed, it will be persisted in the state management store and used for other FlowFiles belonging to the same group." +
|
||||
" If Expression Language is used but evaluated result was not an integer, then the FlowFile will be routed to failure," +
|
||||
" and maximum order will be left unknown until consecutive FlowFiles provide a valid maximum order.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor WAIT_TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("wait-timeout")
|
||||
.displayName("Wait Timeout")
|
||||
.description("Indicates the duration after which waiting FlowFiles will be routed to the 'overtook' relationship.")
|
||||
.required(true)
|
||||
.defaultValue("10 min")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor INACTIVE_TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("inactive-timeout")
|
||||
.displayName("Inactive Timeout")
|
||||
.description("Indicates the duration after which state for an inactive group will be cleared from managed state." +
|
||||
" Group is determined as inactive if any new incoming FlowFile has not seen for a group for specified duration." +
|
||||
" Inactive Timeout must be longer than Wait Timeout." +
|
||||
" If a FlowFile arrives late after its group is already cleared, it will be treated as a brand new group," +
|
||||
" but will never match the order since expected preceding FlowFiles are already gone." +
|
||||
" The FlowFile will eventually timeout for waiting and routed to 'overtook'." +
|
||||
" To avoid this, group states should be kept long enough, however, shorter duration would be helpful for reusing the same group identifier again.")
|
||||
.required(true)
|
||||
.defaultValue("30 min")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor BATCH_COUNT = new PropertyDescriptor.Builder()
|
||||
.name("batch-count")
|
||||
.displayName("Batch Count")
|
||||
.description("The maximum number of FlowFiles that EnforceOrder can process at an execution.")
|
||||
.required(true)
|
||||
.defaultValue("1000")
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(false)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("A FlowFile with a matching order number will be routed to this relationship.")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.name("failure")
|
||||
.description("A FlowFiles which does not have required attributes, or fails to compute those will be routed to this relationship")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_WAIT = new Relationship.Builder()
|
||||
.name("wait")
|
||||
.description("A FlowFile with non matching order will be routed to this relationship")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_OVERTOOK = new Relationship.Builder()
|
||||
.name("overtook")
|
||||
.description("A FlowFile that waited for preceding FlowFiles longer than Wait Timeout and overtook those FlowFiles, will be routed to this relationship.")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SKIPPED = new Relationship.Builder()
|
||||
.name("skipped")
|
||||
.description("A FlowFile that has an order younger than current, which means arrived too late and skipped, will be routed to this relationship.")
|
||||
.build();
|
||||
|
||||
private final Set<Relationship> relationships;
|
||||
|
||||
public EnforceOrder() {
|
||||
final Set<Relationship> rels = new HashSet<>();
|
||||
rels.add(REL_SUCCESS);
|
||||
rels.add(REL_WAIT);
|
||||
rels.add(REL_OVERTOOK);
|
||||
rels.add(REL_FAILURE);
|
||||
rels.add(REL_SKIPPED);
|
||||
relationships = Collections.unmodifiableSet(rels);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(GROUP_IDENTIFIER);
|
||||
descriptors.add(ORDER_ATTRIBUTE);
|
||||
descriptors.add(INITIAL_ORDER);
|
||||
descriptors.add(MAX_ORDER);
|
||||
descriptors.add(BATCH_COUNT);
|
||||
descriptors.add(WAIT_TIMEOUT);
|
||||
descriptors.add(INACTIVE_TIMEOUT);
|
||||
return descriptors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return relationships;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
|
||||
|
||||
final Long waitTimeoutMillis = validationContext.getProperty(WAIT_TIMEOUT).asTimePeriod(TimeUnit.MICROSECONDS);
|
||||
final Long inactiveTimeoutMillis = validationContext.getProperty(INACTIVE_TIMEOUT).asTimePeriod(TimeUnit.MICROSECONDS);
|
||||
|
||||
if (waitTimeoutMillis >= inactiveTimeoutMillis) {
|
||||
results.add(new ValidationResult.Builder().input(validationContext.getProperty(INACTIVE_TIMEOUT).getValue())
|
||||
.subject(INACTIVE_TIMEOUT.getDisplayName())
|
||||
.explanation(String.format("%s should be longer than %s",
|
||||
INACTIVE_TIMEOUT.getDisplayName(), WAIT_TIMEOUT.getDisplayName()))
|
||||
.valid(false)
|
||||
.build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
|
||||
|
||||
final ComponentLog logger = getLogger();
|
||||
final Integer batchCount = context.getProperty(BATCH_COUNT).asInteger();
|
||||
|
||||
List<FlowFile> flowFiles = session.get(batchCount);
|
||||
if (flowFiles == null || flowFiles.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final StateMap stateMap;
|
||||
try {
|
||||
stateMap = context.getStateManager().getState(Scope.LOCAL);
|
||||
} catch (final IOException e) {
|
||||
logger.error("Failed to retrieve state from StateManager due to {}" + e, e);
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
final OrderingContext oc = new OrderingContext(context, session);
|
||||
|
||||
oc.groupStates.putAll(stateMap.toMap());
|
||||
|
||||
for (FlowFile flowFile : flowFiles) {
|
||||
oc.setFlowFile(flowFile);
|
||||
if (oc.flowFile == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!oc.computeGroupId()
|
||||
|| !oc.computeOrder()
|
||||
|| !oc.computeInitialOrder()
|
||||
|| !oc.computeMaxOrder()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// At this point, the flow file is confirmed to be valid.
|
||||
oc.markFlowFileValid();
|
||||
}
|
||||
|
||||
oc.transferFlowFiles();
|
||||
|
||||
oc.cleanupInactiveStates();
|
||||
|
||||
try {
|
||||
context.getStateManager().setState(oc.groupStates, Scope.LOCAL);
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeException("Failed to update state due to " + e
|
||||
+ ". Session will be rollback and processor will be yielded for a while.", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private class OrderingContext {
|
||||
|
||||
private final ComponentLog logger = getLogger();
|
||||
private final ProcessSession processSession;
|
||||
private final ProcessContext processContext;
|
||||
|
||||
// Following properties are static global setting for all groups.
|
||||
private final String orderAttribute;
|
||||
private final Long waitTimeoutMillis;
|
||||
private final Function<FlowFile, Integer> getOrder;
|
||||
|
||||
private final Map<String, String> groupStates = new HashMap<>();
|
||||
private final long now = System.currentTimeMillis();
|
||||
|
||||
// Following properties are computed per flow file.
|
||||
private final PropertyValue groupIdentifierProperty ;
|
||||
|
||||
// Followings are per group objects.
|
||||
private final PropertyValue initOrderProperty;
|
||||
private final PropertyValue maxOrderProperty;
|
||||
private final Map<String, List<FlowFile>> flowFileGroups = new TreeMap<>();
|
||||
|
||||
// Current variables within incoming FlowFiles loop.
|
||||
private FlowFile flowFile;
|
||||
private String groupId;
|
||||
private Integer order;
|
||||
|
||||
private OrderingContext(final ProcessContext processContext, final ProcessSession processSession) {
|
||||
this.processContext = processContext;
|
||||
this.processSession = processSession;
|
||||
|
||||
orderAttribute = processContext.getProperty(ORDER_ATTRIBUTE).getValue();
|
||||
waitTimeoutMillis = processContext.getProperty(WAIT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
|
||||
getOrder = flowFile -> Integer.parseInt(flowFile.getAttribute(orderAttribute));
|
||||
|
||||
|
||||
groupIdentifierProperty = processContext.getProperty(GROUP_IDENTIFIER);
|
||||
|
||||
initOrderProperty = processContext.getProperty(INITIAL_ORDER);
|
||||
maxOrderProperty = processContext.getProperty(MAX_ORDER);
|
||||
}
|
||||
|
||||
private void setFlowFile(final FlowFile flowFile) {
|
||||
this.flowFile = flowFile;
|
||||
this.groupId = null;
|
||||
this.order = null;
|
||||
}
|
||||
|
||||
private boolean computeGroupId() {
|
||||
groupId = groupIdentifierProperty.evaluateAttributeExpressions(flowFile).getValue();
|
||||
if (isBlank(groupId)) {
|
||||
transferToFailure(flowFile, "Failed to get Group Identifier.");
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean computeOrder() {
|
||||
try {
|
||||
order = getOrder.apply(flowFile);
|
||||
} catch (final NumberFormatException e) {
|
||||
transferToFailure(flowFile, "Failed to parse order attribute due to " + e, e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean computeMaxOrder() {
|
||||
if (maxOrderProperty.isSet()) {
|
||||
// Compute maxOrder for this group if it's not there yet.
|
||||
final String maxOrderStr = groupStates.computeIfAbsent(STATE_MAX_ORDER.apply(groupId),
|
||||
k -> maxOrderProperty.evaluateAttributeExpressions(flowFile).getValue());
|
||||
if (isBlank(maxOrderStr)) {
|
||||
transferToFailure(flowFile, String.format("%s was specified but result was empty.", MAX_ORDER.getDisplayName()));
|
||||
return false;
|
||||
}
|
||||
|
||||
final Integer maxOrder;
|
||||
try {
|
||||
maxOrder = Integer.parseInt(maxOrderStr);
|
||||
} catch (final NumberFormatException e) {
|
||||
final String msg = String.format("Failed to get Maximum Order for group [%s] due to %s", groupId, e);
|
||||
transferToFailure(flowFile, msg, e);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check max order.
|
||||
if (order > maxOrder) {
|
||||
final String msg = String.format("Order (%d) is greater than the Maximum Order (%d) for Group [%s]", order, maxOrder, groupId);
|
||||
transferToFailure(flowFile, msg);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean computeInitialOrder() {
|
||||
// Compute initial order. Use asInteger() to check if it's a valid integer.
|
||||
final String stateKeyOrder = STATE_TARGET_ORDER.apply(groupId);
|
||||
try {
|
||||
final AtomicReference<String> computedInitOrder = new AtomicReference<>();
|
||||
groupStates.computeIfAbsent(stateKeyOrder, k -> {
|
||||
final String initOrderStr = initOrderProperty.evaluateAttributeExpressions(flowFile).getValue();
|
||||
// Parse it to check if it is a valid integer.
|
||||
Integer.parseInt(initOrderStr);
|
||||
computedInitOrder.set(initOrderStr);
|
||||
return initOrderStr;
|
||||
});
|
||||
// If these map modification is in the computeIfAbsent function, it causes this issue.
|
||||
// JDK-8071667 : HashMap.computeIfAbsent() adds entry that HashMap.get() does not find.
|
||||
// http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8071667
|
||||
if (!isBlank(computedInitOrder.get())) {
|
||||
groupStates.put(STATE_UPDATED_AT.apply(groupId), String.valueOf(now));
|
||||
}
|
||||
|
||||
} catch (final NumberFormatException e) {
|
||||
final String msg = String.format("Failed to get Initial Order for Group [%s] due to %s", groupId, e);
|
||||
transferToFailure(flowFile, msg, e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void markFlowFileValid() {
|
||||
final List<FlowFile> groupedFlowFiles = flowFileGroups.computeIfAbsent(groupId, k -> new ArrayList<>());
|
||||
|
||||
final FlowFile validFlowFile;
|
||||
if (isBlank(flowFile.getAttribute(ATTR_STARTED_AT))) {
|
||||
validFlowFile = processSession.putAttribute(flowFile, ATTR_STARTED_AT, String.valueOf(now));
|
||||
} else {
|
||||
validFlowFile = flowFile;
|
||||
}
|
||||
|
||||
groupedFlowFiles.add(validFlowFile);
|
||||
}
|
||||
|
||||
private void transferFlowFiles() {
|
||||
flowFileGroups.entrySet().stream().filter(entry -> !entry.getValue().isEmpty()).map(entry -> {
|
||||
// Sort flow files within each group.
|
||||
final List<FlowFile> groupedFlowFiles = entry.getValue();
|
||||
groupedFlowFiles.sort(Comparator.comparing(getOrder));
|
||||
return entry;
|
||||
}).forEach(entry -> {
|
||||
// Check current state.
|
||||
final String groupId = entry.getKey();
|
||||
final String stateKeyOrder = STATE_TARGET_ORDER.apply(groupId);
|
||||
final int previousTargetOrder = Integer.parseInt(groupStates.get(stateKeyOrder));
|
||||
final AtomicInteger targetOrder = new AtomicInteger(previousTargetOrder);
|
||||
final List<FlowFile> groupedFlowFiles = entry.getValue();
|
||||
final String maxOrderStr = groupStates.get(STATE_MAX_ORDER.apply(groupId));
|
||||
|
||||
groupedFlowFiles.forEach(f -> {
|
||||
final Integer order = getOrder.apply(f);
|
||||
final boolean isMaxOrder = !isBlank(maxOrderStr) && order.equals(Integer.parseInt(maxOrderStr));
|
||||
|
||||
if (order == targetOrder.get()) {
|
||||
transferResult(f, REL_SUCCESS, null, null);
|
||||
if (!isMaxOrder) {
|
||||
// If max order is specified and this FlowFile has the max order, don't increment target anymore.
|
||||
targetOrder.incrementAndGet();
|
||||
}
|
||||
|
||||
} else if (order > targetOrder.get()) {
|
||||
|
||||
if (now - Long.parseLong(f.getAttribute(ATTR_STARTED_AT)) > waitTimeoutMillis) {
|
||||
transferResult(f, REL_OVERTOOK, null, targetOrder.get());
|
||||
targetOrder.set(isMaxOrder ? order : order + 1);
|
||||
} else {
|
||||
transferResult(f, REL_WAIT, null, targetOrder.get());
|
||||
}
|
||||
|
||||
} else {
|
||||
final String msg = String.format("Skipped, FlowFile order was %d but current target is %d", order, targetOrder.get());
|
||||
logger.warn(msg + ". {}", new Object[]{f});
|
||||
transferResult(f, REL_SKIPPED, msg, targetOrder.get());
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
if (previousTargetOrder != targetOrder.get()) {
|
||||
groupStates.put(stateKeyOrder, String.valueOf(targetOrder.get()));
|
||||
groupStates.put(STATE_UPDATED_AT.apply(groupId), String.valueOf(now));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void transferResult(final FlowFile flowFile, final Relationship result, final String detail, final Integer expectedOrder) {
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put(ATTR_RESULT, result.getName());
|
||||
if (expectedOrder != null) {
|
||||
attributes.put(ATTR_EXPECTED_ORDER, expectedOrder.toString());
|
||||
}
|
||||
if (!isBlank(detail)) {
|
||||
attributes.put(ATTR_DETAIL, detail);
|
||||
}
|
||||
|
||||
FlowFile resultFlowFile = processSession.putAllAttributes(flowFile, attributes);
|
||||
// Remove
|
||||
if (expectedOrder == null) {
|
||||
resultFlowFile = processSession.removeAttribute(resultFlowFile, ATTR_EXPECTED_ORDER);
|
||||
}
|
||||
if (detail == null) {
|
||||
resultFlowFile = processSession.removeAttribute(resultFlowFile, ATTR_DETAIL);
|
||||
}
|
||||
processSession.transfer(resultFlowFile, result);
|
||||
}
|
||||
|
||||
private void transferToFailure(final FlowFile flowFile, final String message) {
|
||||
transferToFailure(flowFile, message, null);
|
||||
}
|
||||
|
||||
private void transferToFailure(final FlowFile flowFile, final String message, final Throwable cause) {
|
||||
if (cause != null) {
|
||||
getLogger().warn(message + " {}", new Object[]{flowFile}, cause);
|
||||
} else {
|
||||
getLogger().warn(message + " {}", new Object[]{flowFile});
|
||||
}
|
||||
transferResult(flowFile, REL_FAILURE, message, null);
|
||||
}
|
||||
|
||||
private void cleanupInactiveStates() {
|
||||
final Long inactiveTimeout = processContext.getProperty(INACTIVE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
|
||||
final List<String> inactiveGroups = groupStates.keySet().stream()
|
||||
.filter(k -> k.endsWith(STATE_SUFFIX_UPDATED_AT) && (now - Long.parseLong(groupStates.get(k)) > inactiveTimeout))
|
||||
.map(k -> k.substring(0, k.length() - STATE_SUFFIX_UPDATED_AT.length()))
|
||||
.collect(Collectors.toList());
|
||||
inactiveGroups.forEach(groupId -> {
|
||||
groupStates.remove(STATE_TARGET_ORDER.apply(groupId));
|
||||
groupStates.remove(STATE_UPDATED_AT.apply(groupId));
|
||||
groupStates.remove(STATE_MAX_ORDER.apply(groupId));
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -23,6 +23,7 @@ org.apache.nifi.processors.standard.DetectDuplicate
|
|||
org.apache.nifi.processors.standard.DistributeLoad
|
||||
org.apache.nifi.processors.standard.DuplicateFlowFile
|
||||
org.apache.nifi.processors.standard.EncryptContent
|
||||
org.apache.nifi.processors.standard.EnforceOrder
|
||||
org.apache.nifi.processors.standard.EvaluateJsonPath
|
||||
org.apache.nifi.processors.standard.EvaluateXPath
|
||||
org.apache.nifi.processors.standard.EvaluateXQuery
|
||||
|
|
|
@ -0,0 +1,493 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.components.state.Scope;
|
||||
import org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer;
|
||||
import org.apache.nifi.state.MockStateManager;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class TestEnforceOrder {
|
||||
|
||||
@Test
|
||||
public void testDefaultPropertyValidation() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class);
|
||||
|
||||
// Default values should not be valid.
|
||||
runner.assertNotValid();
|
||||
|
||||
// Set required properties.
|
||||
runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomPropertyValidation() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class);
|
||||
|
||||
// Set required properties.
|
||||
runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
|
||||
runner.assertValid();
|
||||
|
||||
// Inactive Timeout should be longer than Wait Timeout
|
||||
runner.setProperty(EnforceOrder.WAIT_TIMEOUT, "30 sec");
|
||||
runner.setProperty(EnforceOrder.INACTIVE_TIMEOUT, "29 sec");
|
||||
runner.assertNotValid();
|
||||
|
||||
// Inactive Timeout should be longer than Wait Timeout
|
||||
runner.setProperty(EnforceOrder.INACTIVE_TIMEOUT, "30 sec");
|
||||
runner.assertNotValid();
|
||||
|
||||
// Inactive Timeout should be longer than Wait Timeout
|
||||
runner.setProperty(EnforceOrder.INACTIVE_TIMEOUT, "31 sec");
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
|
||||
private static class Ordered {
|
||||
private final Map<String, String> map = new HashMap<>();
|
||||
private Ordered(final int index) {
|
||||
map.put("index", String.valueOf(index));
|
||||
}
|
||||
|
||||
private static Ordered i(final int index) {
|
||||
return new Ordered(index);
|
||||
}
|
||||
|
||||
private static Ordered i(final String group, final int index) {
|
||||
return new Ordered(index).put("group", group);
|
||||
}
|
||||
|
||||
private Ordered put(final String key, final String value) {
|
||||
map.put(key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
private Map<String, String> map() {
|
||||
return map;
|
||||
}
|
||||
|
||||
private static MockFlowFile enqueue(final TestRunner runner, final String group, final int index) {
|
||||
return runner.enqueue(group + "." + index, i(group, index).map());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSort() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class);
|
||||
|
||||
runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
|
||||
runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
|
||||
runner.assertValid();
|
||||
Ordered.enqueue(runner, "b", 0);
|
||||
Ordered.enqueue(runner, "a", 1);
|
||||
Ordered.enqueue(runner, "a", 0);
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(EnforceOrder.REL_SUCCESS, 3);
|
||||
|
||||
final List<MockFlowFile> succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
|
||||
succeeded.sort(new FirstInFirstOutPrioritizer());
|
||||
succeeded.get(0).assertContentEquals("a.0");
|
||||
succeeded.get(1).assertContentEquals("a.1");
|
||||
succeeded.get(2).assertContentEquals("b.0");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDuplicatedOrder() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class);
|
||||
|
||||
runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
|
||||
runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
|
||||
runner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
|
||||
runner.assertValid();
|
||||
Ordered.enqueue(runner, "b", 1);
|
||||
Ordered.enqueue(runner, "a", 2);
|
||||
Ordered.enqueue(runner, "a", 1);
|
||||
Ordered.enqueue(runner, "a", 2);
|
||||
Ordered.enqueue(runner, "a", 3);
|
||||
|
||||
runner.run();
|
||||
|
||||
final List<MockFlowFile> succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
|
||||
assertEquals(4, succeeded.size());
|
||||
succeeded.sort(new FirstInFirstOutPrioritizer());
|
||||
succeeded.get(0).assertContentEquals("a.1");
|
||||
succeeded.get(1).assertContentEquals("a.2");
|
||||
succeeded.get(2).assertContentEquals("a.3");
|
||||
succeeded.get(3).assertContentEquals("b.1");
|
||||
|
||||
// It's not possible to distinguish skipped and duplicated, since we only tracks target order number.
|
||||
final List<MockFlowFile> skipped = runner.getFlowFilesForRelationship(EnforceOrder.REL_SKIPPED);
|
||||
assertEquals(1, skipped.size());
|
||||
skipped.get(0).assertContentEquals("a.2");
|
||||
skipped.get(0).assertAttributeEquals(EnforceOrder.ATTR_EXPECTED_ORDER, "3");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoGroupIdentifier() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class);
|
||||
|
||||
runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
|
||||
runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
|
||||
runner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
|
||||
runner.assertValid();
|
||||
Ordered.enqueue(runner, "b", 1);
|
||||
Ordered.enqueue(runner, "a", 2);
|
||||
runner.enqueue("no group id", Ordered.i(1).map()); // without group attribute
|
||||
Ordered.enqueue(runner, "a", 1);
|
||||
|
||||
runner.run();
|
||||
|
||||
final List<MockFlowFile> succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
|
||||
assertEquals(3, succeeded.size());
|
||||
succeeded.sort(new FirstInFirstOutPrioritizer());
|
||||
succeeded.get(0).assertContentEquals("a.1");
|
||||
succeeded.get(1).assertContentEquals("a.2");
|
||||
succeeded.get(2).assertContentEquals("b.1");
|
||||
|
||||
final List<MockFlowFile> failed = runner.getFlowFilesForRelationship(EnforceOrder.REL_FAILURE);
|
||||
assertEquals(1, failed.size());
|
||||
failed.get(0).assertAttributeExists(EnforceOrder.ATTR_DETAIL);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIllegalOrderValue() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class);
|
||||
|
||||
runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
|
||||
runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
|
||||
runner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
|
||||
runner.assertValid();
|
||||
Ordered.enqueue(runner, "b", 1);
|
||||
Ordered.enqueue(runner, "a", 2);
|
||||
runner.enqueue("illegal order", Ordered.i("a", 1).put("index", "non-integer").map());
|
||||
Ordered.enqueue(runner, "a", 1);
|
||||
|
||||
runner.run();
|
||||
|
||||
final List<MockFlowFile> succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
|
||||
assertEquals(3, succeeded.size());
|
||||
succeeded.sort(new FirstInFirstOutPrioritizer());
|
||||
succeeded.get(0).assertContentEquals("a.1");
|
||||
succeeded.get(1).assertContentEquals("a.2");
|
||||
succeeded.get(2).assertContentEquals("b.1");
|
||||
|
||||
final List<MockFlowFile> failed = runner.getFlowFilesForRelationship(EnforceOrder.REL_FAILURE);
|
||||
assertEquals(1, failed.size());
|
||||
failed.get(0).assertAttributeExists(EnforceOrder.ATTR_DETAIL);
|
||||
failed.get(0).assertContentEquals("illegal order");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInitialOrderValue() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class);
|
||||
|
||||
runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
|
||||
runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
|
||||
runner.setProperty(EnforceOrder.INITIAL_ORDER, "${index.start}");
|
||||
runner.setProperty(EnforceOrder.MAX_ORDER, "${index.max}");
|
||||
runner.assertValid();
|
||||
runner.enqueue("b.0", Ordered.i("b", 0).put("index.start", "0").put("index.max", "99").map());
|
||||
runner.enqueue("a.100", Ordered.i("a", 100).put("index.start", "100").put("index.max", "103").map());
|
||||
runner.enqueue("a.101", Ordered.i("a", 101).put("index.start", "100").put("index.max", "103").map());
|
||||
runner.enqueue("illegal initial order", Ordered.i("c", 1).put("index.start", "non-integer").map());
|
||||
runner.enqueue("without initial order", Ordered.i("d", 1).map());
|
||||
// Even if this flow file doesn't have initial order attribute, this will be routed to success.
|
||||
// Because target order for group b is already computed from b.0.
|
||||
Ordered.enqueue(runner, "b", 1);
|
||||
|
||||
runner.run();
|
||||
|
||||
List<MockFlowFile> succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
|
||||
assertEquals(4, succeeded.size());
|
||||
succeeded.sort(new FirstInFirstOutPrioritizer());
|
||||
succeeded.get(0).assertContentEquals("a.100");
|
||||
succeeded.get(1).assertContentEquals("a.101");
|
||||
succeeded.get(2).assertContentEquals("b.0");
|
||||
succeeded.get(3).assertContentEquals("b.1");
|
||||
|
||||
final List<MockFlowFile> failed = runner.getFlowFilesForRelationship(EnforceOrder.REL_FAILURE);
|
||||
assertEquals(2, failed.size());
|
||||
failed.get(0).assertAttributeExists(EnforceOrder.ATTR_DETAIL);
|
||||
failed.get(0).assertContentEquals("illegal initial order");
|
||||
failed.get(1).assertAttributeExists(EnforceOrder.ATTR_DETAIL);
|
||||
failed.get(1).assertContentEquals("without initial order");
|
||||
|
||||
final MockStateManager stateManager = runner.getStateManager();
|
||||
stateManager.assertStateEquals("a.target", "102", Scope.LOCAL);
|
||||
stateManager.assertStateEquals("a.max", "103", Scope.LOCAL);
|
||||
stateManager.assertStateEquals("b.target", "2", Scope.LOCAL);
|
||||
stateManager.assertStateEquals("b.max", "99", Scope.LOCAL);
|
||||
|
||||
runner.clearTransferState();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxOrder() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class);
|
||||
|
||||
runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${fragment.identifier}");
|
||||
runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
|
||||
runner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
|
||||
runner.setProperty(EnforceOrder.MAX_ORDER, "${fragment.count}");
|
||||
runner.assertValid();
|
||||
runner.enqueue("b.1", Ordered.i(1).put("fragment.identifier", "b").put("fragment.count", "3").map());
|
||||
runner.enqueue("a.2", Ordered.i(2).put("fragment.identifier", "a").put("fragment.count", "2").map());
|
||||
runner.enqueue("without max order", Ordered.i(1).put("fragment.identifier", "c").map());
|
||||
runner.enqueue("illegal max order", Ordered.i(1).put("fragment.identifier", "d").put("fragment.count", "X").map());
|
||||
runner.enqueue("a.1", Ordered.i(1).put("fragment.identifier", "a").put("fragment.count", "2").map());
|
||||
runner.enqueue("a.3", Ordered.i(3).put("fragment.identifier", "a").put("fragment.count", "2").map()); // Exceed max
|
||||
|
||||
runner.run();
|
||||
|
||||
final List<MockFlowFile> succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
|
||||
succeeded.sort(new FirstInFirstOutPrioritizer());
|
||||
assertEquals(3, succeeded.size());
|
||||
succeeded.get(0).assertContentEquals("a.1");
|
||||
succeeded.get(1).assertContentEquals("a.2");
|
||||
succeeded.get(2).assertContentEquals("b.1");
|
||||
|
||||
final List<MockFlowFile> failed = runner.getFlowFilesForRelationship(EnforceOrder.REL_FAILURE);
|
||||
assertEquals(3, failed.size());
|
||||
failed.get(0).assertContentEquals("without max order");
|
||||
failed.get(1).assertContentEquals("illegal max order");
|
||||
failed.get(2).assertContentEquals("a.3"); // exceeds max order
|
||||
|
||||
final MockStateManager stateManager = runner.getStateManager();
|
||||
stateManager.assertStateEquals("a.target", "2", Scope.LOCAL);
|
||||
stateManager.assertStateEquals("a.max", "2", Scope.LOCAL);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWaitOvertakeSkip() throws Exception {
|
||||
final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class);
|
||||
|
||||
runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
|
||||
runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
|
||||
runner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
|
||||
runner.setProperty(EnforceOrder.MAX_ORDER, "10");
|
||||
runner.assertValid();
|
||||
Ordered.enqueue(runner, "b", 1);
|
||||
Ordered.enqueue(runner, "a", 2);
|
||||
Ordered.enqueue(runner, "a", 1);
|
||||
Ordered.enqueue(runner, "a", 5); // waits for a.3 and a.4
|
||||
Ordered.enqueue(runner, "b", 3); // waits for b.2
|
||||
Ordered.enqueue(runner, "c", 9); // waits for c.1 to 8
|
||||
Ordered.enqueue(runner, "d", 10); // waits for d.1 to 9
|
||||
|
||||
runner.run();
|
||||
|
||||
List<MockFlowFile> succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
|
||||
assertEquals(3, succeeded.size());
|
||||
final FirstInFirstOutPrioritizer fifo = new FirstInFirstOutPrioritizer();
|
||||
succeeded.sort(fifo);
|
||||
succeeded.get(0).assertContentEquals("a.1");
|
||||
succeeded.get(1).assertContentEquals("a.2");
|
||||
succeeded.get(2).assertContentEquals("b.1");
|
||||
|
||||
List<MockFlowFile> waiting = runner.getFlowFilesForRelationship(EnforceOrder.REL_WAIT);
|
||||
assertEquals(4, waiting.size());
|
||||
waiting.get(0).assertContentEquals("a.5");
|
||||
waiting.get(1).assertContentEquals("b.3");
|
||||
waiting.get(2).assertContentEquals("c.9");
|
||||
waiting.get(3).assertContentEquals("d.10");
|
||||
waiting.get(0).assertAttributeExists("EnforceOrder.startedAt");
|
||||
waiting.get(1).assertAttributeExists("EnforceOrder.startedAt");
|
||||
waiting.get(2).assertAttributeExists("EnforceOrder.startedAt");
|
||||
waiting.get(3).assertAttributeExists("EnforceOrder.startedAt");
|
||||
|
||||
final MockStateManager stateManager = runner.getStateManager();
|
||||
stateManager.assertStateEquals("a.target", "3", Scope.LOCAL);
|
||||
stateManager.assertStateEquals("b.target", "2", Scope.LOCAL);
|
||||
stateManager.assertStateEquals("c.target", "1", Scope.LOCAL);
|
||||
stateManager.assertStateEquals("d.target", "1", Scope.LOCAL);
|
||||
stateManager.assertStateSet("a.updatedAt", Scope.LOCAL);
|
||||
stateManager.assertStateSet("b.updatedAt", Scope.LOCAL);
|
||||
stateManager.assertStateSet("c.updatedAt", Scope.LOCAL);
|
||||
stateManager.assertStateSet("d.updatedAt", Scope.LOCAL);
|
||||
|
||||
// Run it again with waiting files.
|
||||
runner.clearTransferState();
|
||||
waiting.forEach(runner::enqueue);
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(EnforceOrder.REL_WAIT, 4);
|
||||
waiting = runner.getFlowFilesForRelationship(EnforceOrder.REL_WAIT);
|
||||
|
||||
// Run it again with shorter wait timeout to make overtaking happen.
|
||||
runner.clearTransferState();
|
||||
runner.setProperty(EnforceOrder.WAIT_TIMEOUT, "10 ms");
|
||||
Thread.sleep(20);
|
||||
waiting.forEach(runner::enqueue);
|
||||
Ordered.enqueue(runner, "b", 2); // arrived in time
|
||||
Ordered.enqueue(runner, "a", 6); // a.4 and a.5 have not arrived yet
|
||||
runner.run();
|
||||
|
||||
succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
|
||||
succeeded.sort(fifo);
|
||||
assertEquals(3, succeeded.size());
|
||||
succeeded.get(0).assertContentEquals("a.6"); // This is ok because a.5 was there.
|
||||
succeeded.get(1).assertContentEquals("b.2");
|
||||
succeeded.get(2).assertContentEquals("b.3");
|
||||
|
||||
List<MockFlowFile> overtook = runner.getFlowFilesForRelationship(EnforceOrder.REL_OVERTOOK);
|
||||
assertEquals(3, overtook.size());
|
||||
overtook.get(0).assertContentEquals("a.5"); // overtook a.3.
|
||||
overtook.get(0).assertAttributeEquals(EnforceOrder.ATTR_EXPECTED_ORDER, "3");
|
||||
overtook.get(1).assertContentEquals("c.9"); // overtook c.1 - 8.
|
||||
overtook.get(1).assertAttributeEquals(EnforceOrder.ATTR_EXPECTED_ORDER, "1");
|
||||
overtook.get(2).assertContentEquals("d.10"); // overtook d.1 - 9.
|
||||
overtook.get(2).assertAttributeEquals(EnforceOrder.ATTR_EXPECTED_ORDER, "1");
|
||||
|
||||
stateManager.assertStateEquals("a.target", "7", Scope.LOCAL);
|
||||
stateManager.assertStateEquals("b.target", "4", Scope.LOCAL);
|
||||
stateManager.assertStateEquals("c.target", "10", Scope.LOCAL); // it was c.9, so +1
|
||||
stateManager.assertStateEquals("d.target", "10", Scope.LOCAL); // it was d.10 (max) so don't +1
|
||||
|
||||
// Simulate a.3 and a.4 arrive but too late..
|
||||
runner.clearTransferState();
|
||||
Ordered.enqueue(runner, "a", 3);
|
||||
Ordered.enqueue(runner, "a", 4);
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(EnforceOrder.REL_SKIPPED, 2);
|
||||
final List<MockFlowFile> skipped = runner.getFlowFilesForRelationship(EnforceOrder.REL_SKIPPED);
|
||||
skipped.get(0).assertContentEquals("a.3");
|
||||
skipped.get(0).assertAttributeExists(EnforceOrder.ATTR_DETAIL);
|
||||
skipped.get(1).assertContentEquals("a.4");
|
||||
skipped.get(1).assertAttributeExists(EnforceOrder.ATTR_DETAIL);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCleanInactiveGroups() throws Exception {
|
||||
final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class);
|
||||
|
||||
runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
|
||||
runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
|
||||
runner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
|
||||
runner.assertValid();
|
||||
Ordered.enqueue(runner, "b", 1);
|
||||
Ordered.enqueue(runner, "a", 2);
|
||||
Ordered.enqueue(runner, "c", 1);
|
||||
Ordered.enqueue(runner, "a", 1);
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(EnforceOrder.REL_SUCCESS, 4);
|
||||
|
||||
// Run it again with shorter inactive timeout
|
||||
runner.clearTransferState();
|
||||
runner.setProperty(EnforceOrder.WAIT_TIMEOUT, "5 ms");
|
||||
runner.setProperty(EnforceOrder.INACTIVE_TIMEOUT, "10 ms");
|
||||
|
||||
Thread.sleep(15);
|
||||
|
||||
// No group b.
|
||||
Ordered.enqueue(runner, "a", 3);
|
||||
Ordered.enqueue(runner, "c", 2);
|
||||
|
||||
runner.run();
|
||||
|
||||
// Group b was determined as inactive, thus its states should be removed.
|
||||
final MockStateManager stateManager = runner.getStateManager();
|
||||
stateManager.assertStateEquals("a.target", "4", Scope.LOCAL);
|
||||
stateManager.assertStateNotSet("b.target", Scope.LOCAL);
|
||||
stateManager.assertStateEquals("c.target", "3", Scope.LOCAL);
|
||||
stateManager.assertStateSet("a.updatedAt", Scope.LOCAL);
|
||||
stateManager.assertStateNotSet("b.updatedAt", Scope.LOCAL);
|
||||
stateManager.assertStateSet("c.updatedAt", Scope.LOCAL);
|
||||
|
||||
// If b comes again, it'll be treated as brand new group.
|
||||
runner.clearTransferState();
|
||||
Ordered.enqueue(runner, "b", 2);
|
||||
|
||||
runner.run();
|
||||
stateManager.assertStateEquals("b.target", "1", Scope.LOCAL);
|
||||
stateManager.assertStateSet("b.updatedAt", Scope.LOCAL);
|
||||
|
||||
// b.2 should be routed to wait, since there's no b.1. It will eventually overtake.
|
||||
runner.assertAllFlowFilesTransferred(EnforceOrder.REL_WAIT, 1);
|
||||
final List<MockFlowFile> waiting = runner.getFlowFilesForRelationship(EnforceOrder.REL_WAIT);
|
||||
waiting.get(0).assertContentEquals("b.2");
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClearOldProperties() throws Exception {
|
||||
final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class);
|
||||
|
||||
runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
|
||||
runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
|
||||
runner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
|
||||
runner.assertValid();
|
||||
Ordered.enqueue(runner, "a", 2);
|
||||
Ordered.enqueue(runner, "b", 1);
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(EnforceOrder.REL_WAIT, 1);
|
||||
MockFlowFile a2 = runner.getFlowFilesForRelationship(EnforceOrder.REL_WAIT).get(0);
|
||||
a2.assertAttributeEquals(EnforceOrder.ATTR_RESULT, "wait");
|
||||
a2.assertAttributeExists(EnforceOrder.ATTR_STARTED_AT);
|
||||
a2.assertAttributeNotExists(EnforceOrder.ATTR_DETAIL);
|
||||
a2.assertAttributeEquals(EnforceOrder.ATTR_EXPECTED_ORDER, "1");
|
||||
a2.assertContentEquals("a.2");
|
||||
|
||||
runner.assertTransferCount(EnforceOrder.REL_SUCCESS, 1);
|
||||
MockFlowFile b1 = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS).get(0);
|
||||
b1.assertAttributeEquals(EnforceOrder.ATTR_RESULT, "success");
|
||||
b1.assertAttributeExists(EnforceOrder.ATTR_STARTED_AT);
|
||||
b1.assertAttributeNotExists(EnforceOrder.ATTR_DETAIL);
|
||||
b1.assertAttributeNotExists(EnforceOrder.ATTR_EXPECTED_ORDER);
|
||||
b1.assertContentEquals("b.1");
|
||||
|
||||
runner.clearTransferState();
|
||||
|
||||
Ordered.enqueue(runner, "a", 1);
|
||||
runner.enqueue(a2);
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(EnforceOrder.REL_SUCCESS, 2);
|
||||
MockFlowFile a1 = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS).get(0);
|
||||
a1.assertAttributeEquals(EnforceOrder.ATTR_RESULT, "success");
|
||||
a1.assertAttributeExists(EnforceOrder.ATTR_STARTED_AT);
|
||||
a1.assertAttributeNotExists(EnforceOrder.ATTR_DETAIL);
|
||||
a1.assertAttributeNotExists(EnforceOrder.ATTR_EXPECTED_ORDER);
|
||||
a1.assertContentEquals("a.1");
|
||||
|
||||
a2 = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS).get(1);
|
||||
a2.assertAttributeEquals(EnforceOrder.ATTR_RESULT, "success");
|
||||
a2.assertAttributeExists(EnforceOrder.ATTR_STARTED_AT);
|
||||
a2.assertAttributeNotExists(EnforceOrder.ATTR_DETAIL);
|
||||
a2.assertAttributeNotExists(EnforceOrder.ATTR_EXPECTED_ORDER); // Should be cleared.
|
||||
a2.assertContentEquals("a.2");
|
||||
}
|
||||
}
|
5
pom.xml
5
pom.xml
|
@ -970,6 +970,11 @@ language governing permissions and limitations under the License. -->
|
|||
<version>1.2.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-prioritizers</artifactId>
|
||||
<version>1.2.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-jetty-bundle</artifactId>
|
||||
|
|
Loading…
Reference in New Issue