NIFI-9475 Provide Framework-Level Retries for NiFi Relationships (#5593)

timeabarna 2022-02-15 21:40:41 +01:00 committed by GitHub
parent 84b2484fd4
commit 63a543cd67
39 changed files with 1642 additions and 16 deletions

View File

@ -44,6 +44,11 @@ public class VersionedProcessor extends VersionedComponent
private Set<String> autoTerminatedRelationships;
private ScheduledState scheduledState;
private Integer retryCount;
private Set<String> retriedRelationships;
private String backoffMechanism;
private String maxBackoffPeriod;
@ApiModelProperty("The frequency with which to schedule the processor. The format of the value will depend on th value of schedulingStrategy.")
public String getSchedulingPeriod() {
return schedulingPeriod;
@ -204,4 +209,48 @@ public class VersionedProcessor extends VersionedComponent
return ComponentType.PROCESSOR;
}
@ApiModelProperty(
value = "Overall number of retries."
)
public Integer getRetryCount() {
return retryCount;
}
public void setRetryCount(Integer retryCount) {
this.retryCount = retryCount;
}
@ApiModelProperty(
value = "All the relationships should be retried."
)
public Set<String> getRetriedRelationships() {
return retriedRelationships;
}
public void setRetriedRelationships(Set<String> retriedRelationships) {
this.retriedRelationships = retriedRelationships;
}
@ApiModelProperty(
value = "Determines whether the FlowFile should be penalized or the processor should be yielded between retries.",
allowableValues = "PENALIZE_FLOWFILE, YIELD_PROCESSOR"
)
public String getBackoffMechanism() {
return backoffMechanism;
}
public void setBackoffMechanism(String backoffMechanism) {
this.backoffMechanism = backoffMechanism;
}
@ApiModelProperty(
value = "Maximum amount of time to be waited during a retry period."
)
public String getMaxBackoffPeriod() {
return maxBackoffPeriod;
}
public void setMaxBackoffPeriod(String maxBackoffPeriod) {
this.maxBackoffPeriod = maxBackoffPeriod;
}
}

View File

@ -163,4 +163,14 @@ public interface ProcessContext extends PropertyContext, ClusterContext {
* @return the configured name of this processor
*/
String getName();
/**
* @return true if the given relationship is configured to be retried for this processor; false otherwise
*/
boolean isRelationshipRetried(Relationship relationship);
/**
* @return the configured retry count for this processor
*/
int getRetryCount();
}
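
Illustrative sketch (not part of this commit): a hypothetical processor consulting the two new ProcessContext methods from onTrigger. Only isRelationshipRetried(...) and getRetryCount() come from the interface above; the class name, relationships, and log message are made up.

import java.util.HashSet;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;

public class RetryAwareProcessor extends AbstractProcessor {
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").build();

    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        return relationships;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            // hypothetical per-FlowFile work would happen here
            session.transfer(flowFile, REL_SUCCESS);
        } catch (final Exception e) {
            if (context.isRelationshipRetried(REL_FAILURE)) {
                // The framework will re-queue this FlowFile up to getRetryCount() times
                // before it is finally routed to 'failure'.
                getLogger().warn("Processing failed (" + e + "); framework retry allows up to "
                        + context.getRetryCount() + " attempts");
            }
            session.transfer(flowFile, REL_FAILURE);
        }
    }
}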

View File

@ -579,4 +579,14 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
public boolean isConnectedToCluster() {
return isConnected;
}
@Override
public int getRetryCount() {
return 0;
}
@Override
public boolean isRelationshipRetried(Relationship relationship) {
return false;
}
}

View File

@ -51,6 +51,12 @@ public class ProcessorConfigDTO {
private Map<String, String> defaultConcurrentTasks;
private Map<String, String> defaultSchedulingPeriod;
//retry configurations
private Integer retryCount;
private Set<String> retriedRelationships;
private String backoffMechanism;
private String maxBackoffPeriod;
public ProcessorConfigDTO() {
}
@ -308,4 +314,48 @@ public class ProcessorConfigDTO {
this.defaultSchedulingPeriod = defaultSchedulingPeriod;
}
@ApiModelProperty(
value = "Overall number of retries."
)
public Integer getRetryCount() {
return retryCount;
}
public void setRetryCount(Integer retryCount) {
this.retryCount = retryCount;
}
@ApiModelProperty(
value = "All the relationships should be retried."
)
public Set<String> getRetriedRelationships() {
return retriedRelationships;
}
public void setRetriedRelationships(Set<String> retriedRelationships) {
this.retriedRelationships = retriedRelationships;
}
@ApiModelProperty(accessMode = ApiModelProperty.AccessMode.READ_ONLY,
value = "Determines whether the FlowFile should be penalized or the processor should be yielded between retries.",
allowableValues = "PENALIZE_FLOWFILE, YIELD_PROCESSOR"
)
public String getBackoffMechanism() {
return backoffMechanism;
}
public void setBackoffMechanism(String backoffMechanism) {
this.backoffMechanism = backoffMechanism;
}
@ApiModelProperty(
value = "Maximum amount of time to be waited during a retry period."
)
public String getMaxBackoffPeriod() {
return maxBackoffPeriod;
}
public void setMaxBackoffPeriod(String maxBackoffPeriod) {
this.maxBackoffPeriod = maxBackoffPeriod;
}
}
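
Illustrative sketch (not part of this commit): populating the new retry fields on a ProcessorConfigDTO, e.g. before submitting it through the REST API. The relationship name and values are arbitrary examples.

import java.util.Collections;

import org.apache.nifi.web.api.dto.ProcessorConfigDTO;

public class RetryConfigExample {
    public static ProcessorConfigDTO buildRetryConfig() {
        final ProcessorConfigDTO config = new ProcessorConfigDTO();
        // Retry FlowFiles transferred to the (hypothetical) 'failure' relationship up to 3 times.
        config.setRetryCount(3);
        config.setRetriedRelationships(Collections.singleton("failure"));
        // Penalize the FlowFile between attempts, waiting at most 1 minute per attempt.
        config.setBackoffMechanism("PENALIZE_FLOWFILE");
        config.setMaxBackoffPeriod("1 min");
        return config;
    }
}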

View File

@ -29,6 +29,7 @@ public class RelationshipDTO {
private String name;
private String description;
private Boolean autoTerminate;
private Boolean retry;
/**
* @return the relationship name
@ -71,4 +72,18 @@ public class RelationshipDTO {
public void setAutoTerminate(Boolean autoTerminate) {
this.autoTerminate = autoTerminate;
}
/**
* @return true if the relationship is retried; false otherwise
*/
@ApiModelProperty(
value = "Whether or not flowfiles sent to this relationship should retry."
)
public Boolean isRetry() {
return retry;
}
public void setRetry(Boolean retry) {
this.retry = retry;
}
}

View File

@ -127,6 +127,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
public static final String DEFAULT_YIELD_PERIOD = "1 sec";
public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec";
private static final String DEFAULT_MAX_BACKOFF_PERIOD = "10 mins";
private final AtomicReference<ProcessGroup> processGroup;
private final AtomicReference<ProcessorDetails> processorRef;
private final AtomicReference<String> identifier;
@ -157,6 +158,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
private final int hashCode;
private volatile boolean hasActiveThreads = false;
private volatile int retryCount;
private volatile Set<String> retriedRelationships;
private volatile BackoffMechanism backoffMechanism;
private volatile String maxBackoffPeriod;
public StandardProcessorNode(final LoggableComponent<Processor> processor, final String uuid,
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider controllerServiceProvider, final ComponentVariableRegistry variableRegistry,
@ -202,6 +208,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
executionNode = isExecutionNodeRestricted() ? ExecutionNode.PRIMARY : ExecutionNode.ALL;
this.hashCode = new HashCodeBuilder(7, 67).append(identifier).toHashCode();
retryCount = 10;
retriedRelationships = new HashSet<>();
backoffMechanism = BackoffMechanism.PENALIZE_FLOWFILE;
maxBackoffPeriod = DEFAULT_MAX_BACKOFF_PERIOD;
try {
if (processorDetails.getProcClass().isAnnotationPresent(DefaultSchedule.class)) {
DefaultSchedule dsc = processorDetails.getProcClass().getAnnotation(DefaultSchedule.class);
@ -1863,6 +1874,73 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
return desiredState;
}
@Override
public int getRetryCount() {
return retryCount;
}
@Override
public void setRetryCount(Integer retryCount) {
if (isRunning()) {
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
}
this.retryCount = (retryCount == null) ? 0 : retryCount;
}
@Override
public Set<String> getRetriedRelationships() {
return retriedRelationships;
}
@Override
public void setRetriedRelationships(Set<String> retriedRelationships) {
if (isRunning()) {
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
}
this.retriedRelationships = (retriedRelationships == null) ? Collections.emptySet() : new HashSet<>(retriedRelationships);
}
@Override
public boolean isRelationshipRetried(final Relationship relationship) {
if (relationship == null) {
return false;
} else {
return this.retriedRelationships.contains(relationship.getName());
}
}
@Override
public BackoffMechanism getBackoffMechanism() {
return backoffMechanism;
}
@Override
public void setBackoffMechanism(BackoffMechanism backoffMechanism) {
if (isRunning()) {
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
}
this.backoffMechanism = (backoffMechanism == null) ? BackoffMechanism.PENALIZE_FLOWFILE : backoffMechanism;
}
@Override
public String getMaxBackoffPeriod() {
return maxBackoffPeriod;
}
@Override
public void setMaxBackoffPeriod(String maxBackoffPeriod) {
if (isRunning()) {
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
}
if (maxBackoffPeriod == null) {
maxBackoffPeriod = DEFAULT_MAX_BACKOFF_PERIOD;
}
final long backoffNanos = FormatUtils.getTimeDuration(maxBackoffPeriod, TimeUnit.NANOSECONDS);
if (backoffNanos < 0) {
throw new IllegalArgumentException("Max Backoff Period must be positive");
}
this.maxBackoffPeriod = maxBackoffPeriod;
}
private void monitorAsyncTask(final Future<?> taskFuture, final Future<?> monitoringFuture, final long completionTimestamp) {
if (taskFuture.isDone()) {
monitoringFuture.cancel(false); // stop scheduling this task

View File

@ -21,6 +21,7 @@ import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.lifecycle.TaskTermination;
import org.apache.nifi.controller.queue.FlowFileQueue;
@ -63,6 +64,7 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.NonFlushableOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -140,7 +142,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
private Map<String, Long> immediateCounters;
private final Set<String> removedFlowFiles = new HashSet<>();
private final Set<String> createdFlowFiles = new HashSet<>();
private final Set<String> createdFlowFiles = new HashSet<>(); // UUID of any FlowFile that was created in this session
private final Set<String> createdFlowFilesWithoutLineage = new HashSet<>(); // UUID of any FlowFile that was created without a parent. This is a subset of createdFlowFiles.
private final InternalProvenanceReporter provenanceReporter;
@ -174,6 +177,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
private StateMap localState;
private StateMap clusterState;
private final String retryAttribute;
private final FlowFileLinkage flowFileLinkage = new FlowFileLinkage();
public StandardProcessSession(final RepositoryContext context, final TaskTermination taskTermination) {
this.context = context;
@ -185,6 +190,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
this.claimCache = context.createContentClaimWriteCache();
LOG.trace("Session {} created for {}", this, connectableDescription);
processingStartTime = System.nanoTime();
retryAttribute = "retryCount." + context.getConnectable().getIdentifier();
}
private void verifyTaskActive() {
@ -289,7 +295,28 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
// validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
final Map<Long, StandardRepositoryRecord> toAdd = new HashMap<>();
final Connectable connectable = context.getConnectable();
final long maxBackoffMillis = Math.round(FormatUtils.getPreciseTimeDuration(connectable.getMaxBackoffPeriod(), TimeUnit.MILLISECONDS));
// Determine which FlowFiles need to be retried
final Set<Long> retryIds = new HashSet<>();
for (final StandardRepositoryRecord record : records.values()) {
if (isRetry(record)) {
final long flowFileId = record.getCurrent().getId();
retryIds.add(flowFileId);
final Collection<Long> linkedIds = flowFileLinkage.getLinkedIds(flowFileId);
retryIds.addAll(linkedIds);
}
}
for (final StandardRepositoryRecord record : records.values()) {
// Check if this Record should be retried. If so, perform the necessary actions to retry the Record and then continue on to the next record.
if (retryIds.contains(record.getCurrent().getId())) {
retry(record, maxBackoffMillis);
}
if (record.isMarkedForDelete()) {
continue;
}
@ -318,13 +345,16 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
}
} else {
final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
final FlowFileRecord currRec = record.getCurrent();
final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
builder.removeAttributes(retryAttribute);
record.setDestination(finalDestination.getFlowFileQueue());
record.setWorking(builder.build(), false);
incrementConnectionInputCounts(finalDestination, record);
for (final Connection destination : destinations) { // iterate over remaining destinations and "clone" as needed
incrementConnectionInputCounts(destination, record);
final FlowFileRecord currRec = record.getCurrent();
final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
builder.id(context.getNextFlowFileSequence());
final String newUuid = UUID.randomUUID().toString();
@ -338,7 +368,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
if (claim != null) {
context.getContentRepository().incrementClaimaintCount(claim);
}
newRecord.setWorking(clone, Collections.<String, String> emptyMap(), false);
newRecord.setWorking(clone, Collections.<String, String>emptyMap(), false);
newRecord.setDestination(destination.getFlowFileQueue());
newRecord.setTransferRelationship(record.getTransferRelationship());
@ -356,6 +386,110 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
}
private boolean isRetry(final StandardRepositoryRecord record) {
final Relationship relationship = record.getTransferRelationship();
if (relationship == null) {
return false;
}
final Connectable connectable = context.getConnectable();
if (!connectable.isRelationshipRetried(relationship)) {
return false;
}
// If the FlowFile was created in this session and has no lineage (i.e., it was created by a source component),
// we do not want to retry the FlowFile, as there is no way to roll back.
final String uuid = record.getCurrent().getAttribute(CoreAttributes.UUID.key());
if (createdFlowFilesWithoutLineage.contains(uuid)) {
return false;
}
final int retryCount = getRetries(record.getCurrent());
return retryCount < connectable.getRetryCount();
}
private void retry(final StandardRepositoryRecord record, final long maxBackoffMillis) {
LOG.debug("Updating state to retry {}", record.getCurrent());
final Connectable connectable = context.getConnectable();
final int currentRetries = getRetries(record.getCurrent());
// Revert any statistics that have already been recorded for FlowFiles/Bytes In/Out
final Relationship relationship = record.getTransferRelationship();
final int numDestinations = context.getConnections(relationship).size();
final int multiplier = Math.max(1, numDestinations);
final boolean autoTerminated = connectable.isAutoTerminated(relationship);
if (!autoTerminated) {
flowFilesOut -= multiplier;
contentSizeOut -= record.getCurrent().getSize() * multiplier;
}
final FlowFileRecord original = record.getOriginal();
if (original != null) {
flowFilesIn--;
contentSizeIn -= original.getSize();
}
// If any content has been created but is no longer being used (i.e., the FlowFile was written to but is now being reverted back to its
// previous content), then remove the temporary content.
removeTemporaryClaim(record);
// Revert any state that has been updated for the Record and is no longer relevant.
final String uuid = record.getCurrent().getAttribute(CoreAttributes.UUID.key());
final FlowFileRecord updatedFlowFile = new StandardFlowFileRecord.Builder()
.fromFlowFile(record.getOriginal())
.addAttribute(retryAttribute, String.valueOf(currentRetries + 1))
.build();
if (original == null) {
record.markForDelete();
} else {
record.setTransferRelationship(Relationship.SELF);
}
record.setWorking(updatedFlowFile, false);
// Remove any Provenance Events that have been generated for this Record
provenanceReporter.removeEventsForFlowFile(uuid);
forkEventBuilders.remove(record.getCurrent());
createdFlowFiles.remove(uuid);
createdFlowFilesWithoutLineage.remove(uuid);
removedFlowFiles.remove(uuid);
// Penalize the FlowFile or yield the connectable, according to the component configuration.
final BackoffMechanism backoffMechanism = connectable.getBackoffMechanism();
if (backoffMechanism == BackoffMechanism.PENALIZE_FLOWFILE) {
if (!record.isMarkedForDelete()) {
final long backoffTime = calculateBackoffTime(currentRetries, maxBackoffMillis, connectable.getPenalizationPeriod(TimeUnit.MILLISECONDS));
penalize(record.getCurrent(), backoffTime, TimeUnit.MILLISECONDS);
}
} else {
final long backoffTime = calculateBackoffTime(currentRetries, maxBackoffMillis, connectable.getYieldPeriod(TimeUnit.MILLISECONDS));
connectable.yield(backoffTime, TimeUnit.MILLISECONDS);
}
}
private int getRetries(final FlowFile flowFile) {
if (flowFile == null) {
return 0;
}
final String attributeValue = flowFile.getAttribute(retryAttribute);
if (attributeValue == null) {
return 0;
}
try {
return Integer.parseInt(attributeValue);
} catch (final Exception e) {
return 0;
}
}
private long calculateBackoffTime(final int retryCount, final long maxBackoffPeriod, final long baseBackoffTime) {
return (long) Math.min(maxBackoffPeriod, Math.pow(2, retryCount) * baseBackoffTime);
}
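
For a concrete sense of the exponential growth implemented by calculateBackoffTime above, a standalone sketch (not part of this commit) using the default 30-second penalization period as the base and the default 10-minute cap; retry counts start at zero.

public class BackoffTimesExample {
    // Same formula as calculateBackoffTime above: min(cap, 2^retryCount * base).
    private static long calculateBackoffTime(final int retryCount, final long maxBackoffMillis, final long baseBackoffMillis) {
        return (long) Math.min(maxBackoffMillis, Math.pow(2, retryCount) * baseBackoffMillis);
    }

    public static void main(final String[] args) {
        final long baseMillis = 30_000L;  // default penalization period: 30 sec
        final long maxMillis = 600_000L;  // default max backoff period: 10 mins
        for (int retry = 0; retry < 6; retry++) {
            // retry 0 -> 30s, 1 -> 60s, 2 -> 120s, 3 -> 240s, 4 -> 480s, 5 -> capped at 600s
            System.out.println("retry " + retry + ": " + calculateBackoffTime(retry, maxMillis, baseMillis) + " ms");
        }
    }
}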
@Override
public synchronized void commit() {
commit(false);
@ -798,11 +932,12 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
}
}
if (!eventAdded && !repoRecord.getUpdatedAttributes().isEmpty()) {
if (!eventAdded && !repoRecord.getUpdatedAttributes().isEmpty() && curFlowFile.getAttribute(retryAttribute) == null) {
// We generate an ATTRIBUTES_MODIFIED event only if no other event has been
// created for the FlowFile. We do this because all events contain both the
// newest and the original attributes, so generating an ATTRIBUTES_MODIFIED
// event is redundant if another already exists.
// We also do not generate an ATTRIBUTES_MODIFIED event for a FlowFile that is being retried.
if (!eventTypesPerFlowFileId.containsKey(flowFileId)) {
recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).build());
addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.ATTRIBUTES_MODIFIED);
@ -1261,7 +1396,10 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
bytesWritten = 0L;
connectionCounts.clear();
createdFlowFiles.clear();
createdFlowFilesWithoutLineage.clear();
removedFlowFiles.clear();
flowFileLinkage.clear();
if (countersOnCommit != null) {
countersOnCommit.clear();
}
@ -1406,8 +1544,14 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
for (final FlowFile flowFile : flowFiles) {
final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
final StandardRepositoryRecord repoRecord = this.records.remove(flowFile.getId());
newOwner.records.put(flowFileRecord.getId(), repoRecord);
final long flowFileId = flowFile.getId();
final StandardRepositoryRecord repoRecord = this.records.remove(flowFileId);
newOwner.records.put(flowFileId, repoRecord);
final Collection<Long> linkedIds = this.flowFileLinkage.remove(flowFileId);
if (linkedIds != null) {
linkedIds.forEach(linkedId -> newOwner.flowFileLinkage.addLink(flowFileId, linkedId));
}
// Adjust the counts for Connections for each FlowFile that was pulled from a Connection.
// We do not have to worry about accounting for 'input counts' on connections because those
@ -1429,9 +1573,9 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
newOwner.contentSizeIn += flowFile.getSize();
}
final String flowFileId = flowFile.getAttribute(CoreAttributes.UUID.key());
if (removedFlowFiles.remove(flowFileId)) {
newOwner.removedFlowFiles.add(flowFileId);
final String flowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
if (removedFlowFiles.remove(flowFileUuid)) {
newOwner.removedFlowFiles.add(flowFileUuid);
newOwner.removedCount++;
newOwner.removedBytes += flowFile.getSize();
@ -1439,8 +1583,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
removedBytes -= flowFile.getSize();
}
if (createdFlowFiles.remove(flowFileId)) {
newOwner.createdFlowFiles.add(flowFileId);
if (createdFlowFiles.remove(flowFileUuid)) {
newOwner.createdFlowFiles.add(flowFileUuid);
}
if (repoRecord.getTransferRelationship() != null) {
@ -1834,7 +1978,10 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(fFile, attrs, false);
records.put(fFile.getId(), record);
createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
createdFlowFiles.add(uuid);
createdFlowFilesWithoutLineage.add(uuid);
return fFile;
}
@ -1876,6 +2023,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
registerForkEvent(parent, fFile);
flowFileLinkage.addLink(parent.getId(), fFile.getId());
return fFile;
}
@ -1925,6 +2073,12 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
registerJoinEvent(fFile, parents);
final long flowFileId = fFile.getId();
for (final FlowFile parent : parents) {
flowFileLinkage.addLink(flowFileId, parent.getId());
}
return fFile;
}
@ -1970,6 +2124,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
registerForkEvent(example, clone);
}
flowFileLinkage.addLink(example.getId(), clone.getId());
return clone;
}
@ -2006,10 +2162,16 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
@Override
public FlowFile penalize(FlowFile flowFile) {
verifyTaskActive();
flowFile = validateRecordState(flowFile, false);
flowFile = validateRecordState(flowFile);
return penalize(flowFile, context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
}
public FlowFile penalize(FlowFile flowFile, final long period, final TimeUnit timeUnit) {
flowFile = getRecord(flowFile).getCurrent();
final StandardRepositoryRecord record = getRecord(flowFile);
final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
final long penalizeMillis = TimeUnit.MILLISECONDS.convert(period, timeUnit);
final long expirationEpochMillis = System.currentTimeMillis() + penalizeMillis;
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build();
record.setWorking(newFile, false);
return newFile;
@ -2250,6 +2412,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
removedBytes += flowFile.getSize();
provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key()));
}
flowFileLinkage.remove(flowFile.getId());
}
@Override
@ -2272,6 +2436,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
removedBytes += flowFile.getSize();
provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key()));
}
flowFileLinkage.remove(flowFile.getId());
}
}
@ -3821,4 +3987,47 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
return removedBytes;
}
}
private static class FlowFileLinkage {
private final Map<Long, List<Long>> linkedIds = new HashMap<>();
public void addLink(final long id, final long other) {
if (id == other) {
return;
}
linkedIds.computeIfAbsent(id, key -> new ArrayList<>()).add(other);
linkedIds.computeIfAbsent(other, key -> new ArrayList<>()).add(id);
}
public Collection<Long> getLinkedIds(final long id) {
final List<Long> linked = linkedIds.get(id);
final Set<Long> allLinked = new HashSet<>();
if (linked != null) {
allLinked.addAll(linked);
for (final Long linkedId : linked) {
final List<Long> onceRemoved = linkedIds.get(linkedId);
allLinked.addAll(onceRemoved);
}
}
return allLinked;
}
public Collection<Long> remove(final long id) {
final List<Long> linked = linkedIds.remove(id);
if (linked != null) {
for (final Long otherId : linked) {
linkedIds.get(otherId).remove(id);
}
}
return linked;
}
public void clear() {
linkedIds.clear();
}
}
}
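
Since FlowFileLinkage is private to the session, here is a standalone copy of its link/lookup logic (illustration only, not part of this commit) showing why retrying one FlowFile also retries its parent and siblings: links are recorded in both directions, and getLinkedIds also gathers IDs one hop further out.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FlowFileLinkageDemo {
    private final Map<Long, List<Long>> linkedIds = new HashMap<>();

    public void addLink(final long id, final long other) {
        if (id == other) {
            return;
        }
        linkedIds.computeIfAbsent(id, key -> new ArrayList<>()).add(other);
        linkedIds.computeIfAbsent(other, key -> new ArrayList<>()).add(id);
    }

    public Collection<Long> getLinkedIds(final long id) {
        final List<Long> linked = linkedIds.get(id);
        final Set<Long> allLinked = new HashSet<>();
        if (linked != null) {
            allLinked.addAll(linked);
            for (final Long linkedId : linked) {
                allLinked.addAll(linkedIds.get(linkedId));
            }
        }
        return allLinked;
    }

    public static void main(final String[] args) {
        final FlowFileLinkageDemo linkage = new FlowFileLinkageDemo();
        linkage.addLink(1L, 2L); // parent 1 forked child 2
        linkage.addLink(1L, 3L); // parent 1 forked child 3
        // Prints [1, 2, 3]: the parent, the queried FlowFile itself, and its sibling.
        System.out.println(linkage.getLinkedIds(2L));
    }
}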

View File

@ -73,6 +73,11 @@ public class StandardProvenanceReporter implements InternalProvenanceReporter {
events.remove(event);
}
@Override
public void removeEventsForFlowFile(final String uuid) {
events.removeIf(event -> event.getFlowFileUuid().equals(uuid));
}
@Override
public void clear() {
events.clear();

View File

@ -287,4 +287,15 @@ public class ConnectableProcessContext implements ProcessContext {
public boolean isConnectedToCluster() {
throw new UnsupportedOperationException();
}
@Override
public boolean isRelationshipRetried(Relationship relationship) {
return false;
}
@Override
public int getRetryCount() {
return 0;
}
}

View File

@ -27,6 +27,7 @@ import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.PropertyConfiguration;
@ -1586,6 +1587,19 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
processor.setYieldPeriod(proposed.getYieldDuration());
processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
processor.setMaxBackoffPeriod(proposed.getMaxBackoffPeriod());
processor.setRetriedRelationships(proposed.getRetriedRelationships());
if (proposed.getRetryCount() != null) {
processor.setRetryCount(proposed.getRetryCount());
} else {
processor.setRetryCount(10);
}
if (proposed.getBackoffMechanism() != null) {
processor.setBackoffMechanism(BackoffMechanism.valueOf(proposed.getBackoffMechanism()));
}
final ScheduledState procState = processor.getScheduledState();
final ProcessGroup group = processor.getProcessGroup();
switch (proposed.getScheduledState()) {

View File

@ -347,4 +347,14 @@ public class StandardProcessContext implements ProcessContext, ControllerService
public boolean isConnectedToCluster() {
return nodeTypeProvider.isConnected();
}
@Override
public boolean isRelationshipRetried(Relationship relationship) {
return procNode.isRelationshipRetried(relationship);
}
@Override
public int getRetryCount() {
return procNode.getRetryCount();
}
}

View File

@ -700,6 +700,10 @@ public class NiFiRegistryFlowMapper {
processor.setStyle(procNode.getStyle());
processor.setYieldDuration(procNode.getYieldPeriod());
processor.setScheduledState(flowMappingOptions.getStateLookup().getState(procNode));
processor.setRetryCount(procNode.getRetryCount());
processor.setRetriedRelationships(procNode.getRetriedRelationships());
processor.setBackoffMechanism(procNode.getBackoffMechanism().name());
processor.setMaxBackoffPeriod(procNode.getMaxBackoffPeriod());
return processor;
}

View File

@ -22,6 +22,7 @@ import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.PropertyConfiguration;
@ -215,6 +216,7 @@ public class TestNiFiRegistryFlowMapper {
when(procNode.getPosition()).thenReturn(new Position(0D, 0D));
when(procNode.getSchedulingStrategy()).thenReturn(SchedulingStrategy.TIMER_DRIVEN);
when(procNode.getPhysicalScheduledState()).thenReturn(ScheduledState.STOPPED);
when(procNode.getBackoffMechanism()).thenReturn(BackoffMechanism.YIELD_PROCESSOR);
return procNode;
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.connectable;
import org.apache.nifi.authorization.resource.ComponentAuthorizable;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.controller.Triggerable;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessSession;
@ -291,4 +292,22 @@ public interface Connectable extends Triggerable, ComponentAuthorizable, Positio
* @return the type of the component. I.e., the class name of the implementation
*/
String getComponentType();
// Number of times a FlowFile can be retried before it is finally transferred to a retried relationship
int getRetryCount();
void setRetryCount(Integer retryCount);

// Names of the relationships for which framework-level retry is enabled
Set<String> getRetriedRelationships();
void setRetriedRelationships(Set<String> retriedRelationships);
boolean isRelationshipRetried(Relationship relationship);

// Whether to penalize the FlowFile or yield the component between retries
BackoffMechanism getBackoffMechanism();
void setBackoffMechanism(BackoffMechanism backoffMechanism);

// Maximum amount of time to wait between retries, e.g. "10 mins"
String getMaxBackoffPeriod();
void setMaxBackoffPeriod(String maxBackoffPeriod);
}

View File

@ -62,6 +62,7 @@ public abstract class AbstractPort implements Port {
.build();
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
private static final String DEFAULT_MAX_BACKOFF_PERIOD = "10 mins";
private final List<Relationship> relationships;
@ -659,4 +660,45 @@ public abstract class AbstractPort implements Port {
}
}
}
@Override
public int getRetryCount() {
return 0;
}
@Override
public void setRetryCount(Integer retryCount) {
}
@Override
public Set<String> getRetriedRelationships() {
return Collections.EMPTY_SET;
}
@Override
public void setRetriedRelationships(Set<String> retriedRelationships) {
}
@Override
public boolean isRelationshipRetried(Relationship relationship) {
return false;
}
@Override
public BackoffMechanism getBackoffMechanism() {
return BackoffMechanism.PENALIZE_FLOWFILE;
}
@Override
public void setBackoffMechanism(BackoffMechanism backoffMechanism) {
}
@Override
public String getMaxBackoffPeriod() {
return DEFAULT_MAX_BACKOFF_PERIOD;
}
@Override
public void setMaxBackoffPeriod(String maxBackoffPeriod) {
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
public enum BackoffMechanism {
PENALIZE_FLOWFILE, YIELD_PROCESSOR
}

View File

@ -58,6 +58,7 @@ import static java.util.Objects.requireNonNull;
public class StandardFunnel implements Funnel {
public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
private static final String DEFAULT_MAX_BACKOFF_PERIOD = "10 mins";
private final String identifier;
private final Set<Connection> outgoingConnections;
@ -577,6 +578,47 @@ public class StandardFunnel implements Funnel {
return "Funnel";
}
@Override
public int getRetryCount() {
return 0;
}
@Override
public void setRetryCount(Integer retryCount) {
}
@Override
public Set<String> getRetriedRelationships() {
return Collections.EMPTY_SET;
}
@Override
public void setRetriedRelationships(Set<String> retriedRelationships) {
}
@Override
public boolean isRelationshipRetried(Relationship relationship) {
return false;
}
@Override
public BackoffMechanism getBackoffMechanism() {
return BackoffMechanism.PENALIZE_FLOWFILE;
}
@Override
public void setBackoffMechanism(BackoffMechanism backoffMechanism) {
}
@Override
public String getMaxBackoffPeriod() {
return DEFAULT_MAX_BACKOFF_PERIOD;
}
@Override
public void setMaxBackoffPeriod(String maxBackoffPeriod) {
}
@Override
public Optional<String> getVersionedComponentId() {
return Optional.ofNullable(versionedComponentId.get());

View File

@ -35,6 +35,8 @@ public interface InternalProvenanceReporter extends ProvenanceReporter {
void remove(ProvenanceEventRecord event);
void removeEventsForFlowFile(String uuid);
void clear();
void migrate(InternalProvenanceReporter newOwner, Collection<String> flowFileIds);

View File

@ -371,6 +371,12 @@ public class StandardFlowSnippet implements FlowSnippet {
procNode.setPenalizationPeriod(config.getPenaltyDuration());
procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
procNode.setAnnotationData(config.getAnnotationData());
procNode.setRetryCount(config.getRetryCount());
procNode.setRetriedRelationships(config.getRetriedRelationships());
if (config.getBackoffMechanism() != null) {
procNode.setBackoffMechanism(BackoffMechanism.valueOf(config.getBackoffMechanism()));
}
procNode.setMaxBackoffPeriod(config.getMaxBackoffPeriod());
procNode.setStyle(processorDTO.getStyle());
if (config.getRunDurationMillis() != null) {

View File

@ -1232,6 +1232,13 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
procNode.setPenalizationPeriod(config.getPenaltyDuration());
procNode.setYieldPeriod(config.getYieldDuration());
procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
procNode.setRetryCount(config.getRetryCount());
procNode.setRetriedRelationships(config.getRetriedRelationships());
if (config.getBackoffMechanism() != null) {
procNode.setBackoffMechanism(BackoffMechanism.valueOf(config.getBackoffMechanism()));
}
procNode.setMaxBackoffPeriod(config.getMaxBackoffPeriod());
updateNonFingerprintedProcessorSettings(procNode, processorDTO);
if (config.getSchedulingStrategy() != null) {

View File

@ -499,6 +499,21 @@ public class FlowFromDOMFactory {
configDto.setYieldDuration(getString(element, "yieldPeriod"));
configDto.setBulletinLevel(getString(element, "bulletinLevel"));
configDto.setLossTolerant(getBoolean(element, "lossTolerant"));
if (getString(element, "retryCount") != null) {
configDto.setRetryCount(getInt(element, "retryCount"));
} else {
configDto.setRetryCount(10);
}
configDto.setMaxBackoffPeriod(getString(element, "maxBackoffPeriod"));
configDto.setBackoffMechanism(getString(element, "backoffMechanism"));
final Set<String> retriedRelationships = new HashSet<>();
final List<Element> retriedRelationshipList = getChildrenByTagName(element, "retriedRelationship");
for (final Element retriedRelationship : retriedRelationshipList) {
retriedRelationships.add(retriedRelationship.getTextContent());
}
configDto.setRetriedRelationships(retriedRelationships);
final ScheduledState scheduledState = getScheduledState(element);
dto.setState(scheduledState.toString());

View File

@ -515,6 +515,13 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
addTextElement(element, "executionNode", processor.getExecutionNode().name());
addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
addTextElement(element, "retryCount", processor.getRetryCount());
addTextElement(element, "backoffMechanism", processor.getBackoffMechanism().name());
addTextElement(element, "maxBackoffPeriod", processor.getMaxBackoffPeriod());
for (final String relationship : processor.getRetriedRelationships()) {
addTextElement(element, "retriedRelationship", relationship);
}
addConfiguration(element, processor.getRawPropertyValues(), processor.getAnnotationData(), encryptor);

View File

@ -481,6 +481,12 @@ public class FingerprintFactory {
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "executionNode"));
// run duration nanos
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "runDurationNanos"));
// retry count
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "retryCount"));
// backoff mechanism
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "backoffMechanism"));
// max backoff period
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "maxBackoffPeriod"));
// get the temp instance of the Processor so that we know the default property values
final BundleCoordinate coordinate = getCoordinate(className, bundle);
@ -504,6 +510,12 @@ public class FingerprintFactory {
builder.append(autoTerminateElem.getTextContent());
}
final NodeList retriedRelationshipsElems = DomUtils.getChildNodesByTagName(processorElem, "retriedRelationships");
final List<Element> sortedRetriedRelationshipsElems = sortElements(retriedRelationshipsElems, getElementTextComparator());
for (final Element retriedRelationshipElem : sortedRetriedRelationshipsElems) {
builder.append(retriedRelationshipElem.getTextContent());
}
return builder;
}

View File

@ -146,6 +146,15 @@
<!-- Indicates that a relationship with the given name can be auto-terminated -->
<xs:element name="autoTerminatedRelationship" type="xs:string" minOccurs="0" maxOccurs="unbounded" />
<xs:element name="retryCount" type="xs:nonNegativeInteger" minOccurs="0" maxOccurs="1" />
<xs:element name="retriedRelationships" type="xs:string" minOccurs="0" maxOccurs="unbounded" />
<xs:element name="backoffMechanism" type="BackoffMechanism" minOccurs="0" maxOccurs="1" />
<xs:element name="maxBackoffPeriod" type="NonEmptyStringType" minOccurs="0" maxOccurs="1" />
</xs:sequence>
</xs:complexType>
@ -467,6 +476,13 @@
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="BackoffMechanism">
<xs:restriction base="xs:string">
<xs:enumeration value="PENALIZE_FLOWFILE"></xs:enumeration>
<xs:enumeration value="YIELD_PROCESSOR"></xs:enumeration>
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="ExecutionNode">
<xs:restriction base="xs:string">
<xs:enumeration value="ALL"></xs:enumeration>

View File

@ -995,6 +995,10 @@ public class TestFlowController {
configDTO.setSchedulingStrategy(processorNode.getSchedulingStrategy().name());
configDTO.setExecutionNode(processorNode.getExecutionNode().name());
configDTO.setAnnotationData(processorNode.getAnnotationData());
configDTO.setRetryCount(processorNode.getRetryCount());
configDTO.setRetriedRelationships(processorNode.getRetriedRelationships());
configDTO.setBackoffMechanism(processorNode.getBackoffMechanism().name());
configDTO.setMaxBackoffPeriod(processorNode.getMaxBackoffPeriod());
processorDTO.setConfig(configDTO);
@ -1049,6 +1053,10 @@ public class TestFlowController {
configDTO.setSchedulingStrategy(processorNode.getSchedulingStrategy().name());
configDTO.setExecutionNode(processorNode.getExecutionNode().name());
configDTO.setAnnotationData(processorNode.getAnnotationData());
configDTO.setRetryCount(processorNode.getRetryCount());
configDTO.setRetriedRelationships(processorNode.getRetriedRelationships());
configDTO.setBackoffMechanism(processorNode.getBackoffMechanism().name());
configDTO.setMaxBackoffPeriod(processorNode.getMaxBackoffPeriod());
processorDTO.setConfig(configDTO);
@ -1105,6 +1113,10 @@ public class TestFlowController {
configDTO.setSchedulingStrategy(processorNode.getSchedulingStrategy().name());
configDTO.setExecutionNode(processorNode.getExecutionNode().name());
configDTO.setAnnotationData(processorNode.getAnnotationData());
configDTO.setRetryCount(processorNode.getRetryCount());
configDTO.setRetriedRelationships(processorNode.getRetriedRelationships());
configDTO.setBackoffMechanism(processorNode.getBackoffMechanism().name());
configDTO.setMaxBackoffPeriod(processorNode.getMaxBackoffPeriod());
processorDTO.setConfig(configDTO);

View File

@ -21,8 +21,11 @@ import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
import org.apache.nifi.controller.queue.PollStrategy;
@ -40,6 +43,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
@ -91,6 +95,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -102,11 +107,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -184,6 +193,8 @@ public class StandardProcessSessionIT {
when(connectable.getIdentifier()).thenReturn("connectable-1");
when(connectable.getConnectableType()).thenReturn(ConnectableType.INPUT_PORT);
when(connectable.getComponentType()).thenReturn("Unit Test Component");
when(connectable.getBackoffMechanism()).thenReturn(BackoffMechanism.PENALIZE_FLOWFILE);
when(connectable.getMaxBackoffPeriod()).thenReturn("1 sec");
Mockito.doAnswer(new Answer<Set<Connection>>() {
@Override
@ -2801,7 +2812,335 @@ public class StandardProcessSessionIT {
assertEquals(0, contentRepo.getClaimantCount(getContentClaim(flowFile)));
}
@Test
public void testWhenInRetryAttributeIsAdded() {
final Connectable processor = createProcessorConnectable();
configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L);
StandardProcessSession session = createSessionForRetry(processor);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.build();
flowFileQueue.put(flowFileRecord);
final Relationship relationship = new Relationship.Builder().name("A").build();
FlowFile ff1 = session.get();
assertNotNull(ff1);
session.transfer(flowFileRecord, relationship);
session.commit();
FlowFile ff2 = session.get();
assertNotNull(ff2);
assertEquals("1", ff2.getAttribute("retryCount." + connectable.getIdentifier()));
}
@Test
public void testWhenRetryCompletedAttributeIsRemoved() {
final Connectable processor = createProcessorConnectable();
configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L);
final StandardProcessSession session = createSessionForRetry(processor);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.build();
flowFileQueue.put(flowFileRecord);
final Relationship relationship = new Relationship.Builder().name("A").build();
final FlowFile ff1 = session.get();
assertNotNull(ff1);
session.transfer(flowFileRecord, relationship);
session.commit();
final FlowFile ff2 = session.get();
assertNotNull(ff2);
assertEquals("1", ff2.getAttribute("retryCount." + processor.getIdentifier()));
session.transfer(flowFileRecord, relationship);
session.commit();
final FlowFile ff3 = session.get();
assertNotNull(ff3);
assertNull(ff3.getAttribute("retryCount." + processor.getIdentifier()));
}
@Test
public void testRetryParentFlowFileRemovesChildren() throws IOException {
final Connectable processor = createProcessorConnectable();
configureRetry(processor, 1, BackoffMechanism.PENALIZE_FLOWFILE, "15 ms", 10000L);
final StandardProcessSession session = createSessionForRetry(processor);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.id(500L)
.build();
flowFileQueue.put(flowFileRecord);
final Relationship relationshipA = new Relationship.Builder().name("A").build();
final Relationship relationshipB = new Relationship.Builder().name("B").build();
final FlowFile original = session.get();
assertNotNull(original);
final List<ContentClaim> contentClaims = new ArrayList<>();
for (int i = 0; i < 3; i++) {
FlowFile child = session.create(original);
final byte[] contents = String.valueOf(i).getBytes();
child = session.write(child, out -> out.write(contents));
final FlowFileRecord childRecord = (FlowFileRecord) child;
contentClaims.add(childRecord.getContentClaim());
session.transfer(child, relationshipB);
}
session.transfer(original, relationshipA);
session.commit();
assertEquals(1, flowFileQueue.size().getObjectCount());
final List<ProvenanceEventRecord> provEvents = provenanceRepo.getEvents(0, 1000);
assertEquals(0, provEvents.size());
assertNull(flowFileRecord.getContentClaim());
for (final ContentClaim claim : contentClaims) {
assertEquals(0, contentRepo.getClaimantCount(claim));
}
}
@Test
public void testWhenFlowFilePenalizeBackoffMechanismConfiguredFlowFileIsPenalized() throws IOException {
final Connectable processor = createProcessorConnectable();
configureRetry(processor, 1, BackoffMechanism.PENALIZE_FLOWFILE, "15 ms", 10000L);
final StandardProcessSession session = createSessionForRetry(processor);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.build();
flowFileQueue.put(flowFileRecord);
final Relationship relationship = new Relationship.Builder().name("A").build();
final FlowFile ff1 = session.get();
assertNotNull(ff1);
session.transfer(flowFileRecord, relationship);
session.commit();
assertTrue(flowFileQueue.getFlowFile(ff1.getAttribute(CoreAttributes.UUID.key())).isPenalized());
}
@Test
public void testWhenYieldingBackoffMechanismConfiguredProcessorIsYielded() {
final Connectable processor = createProcessorConnectable();
configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L);
final StandardProcessSession session = createSessionForRetry(processor);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.build();
flowFileQueue.put(flowFileRecord);
final Relationship relationship = new Relationship.Builder().name("A").build();
final FlowFile ff1 = session.get();
assertNotNull(ff1);
session.transfer(flowFileRecord, relationship);
session.commit();
verify(processor).yield(anyLong(), eq(TimeUnit.MILLISECONDS));
}
@Test
public void testWhenInRetryProcessorStatisticsAdjusted() throws IOException {
final Connectable processor = createProcessorConnectable();
configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L);
final StandardProcessSession session = createSessionForRetry(processor);
final ContentClaim contentClaim = contentRepo.create("original".getBytes());
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.size(8L)
.contentClaim(contentClaim)
.build();
flowFileQueue.put(flowFileRecord);
final Relationship relationship = new Relationship.Builder().name("A").build();
final FlowFile ff1 = session.get();
assertNotNull(ff1);
session.transfer(flowFileRecord, relationship);
session.commit();
final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L);
final int flowFilesIn = report.getReportEntry("connectable-1").getFlowFilesIn();
assertEquals(0, flowFilesIn);
final int flowFilesOut = report.getReportEntry("connectable-1").getFlowFilesOut();
assertEquals(0, flowFilesOut);
final long contentSizeIn = report.getReportEntry("connectable-1").getContentSizeIn();
assertEquals(0, contentSizeIn);
final long contentSizeOut = report.getReportEntry("connectable-1").getContentSizeOut();
assertEquals(0, contentSizeOut);
}
@Test
public void testWhenInRetryEventsAreCleared() throws IOException {
final Connectable processor = createProcessorConnectable();
configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L);
final StandardProcessSession session = createSessionForRetry(processor);
final ContentClaim contentClaim = contentRepo.create("original".getBytes());
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.size(8L)
.contentClaim(contentClaim)
.build();
flowFileQueue.put(flowFileRecord);
final Relationship relationship = new Relationship.Builder().name("A").build();
final FlowFile ff1 = session.get();
assertNotNull(ff1);
final FlowFile ff2 = session.create(ff1);
session.transfer(ff1, relationship);
session.transfer(ff2, relationship);
session.commit();
final List<ProvenanceEventRecord> provEvents = provenanceRepo.getEvents(0L, 1000);
assertEquals(0, provEvents.size());
}
@Test
public void testWhenInRetryClaimsAreCleared() throws IOException {
final Connectable processor = createProcessorConnectable();
configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L);
final StandardProcessSession session = createSessionForRetry(processor);
final byte[] originalContent = "original".getBytes();
final byte[] replacementContent = "modified data".getBytes();
final ContentClaim originalClaim = contentRepo.create(originalContent);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.contentClaim(originalClaim)
.size(originalContent.length)
.build();
flowFileQueue.put(flowFileRecord);
final Relationship relationship = new Relationship.Builder().name("A").build();
final FlowFile ff1 = session.get();
final FlowFile modified = session.write(ff1, (in, out) -> out.write(replacementContent));
session.transfer(modified, relationship);
session.commit();
assertEquals(1, contentRepo.getClaimantCount(originalClaim));
assertEquals(0, contentRepo.getClaimantCount(((FlowFileRecord) modified).getContentClaim()));
}
@Test
public void testWhenInRetryContentRestored() throws IOException {
final Connectable processor = createProcessorConnectable();
configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L);
final StandardProcessSession session = createSessionForRetry(processor);
final byte[] originalContent = "original".getBytes();
final byte[] replacementContent = "modified data".getBytes();
final ContentClaim originalClaim = contentRepo.create(originalContent);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.contentClaim(originalClaim)
.size(originalContent.length)
.build();
flowFileQueue.put(flowFileRecord);
final Relationship relationship = new Relationship.Builder().name("A").build();
final FlowFile ff1 = session.get();
final FlowFile modified = session.write(ff1, (in, out) -> out.write(replacementContent));
session.transfer(modified, relationship);
session.commit();
final FlowFile ff2 = session.get();
assertEquals(originalContent.length, ff2.getSize());
assertEquals(originalClaim, ((FlowFileRecord) ff2).getContentClaim());
}
public void configureRetry(final Connectable connectable, final int retryCount, final BackoffMechanism backoffMechanism,
final String maxBackoffPeriod, final long penalizationPeriod) {
final Processor proc = mock(Processor.class);
when(((ProcessorNode) connectable).getProcessor()).thenReturn(proc);
when(connectable.isRelationshipRetried(any())).thenReturn(true);
when(connectable.getRetryCount()).thenReturn(retryCount);
when(connectable.getBackoffMechanism()).thenReturn(backoffMechanism);
when(connectable.getMaxBackoffPeriod()).thenReturn(maxBackoffPeriod);
when(connectable.getRetriedRelationships()).thenReturn(Collections.singleton(FAKE_RELATIONSHIP.getName()));
when(connectable.getPenalizationPeriod(any(TimeUnit.class))).thenReturn(penalizationPeriod);
when(connectable.getYieldPeriod(any(TimeUnit.class))).thenReturn(penalizationPeriod);
}
public Connectable createProcessorConnectable() {
Connectable connectable = mock(StandardProcessorNode.class);
final Connection connection = createConnection();
final Connection connectionB = createConnection();
final List<Connection> connList = new ArrayList<>();
connList.add(connection);
when(connectable.hasIncomingConnection()).thenReturn(true);
when(connectable.getIncomingConnections()).thenReturn(connList);
when(connectable.getIdentifier()).thenReturn("connectable-1");
when(connectable.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
when(connectable.getComponentType()).thenReturn("Unit Test Component");
Mockito.doAnswer((Answer<Set<Connection>>) invocation -> {
final Object[] arguments = invocation.getArguments();
final Relationship relationship = (Relationship) arguments[0];
if (relationship == Relationship.SELF) {
return Collections.emptySet();
} else if (relationship == FAKE_RELATIONSHIP || relationship.equals(FAKE_RELATIONSHIP)) {
return null;
} else if (relationship.getName().equals("B")) {
return Collections.singleton(connectionB);
} else {
return new HashSet<>(connList);
}
}).when(connectable).getConnections(Mockito.any(Relationship.class));
when(connectable.getConnections()).thenReturn(new HashSet<>(connList));
return connectable;
}
public StandardProcessSession createSessionForRetry(final Connectable connectable) {
StandardRepositoryContext context = new StandardRepositoryContext(connectable,
new AtomicLong(0L),
contentRepo,
flowFileRepo,
flowFileEventRepository,
counterRepository,
provenanceRepo,
stateManager);
return new StandardProcessSession(context, () -> false);
}
private static class MockFlowFileRepository implements FlowFileRepository {

View File

@ -28,6 +28,7 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.PropertyConfiguration;
@ -459,6 +460,7 @@ public class NiFiRegistryFlowMapperTest {
when(processorNode.getExecutionNode()).thenReturn(ExecutionNode.ALL);
when(processorNode.getSchedulingStrategy()).thenReturn(SchedulingStrategy.TIMER_DRIVEN);
when(processorNode.getBundleCoordinate()).thenReturn(mock(BundleCoordinate.class));
when(processorNode.getBackoffMechanism()).thenReturn(BackoffMechanism.PENALIZE_FLOWFILE);
final String rawPropertyValue = "propValue";
final PropertyDescriptor.Builder propertyDescriptorBuilder =

View File

@ -129,4 +129,14 @@ public class MockProcessContext implements ProcessContext {
public boolean isConnectedToCluster() {
return true;
}
@Override
public boolean isRelationshipRetried(Relationship relationshipName) {
return false;
}
@Override
public int getRetryCount() {
return 0;
}
}

View File

@ -80,6 +80,10 @@ public class ProcessorAuditor extends NiFiAuditor {
private static final String SCHEDULING_STRATEGY = "Scheduling Strategy";
private static final String EXECUTION_NODE = "Execution Node";
private static final String EXTENSION_VERSION = "Extension Version";
private static final String RETRY_COUNT = "Retry Count";
private static final String RETRIED_RELATIONSHIPS = "Retried Relationships";
private static final String BACKOFF_MECHANISM = "Backoff Mechanism";
private static final String MAX_BACKOFF_PERIOD = "Max Backoff Period";
/**
* Audits the creation of processors via createProcessor().
@ -423,6 +427,24 @@ public class ProcessorAuditor extends NiFiAuditor {
if (newConfig.getExecutionNode() != null) {
values.put(EXECUTION_NODE, processor.getExecutionNode().name());
}
if (newConfig.getRetryCount() != null) {
values.put(RETRY_COUNT, String.valueOf(processor.getRetryCount()));
}
if (newConfig.getRetriedRelationships() != null) {
final List<String> retriedRelationships = new ArrayList<>(processor.getRetriedRelationships());
Collections.sort(retriedRelationships, Collator.getInstance(Locale.US));
values.put(RETRIED_RELATIONSHIPS, StringUtils.join(retriedRelationships, ", "));
}
if (newConfig.getBackoffMechanism() != null) {
values.put(BACKOFF_MECHANISM, processor.getBackoffMechanism().name());
}
if (newConfig.getMaxBackoffPeriod() != null) {
values.put(MAX_BACKOFF_PERIOD, processor.getMaxBackoffPeriod());
}
}
return values;
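For illustration only, and not part of this change: with a processor configured the way the retry tests later in this commit configure one (retry count 2, the success relationship retried, FlowFile penalization, a 1 ms max backoff), the four new entries above would presumably land in the audited values map roughly as follows. The keys are the constants defined earlier in this class; the concrete values are a hypothetical sketch.

import java.util.LinkedHashMap;
import java.util.Map;

class RetryAuditValuesSketch {
    static Map<String, String> sampleRetryAuditValues() {
        final Map<String, String> values = new LinkedHashMap<>();
        values.put("Retry Count", "2");                        // String.valueOf(processor.getRetryCount())
        values.put("Retried Relationships", "success");        // relationship names, sorted and comma-joined
        values.put("Backoff Mechanism", "PENALIZE_FLOWFILE");  // BackoffMechanism enum name
        values.put("Max Backoff Period", "1 ms");              // stored as a time period string
        return values;
    }
}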

View File

@ -32,8 +32,10 @@ import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.OperationAuthorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.ui.extension.UiExtension;
import org.apache.nifi.ui.extension.UiExtensionMapping;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.UiExtensionType;
@ -835,6 +837,25 @@ public class ProcessorResource extends ApplicationResource {
}
}
final ProcessorConfigDTO processorConfig = requestProcessorDTO.getConfig();
if (processorConfig != null) {
if (processorConfig.getRetryCount() != null && processorConfig.getRetryCount() < 0) {
throw new IllegalArgumentException("Retry Count should not be less than zero.");
}
if (processorConfig.getBackoffMechanism() != null) {
try {
BackoffMechanism.valueOf(processorConfig.getBackoffMechanism());
} catch (Exception e) {
throw new IllegalArgumentException("Backoff Mechanism " + processorConfig.getBackoffMechanism() + " is invalid.");
}
}
if (processorConfig.getMaxBackoffPeriod() != null && !FormatUtils.TIME_DURATION_PATTERN.matcher(processorConfig.getMaxBackoffPeriod()).matches()) {
throw new IllegalArgumentException("Max Backoff Period should be specified as time, for example 5 mins");
}
}
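As a concrete example of what the checks above accept (a sketch; the relationship name and values here are arbitrary, not taken from this commit): a retry configuration like the following passes all three validations, since the count is non-negative, the mechanism names a BackoffMechanism constant, and the period parses as a time duration.

import java.util.Collections;

import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;

class RetryConfigValidationSketch {
    static ProcessorConfigDTO validRetryConfig() {
        final ProcessorConfigDTO config = new ProcessorConfigDTO();
        config.setRetryCount(2);                                                // rejected only when negative
        config.setRetriedRelationships(Collections.singleton("failure"));       // hypothetical relationship name
        config.setBackoffMechanism(BackoffMechanism.PENALIZE_FLOWFILE.name());  // must match a BackoffMechanism constant
        config.setMaxBackoffPeriod("5 mins");                                   // must match FormatUtils.TIME_DURATION_PATTERN
        return config;
    }
}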
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, requestProcessorEntity);
} else if (isDisconnectedFromCluster()) {

View File

@ -3155,6 +3155,7 @@ public final class DtoFactory {
relationshipDTO.setDescription(rel.getDescription());
relationshipDTO.setName(rel.getName());
relationshipDTO.setAutoTerminate(node.isAutoTerminated(rel));
relationshipDTO.setRetry(node.isRelationshipRetried(rel));
relationships.add(relationshipDTO);
}
@ -3994,6 +3995,11 @@ public final class DtoFactory {
dto.setSchedulingStrategy(procNode.getSchedulingStrategy().name());
dto.setExecutionNode(procNode.getExecutionNode().name());
dto.setBackoffMechanism(procNode.getBackoffMechanism().name());
dto.setMaxBackoffPeriod(procNode.getMaxBackoffPeriod());
dto.setRetriedRelationships(procNode.getRetriedRelationships());
dto.setRetryCount(procNode.getRetryCount());
return dto;
}
@ -4224,6 +4230,10 @@ public final class DtoFactory {
copy.setDefaultConcurrentTasks(original.getDefaultConcurrentTasks());
copy.setDefaultSchedulingPeriod(original.getDefaultSchedulingPeriod());
copy.setLossTolerant(original.isLossTolerant());
copy.setBackoffMechanism(original.getBackoffMechanism());
copy.setMaxBackoffPeriod(original.getMaxBackoffPeriod());
copy.setRetryCount(original.getRetryCount());
copy.setRetriedRelationships(original.getRetriedRelationships());
return copy;
}

View File

@ -23,6 +23,7 @@ import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
@ -154,6 +155,10 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
final Long runDurationMillis = config.getRunDurationMillis();
final String bulletinLevel = config.getBulletinLevel();
final Set<String> undefinedRelationshipsToTerminate = config.getAutoTerminatedRelationships();
final Integer retryCount = config.getRetryCount();
final Set<String> retriedRelationships = config.getRetriedRelationships();
final String backoffMechanism = config.getBackoffMechanism();
final String maxBackoffPeriod = config.getMaxBackoffPeriod();
processor.pauseValidationTrigger(); // ensure that we don't trigger many validations to occur
try {
@ -196,6 +201,22 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
processor.setProperties(configProperties);
}
if (isNotNull(retryCount)) {
processor.setRetryCount(retryCount);
}
if (isNotNull(retriedRelationships)) {
processor.setRetriedRelationships(retriedRelationships);
}
if (isNotNull(backoffMechanism)) {
processor.setBackoffMechanism(BackoffMechanism.valueOf(backoffMechanism));
}
if (isNotNull(maxBackoffPeriod)) {
processor.setMaxBackoffPeriod(maxBackoffPeriod);
}
if (isNotNull(undefinedRelationshipsToTerminate)) {
final Set<Relationship> relationships = new HashSet<>();
for (final String relName : undefinedRelationshipsToTerminate) {
@ -427,7 +448,11 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
configDTO.getSchedulingPeriod(),
configDTO.getSchedulingStrategy(),
configDTO.getExecutionNode(),
configDTO.getYieldDuration())) {
configDTO.getYieldDuration(),
configDTO.getRetryCount(),
configDTO.getBackoffMechanism(),
configDTO.getMaxBackoffPeriod(),
configDTO.getRetriedRelationships())) {
modificationRequest = true;
}

View File

@ -53,6 +53,26 @@ public enum DifferenceType {
*/
YIELD_DURATION_CHANGED("Yield Duration Changed"),
/**
* The component has a different retry count in each of the flows
*/
RETRY_COUNT_CHANGED("Retry Count Changed"),
/**
* The component has a different set of retried relationships in each of the flows
*/
RETRIED_RELATIONSHIPS_CHANGED("Retried Relationships Changed"),
/**
* The component has a different backoff mechanism in each of the flows
*/
BACKOFF_MECHANISM_CHANGED("Backoff Mechanism Changed"),
/**
* The component has a different max backoff period in each of the flows
*/
MAX_BACKOFF_PERIOD_CHANGED("Max Backoff Period Changed"),
/**
* The component has a different bulletin level in each of the flows
*/

View File

@ -169,6 +169,10 @@ public class StandardFlowComparator implements FlowComparator {
addIfDifferent(differences, DifferenceType.SCHEDULED_STATE_CHANGED, processorA, processorB, VersionedProcessor::getScheduledState);
addIfDifferent(differences, DifferenceType.STYLE_CHANGED, processorA, processorB, VersionedProcessor::getStyle);
addIfDifferent(differences, DifferenceType.YIELD_DURATION_CHANGED, processorA, processorB, VersionedProcessor::getYieldDuration);
addIfDifferent(differences, DifferenceType.RETRY_COUNT_CHANGED, processorA, processorB, VersionedProcessor::getRetryCount);
addIfDifferent(differences, DifferenceType.RETRIED_RELATIONSHIPS_CHANGED, processorA, processorB, VersionedProcessor::getRetriedRelationships);
addIfDifferent(differences, DifferenceType.BACKOFF_MECHANISM_CHANGED, processorA, processorB, VersionedProcessor::getBackoffMechanism);
addIfDifferent(differences, DifferenceType.MAX_BACKOFF_PERIOD_CHANGED, processorA, processorB, VersionedProcessor::getMaxBackoffPeriod);
compareProperties(processorA, processorB, processorA.getProperties(), processorB.getProperties(), processorA.getPropertyDescriptors(), processorB.getPropertyDescriptors(), differences);
}
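For readers unfamiliar with the comparator: addIfDifferent presumably applies the supplied getter to both versions of the processor and records the given DifferenceType when the results differ, which is how the four retry-related difference types above get reported. A minimal standalone sketch of that pattern (all names and the string-based output are hypothetical, not NiFi's actual signature):

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

class AddIfDifferentSketch {

    // Records a human-readable difference whenever the extracted values diverge.
    static <C> void addIfDifferent(final List<String> differences, final String differenceType,
                                   final C componentA, final C componentB, final Function<C, Object> getter) {
        final Object valueA = getter.apply(componentA);
        final Object valueB = getter.apply(componentB);
        if (!Objects.equals(valueA, valueB)) {
            differences.add(differenceType + ": " + valueA + " -> " + valueB);
        }
    }

    public static void main(final String[] args) {
        final List<String> differences = new ArrayList<>();
        // Two "processors" represented simply by their retry counts.
        addIfDifferent(differences, "Retry Count Changed", 0, 2, Function.identity());
        System.out.println(differences); // prints [Retry Count Changed: 0 -> 2]
    }
}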

View File

@ -144,6 +144,10 @@ public class VersionedFlowBuilder {
processor.setType(type);
processor.setYieldDuration("1 sec");
processor.setSchedulingStrategy("TIMER_DRIVEN");
processor.setRetryCount(0);
processor.setBackoffMechanism("PENALIZE_FLOWFILE");
processor.setRetriedRelationships(Collections.emptySet());
processor.setMaxBackoffPeriod("0 sec");
group.getProcessors().add(processor);
return processor;

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.basics;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class StatelessRetryIT extends StatelessSystemIT {
private static final int RETRY_COUNT = 2;
private static final String EXPECTED_COUNTER = String.valueOf(RETRY_COUNT + 1);
@Test
public void testRetryHappensTwiceThenFinishes() throws StatelessConfigurationException, IOException, InterruptedException {
final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
//Create a GenerateFlowFile processor
final VersionedProcessor generateFlowFile = flowBuilder.createSimpleProcessor("GenerateFlowFile");
//Create a CountFlowFiles processor and configure 2 retries
final VersionedProcessor countFlowFiles = flowBuilder.createSimpleProcessor("CountFlowFiles");
countFlowFiles.setMaxBackoffPeriod("1 ms");
countFlowFiles.setRetryCount(RETRY_COUNT);
countFlowFiles.setBackoffMechanism("PENALIZE_FLOWFILE");
countFlowFiles.setRetriedRelationships(Collections.singleton("success"));
countFlowFiles.setPenaltyDuration("1 ms");
//Create a CountService and add it to the CountFlowFiles processor
final VersionedControllerService countService = flowBuilder.createSimpleControllerService("StandardCountService", "CountService");
flowBuilder.addControllerServiceReference(countFlowFiles, countService, "Count Service");
//Create an Output port
final VersionedPort outPort = flowBuilder.createOutputPort("Out");
//Connect GenerateFlowFile and CountFlowFiles processors
flowBuilder.createConnection(generateFlowFile, countFlowFiles, "success");
//Connect Output port and CountFlowFiles processor
flowBuilder.createConnection(countFlowFiles, outPort, "success");
// Startup the dataflow
final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
// Enqueue data and trigger
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
assertTrue(result.isSuccessful());
//Assert that CountService has been called 3 times (2 retries and 1 final run)
final List<FlowFile> flowFiles = result.getOutputFlowFiles("Out");
assertEquals(1, flowFiles.size());
assertEquals(EXPECTED_COUNTER, flowFiles.get(0).getAttribute("count"));
result.acknowledge();
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tests.system;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class SplitTextByLine extends AbstractProcessor {
static final Relationship ORIGINAL = new Relationship.Builder()
.name("original")
.build();
static final Relationship SPLITS = new Relationship.Builder()
.name("splits")
.build();
static final Relationship FAILURE = new Relationship.Builder()
.name("failure")
.build();
@Override
public Set<Relationship> getRelationships() {
return new HashSet<>(Arrays.asList(ORIGINAL, SPLITS, FAILURE));
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final List<String> splits = new ArrayList<>();
try (final InputStream in = session.read(flowFile);
final Reader streamReader = new InputStreamReader(in);
final BufferedReader reader = new BufferedReader(streamReader)) {
String line;
while ((line = reader.readLine()) != null) {
splits.add(line);
}
} catch (IOException ioe) {
session.transfer(flowFile, FAILURE);
return;
}
long offset = 0;
for (String splitText : splits) {
FlowFile splitFlowFile = session.clone(flowFile, offset, splitText.getBytes(StandardCharsets.UTF_8).length);
offset = offset + splitText.getBytes(StandardCharsets.UTF_8).length;
splitFlowFile = session.write(splitFlowFile, out -> out.write(splitText.getBytes(StandardCharsets.UTF_8)));
session.transfer(splitFlowFile, SPLITS);
}
session.transfer(flowFile, ORIGINAL);
}
}

View File

@ -45,3 +45,4 @@ org.apache.nifi.processors.tests.system.VerifyContents
org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile
org.apache.nifi.processors.tests.system.WriteLifecycleEvents
org.apache.nifi.processors.tests.system.WriteToFile
org.apache.nifi.processors.tests.system.SplitTextByLine

View File

@ -0,0 +1,329 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.processor;
import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
public class RetryIT extends NiFiSystemIT {
private static final int RETRY_COUNT = 2;
@Test
public void testRetryHappensTwiceThenFinishes() throws NiFiClientException, IOException, InterruptedException {
//Create a GenerateFlowFile processor
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
//Create a PassThrough processor
final ProcessorEntity passThrough = getClientUtil().createProcessor("PassThrough");
//Configure the processor's success relationship to be retried twice with flow file penalization for a maximum of 1 ms
enableRetries(passThrough, Collections.singleton("success"));
//Create a connection between GenerateFlowFile and PassThrough processors
final ConnectionEntity generateFlowFileAndPassThroughConnection = getClientUtil().createConnection(generateFlowFile, passThrough, "success");
//Create a CountEvents processor with autoterminated relationship
final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents");
getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
//Create a connection between PassThrough and CountEvents processors
final ConnectionEntity passThroughAndCountEventsConnection = getClientUtil().createConnection(passThrough, countEvents, "success");
// Wait for processors to be valid
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
getClientUtil().waitForValidProcessor(passThrough.getId());
getClientUtil().waitForValidProcessor(countEvents.getId());
//Generate flow file
runProcessorOnce(generateFlowFile);
getClientUtil().waitForStoppedProcessor(generateFlowFile.getId());
waitForQueueCount(generateFlowFileAndPassThroughConnection.getId(), 1);
assertEquals(0, getRetryCount(generateFlowFileAndPassThroughConnection));
//Run PassThrough processor, this will trigger the first retry
runProcessorOnce(passThrough);
getClientUtil().waitForStoppedProcessor(passThrough.getId());
waitForQueueCount(generateFlowFileAndPassThroughConnection.getId(), 1);
assertEquals(1, getRetryCount(generateFlowFileAndPassThroughConnection));
//Running PassThrough processor again, will trigger another retry
runProcessorOnce(passThrough);
getClientUtil().waitForStoppedProcessor(passThrough.getId());
waitForQueueCount(generateFlowFileAndPassThroughConnection.getId(), 1);
assertEquals(2, getRetryCount(generateFlowFileAndPassThroughConnection));
//Running PassThrough processor again should finish retry and transfer flow file
runProcessorOnce(passThrough);
getClientUtil().waitForStoppedProcessor(passThrough.getId());
waitForQueueCount(passThroughAndCountEventsConnection.getId(), 1);
assertEquals(0, getRetryCount(passThroughAndCountEventsConnection));
assertEquals(0, getConnectionQueueSize(generateFlowFileAndPassThroughConnection.getId()));
}
@Test
public void testNoRetryHappensWithSourceTypeProcessors() throws NiFiClientException, IOException, InterruptedException {
//Create a GenerateFlowFile processor
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
//Configure the processor's success relationship to be retried twice with flow file penalization for a maximum of 1 ms
enableRetries(generateFlowFile, Collections.singleton("success"));
//Create a CountEvents processor with autoterminated relationship
final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents");
getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
//Create a connection between GenerateFlowFile and CountEvents processors
final ConnectionEntity connection = getClientUtil().createConnection(generateFlowFile, countEvents, "success");
// Wait for processors to be valid
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
getClientUtil().waitForValidProcessor(countEvents.getId());
runProcessorOnce(generateFlowFile);
waitForQueueCount(connection.getId(), 1);
assertEquals(0, getRetryCount(connection));
}
private void runProcessorOnce(final ProcessorEntity processorEntity) throws NiFiClientException, IOException, InterruptedException {
getNifiClient().getProcessorClient().runProcessorOnce(processorEntity);
getClientUtil().waitForStoppedProcessor(processorEntity.getId());
}
@Test
public void testRetryHappensWhenOnlyChildRelationshipHasRetryConfigured() throws NiFiClientException, IOException, InterruptedException {
//Create a GenerateFlowFile processor
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
Map<String, String> properties = new HashMap<>();
properties.put("Text", "abc\n123\nxyz\n321");
properties.put("Batch Size", "1");
getClientUtil().updateProcessorProperties(generateFlowFile, properties);
//Create a SplitTextByLine processor
final ProcessorEntity splitTextByLine = getClientUtil().createProcessor("SplitTextByLine");
//Configure the processor's splits relationship to be retried twice with flow file penalization for a maximum of 1 ms
enableRetries(splitTextByLine, Collections.singleton("splits"));
//Create a connection between GenerateFlowFile and SplitTextByLine processors
final ConnectionEntity generateFlowFileAndSplitTextByLineConnection = getClientUtil().createConnection(generateFlowFile, splitTextByLine, "success");
//Create a CountEvents processor with autoterminated relationship
final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents");
getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
//Create a connection between SplitTextByLine and CountEvents processors
final ConnectionEntity splitTextByLineAndCountEventsOriginalConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "original");
final ConnectionEntity splitTextByLineAndCountEventsSplitsConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "splits");
final ConnectionEntity splitTextByLineAndCountEventsFailureConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "failure");
// Wait for processors to be valid
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
getClientUtil().waitForValidProcessor(splitTextByLine.getId());
getClientUtil().waitForValidProcessor(countEvents.getId());
//Generate flow file
runProcessorOnce(generateFlowFile);
waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1);
assertEquals(0, getRetryCount(generateFlowFileAndSplitTextByLineConnection));
//Run SplitTextByLine processor, this will trigger the first retry
runProcessorOnce(splitTextByLine);
waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1);
assertEquals(1, getRetryCount(generateFlowFileAndSplitTextByLineConnection));
//Running SplitTextByLine processor again, will trigger another retry
runProcessorOnce(splitTextByLine);
waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1);
assertEquals(2, getRetryCount(generateFlowFileAndSplitTextByLineConnection));
//Running SplitTextByLine processor again should finish retry and transfer flow file
runProcessorOnce(splitTextByLine);
waitForQueueCount(splitTextByLineAndCountEventsSplitsConnection.getId(), 4);
assertEquals(0, getRetryCount(splitTextByLineAndCountEventsSplitsConnection));
assertEquals(0, getConnectionQueueSize(generateFlowFileAndSplitTextByLineConnection.getId()));
assertEquals(0, getConnectionQueueSize(splitTextByLineAndCountEventsFailureConnection.getId()));
assertEquals(1, getConnectionQueueSize(splitTextByLineAndCountEventsOriginalConnection.getId()));
}
@Test
public void testRetryHappensWhenOnlyParentRelationshipHasRetryConfigured() throws NiFiClientException, IOException, InterruptedException {
//Create a GenerateFlowFile processor
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
Map<String, String> properties = new HashMap<>();
properties.put("Text", "abc\n123\nxyz");
properties.put("Batch Size", "1");
getClientUtil().updateProcessorProperties(generateFlowFile, properties);
//Create a SplitTextByLine processor
final ProcessorEntity splitTextByLine = getClientUtil().createProcessor("SplitTextByLine");
//Configure the processor's original relationship to be retried twice with flow file penalization for a maximum of 1 ms
enableRetries(splitTextByLine, Collections.singleton("original"));
//Create a connection between GenerateFlowFile and SplitTextByLine processors
final ConnectionEntity generateFlowFileAndSplitTextByLineConnection = getClientUtil().createConnection(generateFlowFile, splitTextByLine, "success");
//Create a CountEvents processor with autoterminated relationship
final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents");
getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
//Create a connection between SplitTextByLine and CountEvents processors
final ConnectionEntity splitTextByLineAndCountEventsOriginalConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "original");
final ConnectionEntity splitTextByLineAndCountEventsSplitsConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "splits");
final ConnectionEntity splitTextByLineAndCountEventsFailureConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "failure");
// Wait for processors to be valid
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
getClientUtil().waitForValidProcessor(splitTextByLine.getId());
getClientUtil().waitForValidProcessor(countEvents.getId());
//Generate flow file
runProcessorOnce(generateFlowFile);
waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1);
assertEquals(0, getRetryCount(generateFlowFileAndSplitTextByLineConnection));
//Run SplitTextByLine processor, this will trigger the first retry
runProcessorOnce(splitTextByLine);
waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1);
assertEquals(1, getRetryCount(generateFlowFileAndSplitTextByLineConnection));
//Running SplitTextByLine processor again, will trigger another retry
runProcessorOnce(splitTextByLine);
waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1);
assertEquals(2, getRetryCount(generateFlowFileAndSplitTextByLineConnection));
//Running SplitTextByLine processor again should finish retry and transfer flow file
runProcessorOnce(splitTextByLine);
waitForQueueCount(splitTextByLineAndCountEventsSplitsConnection.getId(), 3);
assertEquals(0, getRetryCount(splitTextByLineAndCountEventsSplitsConnection));
assertEquals(0, getConnectionQueueSize(generateFlowFileAndSplitTextByLineConnection.getId()));
assertEquals(0, getConnectionQueueSize(splitTextByLineAndCountEventsFailureConnection.getId()));
assertEquals(1, getConnectionQueueSize(splitTextByLineAndCountEventsOriginalConnection.getId()));
}
@Test
public void testRetryHappensOnlyOnceWhenBothChildAndParentRelationshipHasRetryConfigured() throws NiFiClientException, IOException, InterruptedException {
//Create a GenerateFlowFile processor
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
Map<String, String> properties = new HashMap<>();
properties.put("Text", "abc\n123\nxyz\n321\ndfg");
properties.put("Batch Size", "1");
getClientUtil().updateProcessorProperties(generateFlowFile, properties);
//Create a SplitTextByLine processor
final ProcessorEntity splitTextByLine = getClientUtil().createProcessor("SplitTextByLine");
//Configure the processor's original and splits relationships to be retried twice with flow file penalization for a maximum of 1 ms
enableRetries(splitTextByLine, new HashSet<>(Arrays.asList("original", "splits")));
//Create a connection between GenerateFlowFile and SplitTextByLine processors
final ConnectionEntity generateFlowFileAndSplitTextByLineConnection = getClientUtil().createConnection(generateFlowFile, splitTextByLine, "success");
//Create a CountEvents processor with autoterminated relationship
final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents");
getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
//Create a connection between SplitTextByLine and CountEvents processors
final ConnectionEntity splitTextByLineAndCountEventsOriginalConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "original");
final ConnectionEntity splitTextByLineAndCountEventsSplitsConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "splits");
final ConnectionEntity splitTextByLineAndCountEventsFailureConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "failure");
// Wait for processors to be valid
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
getClientUtil().waitForValidProcessor(splitTextByLine.getId());
getClientUtil().waitForValidProcessor(countEvents.getId());
//Generate flow file
runProcessorOnce(generateFlowFile);
waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1);
assertEquals(0, getRetryCount(generateFlowFileAndSplitTextByLineConnection));
//Run SplitTextByLine processor, this will trigger the first retry
runProcessorOnce(splitTextByLine);
waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1);
assertEquals(1, getRetryCount(generateFlowFileAndSplitTextByLineConnection));
//Running SplitTextByLine processor again, will trigger another retry
runProcessorOnce(splitTextByLine);
waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1);
assertEquals(2, getRetryCount(generateFlowFileAndSplitTextByLineConnection));
//Running SplitTextByLine processor again should finish retry and transfer flow file
runProcessorOnce(splitTextByLine);
waitForQueueCount(splitTextByLineAndCountEventsSplitsConnection.getId(), 5);
assertEquals(0, getRetryCount(splitTextByLineAndCountEventsSplitsConnection));
assertEquals(0, getConnectionQueueSize(generateFlowFileAndSplitTextByLineConnection.getId()));
assertEquals(0, getConnectionQueueSize(splitTextByLineAndCountEventsFailureConnection.getId()));
assertEquals(1, getConnectionQueueSize(splitTextByLineAndCountEventsOriginalConnection.getId()));
}
private int getRetryCount(final ConnectionEntity connection) throws NiFiClientException, IOException {
return getClientUtil().getQueueFlowFile(connection.getId(), 0).getFlowFile().getAttributes()
.entrySet().stream()
.filter(entry -> entry.getKey().startsWith("retryCount."))
.map(Map.Entry::getValue)
.findFirst()
.map(Integer::parseInt)
.orElse(0);
}
private void enableRetries(final ProcessorEntity processorEntity, final Set<String> relationships) throws NiFiClientException, IOException {
final ProcessorConfigDTO config = new ProcessorConfigDTO();
config.setRetryCount(RETRY_COUNT);
config.setMaxBackoffPeriod("1 ms");
config.setBackoffMechanism(BackoffMechanism.PENALIZE_FLOWFILE.name());
config.setRetriedRelationships(relationships);
config.setPenaltyDuration("1 ms");
getClientUtil().updateProcessorConfig(processorEntity, config);
}
}
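The tests above always use PENALIZE_FLOWFILE; the BackoffMechanism enum also offers YIELD_PROCESSOR, which yields the processor between attempts instead of penalizing the FlowFile. A hedged variant of enableRetries showing only that swap (a sketch, not part of the test class):

private void enableRetriesWithYield(final ProcessorEntity processorEntity, final Set<String> relationships) throws NiFiClientException, IOException {
    final ProcessorConfigDTO config = new ProcessorConfigDTO();
    config.setRetryCount(RETRY_COUNT);
    config.setMaxBackoffPeriod("1 ms");
    config.setBackoffMechanism(BackoffMechanism.YIELD_PROCESSOR.name()); // yield the processor rather than penalize the FlowFile
    config.setRetriedRelationships(relationships);
    getClientUtil().updateProcessorConfig(processorEntity, config);
}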