diff --git a/nifi-api/src/main/java/org/apache/nifi/flow/VersionedProcessor.java b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedProcessor.java index 51c42deb95..67d5b1ab77 100644 --- a/nifi-api/src/main/java/org/apache/nifi/flow/VersionedProcessor.java +++ b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedProcessor.java @@ -44,6 +44,11 @@ public class VersionedProcessor extends VersionedComponent private Set autoTerminatedRelationships; private ScheduledState scheduledState; + private Integer retryCount; + private Set retriedRelationships; + private String backoffMechanism; + private String maxBackoffPeriod; + @ApiModelProperty("The frequency with which to schedule the processor. The format of the value will depend on th value of schedulingStrategy.") public String getSchedulingPeriod() { return schedulingPeriod; @@ -204,4 +209,48 @@ public class VersionedProcessor extends VersionedComponent return ComponentType.PROCESSOR; } + @ApiModelProperty( + value = "Overall number of retries." + ) + public Integer getRetryCount() { + return retryCount; + } + + public void setRetryCount(Integer retryCount) { + this.retryCount = retryCount; + } + + @ApiModelProperty( + value = "All the relationships should be retried." + ) + public Set getRetriedRelationships() { + return retriedRelationships; + } + + public void setRetriedRelationships(Set retriedRelationships) { + this.retriedRelationships = retriedRelationships; + } + + @ApiModelProperty( + value = "Determines whether the FlowFile should be penalized or the processor should be yielded between retries.", + allowableValues = "PENALIZE_FLOWFILE, YIELD_PROCESSOR" + ) + public String getBackoffMechanism() { + return backoffMechanism; + } + + public void setBackoffMechanism(String backoffMechanism) { + this.backoffMechanism = backoffMechanism; + } + + @ApiModelProperty( + value = "Maximum amount of time to be waited during a retry period." + ) + public String getMaxBackoffPeriod() { + return maxBackoffPeriod; + } + + public void setMaxBackoffPeriod(String maxBackoffPeriod) { + this.maxBackoffPeriod = maxBackoffPeriod; + } } diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java index 9784384b8a..9db1289a07 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java @@ -163,4 +163,14 @@ public interface ProcessContext extends PropertyContext, ClusterContext { * @return the configured name of this processor */ String getName(); + + /** + * @return the configured relationships to be retried of this processor + */ + boolean isRelationshipRetried(Relationship relationship); + + /** + * @return the actual value of the retry counter of this processor + */ + int getRetryCount(); } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index 5c8a8e8bdc..daa8ceee52 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -579,4 +579,14 @@ public class MockProcessContext extends MockControllerServiceLookup implements P public boolean isConnectedToCluster() { return isConnected; } + + @Override + public int getRetryCount() { + return 0; + } + + @Override + public boolean isRelationshipRetried(Relationship relationship) { + return false; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java index 780e8acb70..d6b8280f04 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java @@ -51,6 +51,12 @@ public class ProcessorConfigDTO { private Map defaultConcurrentTasks; private Map defaultSchedulingPeriod; + //retry configurations + private Integer retryCount; + private Set retriedRelationships; + private String backoffMechanism; + private String maxBackoffPeriod; + public ProcessorConfigDTO() { } @@ -308,4 +314,48 @@ public class ProcessorConfigDTO { this.defaultSchedulingPeriod = defaultSchedulingPeriod; } + @ApiModelProperty( + value = "Overall number of retries." + ) + public Integer getRetryCount() { + return retryCount; + } + + public void setRetryCount(Integer retryCount) { + this.retryCount = retryCount; + } + + @ApiModelProperty( + value = "All the relationships should be retried." + ) + public Set getRetriedRelationships() { + return retriedRelationships; + } + + public void setRetriedRelationships(Set retriedRelationships) { + this.retriedRelationships = retriedRelationships; + } + + @ApiModelProperty(accessMode = ApiModelProperty.AccessMode.READ_ONLY, + value = "Determines whether the FlowFile should be penalized or the processor should be yielded between retries.", + allowableValues = "PENALIZE_FLOWFILE, YIELD_PROCESSOR" + ) + public String getBackoffMechanism() { + return backoffMechanism; + } + + public void setBackoffMechanism(String backoffMechanism) { + this.backoffMechanism = backoffMechanism; + } + + @ApiModelProperty( + value = "Maximum amount of time to be waited during a retry period." + ) + public String getMaxBackoffPeriod() { + return maxBackoffPeriod; + } + + public void setMaxBackoffPeriod(String maxBackoffPeriod) { + this.maxBackoffPeriod = maxBackoffPeriod; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RelationshipDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RelationshipDTO.java index ea487d2c04..d62383b20e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RelationshipDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RelationshipDTO.java @@ -29,6 +29,7 @@ public class RelationshipDTO { private String name; private String description; private Boolean autoTerminate; + private Boolean retry; /** * @return the relationship name @@ -71,4 +72,18 @@ public class RelationshipDTO { public void setAutoTerminate(Boolean autoTerminate) { this.autoTerminate = autoTerminate; } + + /** + * @return true if relationship is retry;false otherwise + */ + @ApiModelProperty( + value = "Whether or not flowfiles sent to this relationship should retry." + ) + public Boolean isRetry() { + return retry; + } + + public void setRetry(Boolean retry) { + this.retry = retry; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index b0a3229069..6e480d0daa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -127,6 +127,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; public static final String DEFAULT_YIELD_PERIOD = "1 sec"; public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec"; + private static final String DEFAULT_MAX_BACKOFF_PERIOD = "10 mins"; private final AtomicReference processGroup; private final AtomicReference processorRef; private final AtomicReference identifier; @@ -157,6 +158,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private final int hashCode; private volatile boolean hasActiveThreads = false; + private volatile int retryCount; + private volatile Set retriedRelationships; + private volatile BackoffMechanism backoffMechanism; + private volatile String maxBackoffPeriod; + public StandardProcessorNode(final LoggableComponent processor, final String uuid, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider, final ComponentVariableRegistry variableRegistry, @@ -202,6 +208,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable executionNode = isExecutionNodeRestricted() ? ExecutionNode.PRIMARY : ExecutionNode.ALL; this.hashCode = new HashCodeBuilder(7, 67).append(identifier).toHashCode(); + retryCount = 10; + retriedRelationships = new HashSet<>(); + backoffMechanism = BackoffMechanism.PENALIZE_FLOWFILE; + maxBackoffPeriod = DEFAULT_MAX_BACKOFF_PERIOD; + try { if (processorDetails.getProcClass().isAnnotationPresent(DefaultSchedule.class)) { DefaultSchedule dsc = processorDetails.getProcClass().getAnnotation(DefaultSchedule.class); @@ -1863,6 +1874,73 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return desiredState; } + @Override + public int getRetryCount() { + return retryCount; + } + + @Override + public void setRetryCount(Integer retryCount) { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + this.retryCount = (retryCount == null) ? 0 : retryCount; + } + + @Override + public Set getRetriedRelationships() { + return retriedRelationships; + } + + @Override + public void setRetriedRelationships(Set retriedRelationships) { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + this.retriedRelationships = (retriedRelationships == null) ? Collections.emptySet() : new HashSet<>(retriedRelationships); + } + + @Override + public boolean isRelationshipRetried(final Relationship relationship) { + if (relationship == null) { + return false; + } else { + return this.retriedRelationships.contains(relationship.getName()); + } + } + + @Override + public BackoffMechanism getBackoffMechanism() { + return backoffMechanism; + } + + @Override + public void setBackoffMechanism(BackoffMechanism backoffMechanism) { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + this.backoffMechanism = (backoffMechanism == null) ? BackoffMechanism.PENALIZE_FLOWFILE : backoffMechanism; + } + + @Override + public String getMaxBackoffPeriod() { + return maxBackoffPeriod; + } + + @Override + public void setMaxBackoffPeriod(String maxBackoffPeriod) { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + if (maxBackoffPeriod == null) { + maxBackoffPeriod = DEFAULT_MAX_BACKOFF_PERIOD; + } + final long backoffNanos = FormatUtils.getTimeDuration(maxBackoffPeriod, TimeUnit.NANOSECONDS); + if (backoffNanos < 0) { + throw new IllegalArgumentException("Max Backoff Period must be positive"); + } + this.maxBackoffPeriod = maxBackoffPeriod; + } private void monitorAsyncTask(final Future taskFuture, final Future monitoringFuture, final long completionTimestamp) { if (taskFuture.isDone()) { monitoringFuture.cancel(false); // stop scheduling this task diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 4e7b286434..2e36f7483d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -21,6 +21,7 @@ import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.BackoffMechanism; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.lifecycle.TaskTermination; import org.apache.nifi.controller.queue.FlowFileQueue; @@ -63,6 +64,7 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream; import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.NonFlushableOutputStream; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.FormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,7 +142,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn private Map immediateCounters; private final Set removedFlowFiles = new HashSet<>(); - private final Set createdFlowFiles = new HashSet<>(); + private final Set createdFlowFiles = new HashSet<>(); // UUID of any FlowFile that was created in this session + private final Set createdFlowFilesWithoutLineage = new HashSet<>(); // UUID of any FlowFile that was created without a parent. This is a subset of createdFlowFiles. private final InternalProvenanceReporter provenanceReporter; @@ -174,6 +177,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn private StateMap localState; private StateMap clusterState; + private final String retryAttribute; + private final FlowFileLinkage flowFileLinkage = new FlowFileLinkage(); public StandardProcessSession(final RepositoryContext context, final TaskTermination taskTermination) { this.context = context; @@ -185,6 +190,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn this.claimCache = context.createContentClaimWriteCache(); LOG.trace("Session {} created for {}", this, connectableDescription); processingStartTime = System.nanoTime(); + retryAttribute = "retryCount." + context.getConnectable().getIdentifier(); } private void verifyTaskActive() { @@ -289,7 +295,28 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary final Map toAdd = new HashMap<>(); + + final Connectable connectable = context.getConnectable(); + final long maxBackoffMillis = Math.round(FormatUtils.getPreciseTimeDuration(connectable.getMaxBackoffPeriod(), TimeUnit.MILLISECONDS)); + + // Determine which FlowFiles need to be retried + final Set retryIds = new HashSet<>(); for (final StandardRepositoryRecord record : records.values()) { + if (isRetry(record)) { + final long flowFileId = record.getCurrent().getId(); + retryIds.add(flowFileId); + + final Collection linkedIds = flowFileLinkage.getLinkedIds(flowFileId); + retryIds.addAll(linkedIds); + } + } + + for (final StandardRepositoryRecord record : records.values()) { + // Check if this Record should be retried. If so, perform the necessary actions to retry the Record and then continue on to the next record. + if (retryIds.contains(record.getCurrent().getId())) { + retry(record, maxBackoffMillis); + } + if (record.isMarkedForDelete()) { continue; } @@ -318,13 +345,16 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn } } else { final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element + final FlowFileRecord currRec = record.getCurrent(); + final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec); + builder.removeAttributes(retryAttribute); + record.setDestination(finalDestination.getFlowFileQueue()); + record.setWorking(builder.build(), false); incrementConnectionInputCounts(finalDestination, record); for (final Connection destination : destinations) { // iterate over remaining destinations and "clone" as needed incrementConnectionInputCounts(destination, record); - final FlowFileRecord currRec = record.getCurrent(); - final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec); builder.id(context.getNextFlowFileSequence()); final String newUuid = UUID.randomUUID().toString(); @@ -338,7 +368,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn if (claim != null) { context.getContentRepository().incrementClaimaintCount(claim); } - newRecord.setWorking(clone, Collections. emptyMap(), false); + newRecord.setWorking(clone, Collections.emptyMap(), false); newRecord.setDestination(destination.getFlowFileQueue()); newRecord.setTransferRelationship(record.getTransferRelationship()); @@ -356,6 +386,110 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections); } + private boolean isRetry(final StandardRepositoryRecord record) { + final Relationship relationship = record.getTransferRelationship(); + if (relationship == null) { + return false; + } + + final Connectable connectable = context.getConnectable(); + if (!connectable.isRelationshipRetried(relationship)) { + return false; + } + + // If the FlowFile was created in this session and has no lineage (i.e., it was created by a source component), + // we do not want to retry the FlowFile, as there is no way to roll back. + final String uuid = record.getCurrent().getAttribute(CoreAttributes.UUID.key()); + if (createdFlowFilesWithoutLineage.contains(uuid)) { + return false; + } + + final int retryCount = getRetries(record.getCurrent()); + return retryCount < connectable.getRetryCount(); + } + + private void retry(final StandardRepositoryRecord record, final long maxBackoffMillis) { + LOG.debug("Updating state to retry {}", record.getCurrent()); + + final Connectable connectable = context.getConnectable(); + final int currentRetries = getRetries(record.getCurrent()); + + // Account for any statistics that have been added to for FlowFiles/Bytes In/Out + final Relationship relationship = record.getTransferRelationship(); + final int numDestinations = context.getConnections(relationship).size(); + final int multiplier = Math.max(1, numDestinations); + final boolean autoTerminated = connectable.isAutoTerminated(relationship); + if (!autoTerminated) { + flowFilesOut-= multiplier; + contentSizeOut -= record.getCurrent().getSize() * multiplier; + } + + final FlowFileRecord original = record.getOriginal(); + if (original != null) { + flowFilesIn--; + contentSizeIn -= original.getSize(); + } + + // If any content has been created but is no longer being used (i.e., the FlowFile was written to but is now being reverted back to its + // previous content), then remove the temporary content. + removeTemporaryClaim(record); + + // Adjust for any state that has been updated for the Record that is no longer relevant. + final String uuid = record.getCurrent().getAttribute(CoreAttributes.UUID.key()); + final FlowFileRecord updatedFlowFile = new StandardFlowFileRecord.Builder() + .fromFlowFile(record.getOriginal()) + .addAttribute(retryAttribute, String.valueOf(currentRetries + 1)) + .build(); + + if (original == null) { + record.markForDelete(); + } else { + record.setTransferRelationship(Relationship.SELF); + } + + record.setWorking(updatedFlowFile, false); + + // Remove any Provenance Events that have been generated for this Record + provenanceReporter.removeEventsForFlowFile(uuid); + forkEventBuilders.remove(record.getCurrent()); + createdFlowFiles.remove(uuid); + createdFlowFilesWithoutLineage.remove(uuid); + removedFlowFiles.remove(uuid); + + // Penalize the FlowFile or yield the connectable, according to the component configuration. + final BackoffMechanism backoffMechanism = connectable.getBackoffMechanism(); + if (backoffMechanism == BackoffMechanism.PENALIZE_FLOWFILE) { + if (!record.isMarkedForDelete()) { + final long backoffTime = calculateBackoffTime(currentRetries, maxBackoffMillis, connectable.getPenalizationPeriod(TimeUnit.MILLISECONDS)); + penalize(record.getCurrent(), backoffTime, TimeUnit.MILLISECONDS); + } + } else { + final long backoffTime = calculateBackoffTime(currentRetries, maxBackoffMillis, connectable.getYieldPeriod(TimeUnit.MILLISECONDS)); + connectable.yield(backoffTime, TimeUnit.MILLISECONDS); + } + } + + private int getRetries(final FlowFile flowFile) { + if (flowFile == null) { + return 0; + } + + final String attributeValue = flowFile.getAttribute(retryAttribute); + if (attributeValue == null) { + return 0; + } + + try { + return Integer.parseInt(attributeValue); + } catch (final Exception e) { + return 0; + } + } + + private long calculateBackoffTime(final int retryCount, final long maxBackoffPeriod, final long baseBackoffTime) { + return (long) Math.min(maxBackoffPeriod, Math.pow(2, retryCount) * baseBackoffTime); + } + @Override public synchronized void commit() { commit(false); @@ -798,11 +932,12 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn } } - if (!eventAdded && !repoRecord.getUpdatedAttributes().isEmpty()) { + if (!eventAdded && !repoRecord.getUpdatedAttributes().isEmpty() && curFlowFile.getAttribute(retryAttribute) == null) { // We generate an ATTRIBUTES_MODIFIED event only if no other event has been // created for the FlowFile. We do this because all events contain both the // newest and the original attributes, so generating an ATTRIBUTES_MODIFIED // event is redundant if another already exists. + // We don't generate ATTRIBUTES_MODIFIED event for retry. if (!eventTypesPerFlowFileId.containsKey(flowFileId)) { recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).build()); addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.ATTRIBUTES_MODIFIED); @@ -1261,7 +1396,10 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn bytesWritten = 0L; connectionCounts.clear(); createdFlowFiles.clear(); + createdFlowFilesWithoutLineage.clear(); removedFlowFiles.clear(); + flowFileLinkage.clear(); + if (countersOnCommit != null) { countersOnCommit.clear(); } @@ -1406,8 +1544,14 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn for (final FlowFile flowFile : flowFiles) { final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile; - final StandardRepositoryRecord repoRecord = this.records.remove(flowFile.getId()); - newOwner.records.put(flowFileRecord.getId(), repoRecord); + final long flowFileId = flowFile.getId(); + final StandardRepositoryRecord repoRecord = this.records.remove(flowFileId); + newOwner.records.put(flowFileId, repoRecord); + + final Collection linkedIds = this.flowFileLinkage.remove(flowFileId); + if (linkedIds != null) { + linkedIds.forEach(linkedId -> newOwner.flowFileLinkage.addLink(flowFileId, linkedId)); + } // Adjust the counts for Connections for each FlowFile that was pulled from a Connection. // We do not have to worry about accounting for 'input counts' on connections because those @@ -1429,9 +1573,9 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn newOwner.contentSizeIn += flowFile.getSize(); } - final String flowFileId = flowFile.getAttribute(CoreAttributes.UUID.key()); - if (removedFlowFiles.remove(flowFileId)) { - newOwner.removedFlowFiles.add(flowFileId); + final String flowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key()); + if (removedFlowFiles.remove(flowFileUuid)) { + newOwner.removedFlowFiles.add(flowFileUuid); newOwner.removedCount++; newOwner.removedBytes += flowFile.getSize(); @@ -1439,8 +1583,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn removedBytes -= flowFile.getSize(); } - if (createdFlowFiles.remove(flowFileId)) { - newOwner.createdFlowFiles.add(flowFileId); + if (createdFlowFiles.remove(flowFileUuid)) { + newOwner.createdFlowFiles.add(flowFileUuid); } if (repoRecord.getTransferRelationship() != null) { @@ -1834,7 +1978,10 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn final StandardRepositoryRecord record = new StandardRepositoryRecord(null); record.setWorking(fFile, attrs, false); records.put(fFile.getId(), record); - createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); + + createdFlowFiles.add(uuid); + createdFlowFilesWithoutLineage.add(uuid); + return fFile; } @@ -1876,6 +2023,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); registerForkEvent(parent, fFile); + flowFileLinkage.addLink(parent.getId(), fFile.getId()); return fFile; } @@ -1925,6 +2073,12 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); registerJoinEvent(fFile, parents); + + final long flowFileId = fFile.getId(); + for (final FlowFile parent : parents) { + flowFileLinkage.addLink(flowFileId, parent.getId()); + } + return fFile; } @@ -1970,6 +2124,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn registerForkEvent(example, clone); } + flowFileLinkage.addLink(example.getId(), clone.getId()); + return clone; } @@ -2006,10 +2162,16 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn @Override public FlowFile penalize(FlowFile flowFile) { verifyTaskActive(); + flowFile = validateRecordState(flowFile, false); - flowFile = validateRecordState(flowFile); + return penalize(flowFile, context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + } + + public FlowFile penalize(FlowFile flowFile, final long period, final TimeUnit timeUnit) { + flowFile = getRecord(flowFile).getCurrent(); final StandardRepositoryRecord record = getRecord(flowFile); - final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS); + final long penalizeMillis = TimeUnit.MILLISECONDS.convert(period, timeUnit); + final long expirationEpochMillis = System.currentTimeMillis() + penalizeMillis; final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build(); record.setWorking(newFile, false); return newFile; @@ -2250,6 +2412,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn removedBytes += flowFile.getSize(); provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key())); } + + flowFileLinkage.remove(flowFile.getId()); } @Override @@ -2272,6 +2436,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn removedBytes += flowFile.getSize(); provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key())); } + + flowFileLinkage.remove(flowFile.getId()); } } @@ -3821,4 +3987,47 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn return removedBytes; } } + + private static class FlowFileLinkage { + private final Map> linkedIds = new HashMap<>(); + + public void addLink(final long id, final long other) { + if (id == other) { + return; + } + + linkedIds.computeIfAbsent(id, key -> new ArrayList<>()).add(other); + linkedIds.computeIfAbsent(other, key -> new ArrayList<>()).add(id); + } + + public Collection getLinkedIds(final long id) { + final List linked = linkedIds.get(id); + final Set allLinked = new HashSet<>(); + if (linked != null) { + allLinked.addAll(linked); + + for (final Long linkedId : linked) { + final List onceRemoved = linkedIds.get(linkedId); + allLinked.addAll(onceRemoved); + } + } + return allLinked; + } + + public Collection remove(final long id) { + final List linked = linkedIds.remove(id); + + if (linked != null) { + for (final Long otherId : linked) { + linkedIds.get(otherId).remove(id); + } + } + + return linked; + } + + public void clear() { + linkedIds.clear(); + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java index 7b506ebc6e..179aac0d53 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java @@ -73,6 +73,11 @@ public class StandardProvenanceReporter implements InternalProvenanceReporter { events.remove(event); } + @Override + public void removeEventsForFlowFile(final String uuid) { + events.removeIf(event -> event.getFlowFileUuid().equals(uuid)); + } + @Override public void clear() { events.clear(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/scheduling/ConnectableProcessContext.java index 3d24128d47..c2a0879b35 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/scheduling/ConnectableProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/scheduling/ConnectableProcessContext.java @@ -287,4 +287,15 @@ public class ConnectableProcessContext implements ProcessContext { public boolean isConnectedToCluster() { throw new UnsupportedOperationException(); } + + + @Override + public boolean isRelationshipRetried(Relationship relationship) { + return false; + } + + @Override + public int getRetryCount() { + return 0; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java index e768fda4bc..39004a50b6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java @@ -27,6 +27,7 @@ import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Size; +import org.apache.nifi.controller.BackoffMechanism; import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.PropertyConfiguration; @@ -1586,6 +1587,19 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize processor.setYieldPeriod(proposed.getYieldDuration()); processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); + processor.setMaxBackoffPeriod(proposed.getMaxBackoffPeriod()); + processor.setRetriedRelationships(proposed.getRetriedRelationships()); + + if (proposed.getRetryCount() != null) { + processor.setRetryCount(proposed.getRetryCount()); + } else { + processor.setRetryCount(10); + } + + if (proposed.getBackoffMechanism() != null) { + processor.setBackoffMechanism(BackoffMechanism.valueOf(proposed.getBackoffMechanism())); + } + final ScheduledState procState = processor.getScheduledState(); final ProcessGroup group = processor.getProcessGroup(); switch (proposed.getScheduledState()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index a08ac26976..1dcd79bac1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -347,4 +347,14 @@ public class StandardProcessContext implements ProcessContext, ControllerService public boolean isConnectedToCluster() { return nodeTypeProvider.isConnected(); } + + @Override + public boolean isRelationshipRetried(Relationship relationship) { + return procNode.isRelationshipRetried(relationship); + } + + @Override + public int getRetryCount() { + return procNode.getRetryCount(); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index 886916ee87..c883272f43 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -700,6 +700,10 @@ public class NiFiRegistryFlowMapper { processor.setStyle(procNode.getStyle()); processor.setYieldDuration(procNode.getYieldPeriod()); processor.setScheduledState(flowMappingOptions.getStateLookup().getState(procNode)); + processor.setRetryCount(procNode.getRetryCount()); + processor.setRetriedRelationships(procNode.getRetriedRelationships()); + processor.setBackoffMechanism(procNode.getBackoffMechanism().name()); + processor.setMaxBackoffPeriod(procNode.getMaxBackoffPeriod()); return processor; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/mapping/TestNiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/mapping/TestNiFiRegistryFlowMapper.java index dcc33cc82e..b168dfcce6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/mapping/TestNiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/mapping/TestNiFiRegistryFlowMapper.java @@ -22,6 +22,7 @@ import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; import org.apache.nifi.connectable.Position; +import org.apache.nifi.controller.BackoffMechanism; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.PropertyConfiguration; @@ -215,6 +216,7 @@ public class TestNiFiRegistryFlowMapper { when(procNode.getPosition()).thenReturn(new Position(0D, 0D)); when(procNode.getSchedulingStrategy()).thenReturn(SchedulingStrategy.TIMER_DRIVEN); when(procNode.getPhysicalScheduledState()).thenReturn(ScheduledState.STOPPED); + when(procNode.getBackoffMechanism()).thenReturn(BackoffMechanism.YIELD_PROCESSOR); return procNode; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java index 26fa0f8308..6e144ca7dc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java @@ -19,6 +19,7 @@ package org.apache.nifi.connectable; import org.apache.nifi.authorization.resource.ComponentAuthorizable; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.VersionedComponent; +import org.apache.nifi.controller.BackoffMechanism; import org.apache.nifi.controller.Triggerable; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.ProcessSession; @@ -291,4 +292,22 @@ public interface Connectable extends Triggerable, ComponentAuthorizable, Positio * @return the type of the component. I.e., the class name of the implementation */ String getComponentType(); + + int getRetryCount(); + + void setRetryCount(Integer retryCount); + + Set getRetriedRelationships(); + + void setRetriedRelationships(Set retriedRelationships); + + boolean isRelationshipRetried(Relationship relationship); + + BackoffMechanism getBackoffMechanism(); + + void setBackoffMechanism(BackoffMechanism backoffMechanism); + + String getMaxBackoffPeriod(); + + void setMaxBackoffPeriod(String maxBackoffPeriod); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java index 581c14c605..dad2b67eb6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java @@ -62,6 +62,7 @@ public abstract class AbstractPort implements Port { .build(); private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + private static final String DEFAULT_MAX_BACKOFF_PERIOD = "10 mins"; private final List relationships; @@ -659,4 +660,45 @@ public abstract class AbstractPort implements Port { } } } + + @Override + public int getRetryCount() { + return 0; + } + + @Override + public void setRetryCount(Integer retryCount) { + } + + @Override + public Set getRetriedRelationships() { + return Collections.EMPTY_SET; + } + + @Override + public void setRetriedRelationships(Set retriedRelationships) { + } + + @Override + public boolean isRelationshipRetried(Relationship relationship) { + return false; + } + + @Override + public BackoffMechanism getBackoffMechanism() { + return BackoffMechanism.PENALIZE_FLOWFILE; + } + + @Override + public void setBackoffMechanism(BackoffMechanism backoffMechanism) { + } + + @Override + public String getMaxBackoffPeriod() { + return DEFAULT_MAX_BACKOFF_PERIOD; + } + + @Override + public void setMaxBackoffPeriod(String maxBackoffPeriod) { + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/BackoffMechanism.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/BackoffMechanism.java new file mode 100644 index 0000000000..4be3c9d379 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/BackoffMechanism.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +public enum BackoffMechanism { + PENALIZE_FLOWFILE, YIELD_PROCESSOR +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java index c25ac0d981..026ad8eb45 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java @@ -58,6 +58,7 @@ import static java.util.Objects.requireNonNull; public class StandardFunnel implements Funnel { public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + private static final String DEFAULT_MAX_BACKOFF_PERIOD = "10 mins"; private final String identifier; private final Set outgoingConnections; @@ -577,6 +578,47 @@ public class StandardFunnel implements Funnel { return "Funnel"; } + @Override + public int getRetryCount() { + return 0; + } + + @Override + public void setRetryCount(Integer retryCount) { + } + + @Override + public Set getRetriedRelationships() { + return Collections.EMPTY_SET; + } + + @Override + public void setRetriedRelationships(Set retriedRelationships) { + } + + @Override + public boolean isRelationshipRetried(Relationship relationship) { + return false; + } + + @Override + public BackoffMechanism getBackoffMechanism() { + return BackoffMechanism.PENALIZE_FLOWFILE; + } + + @Override + public void setBackoffMechanism(BackoffMechanism backoffMechanism) { + } + + @Override + public String getMaxBackoffPeriod() { + return DEFAULT_MAX_BACKOFF_PERIOD; + } + + @Override + public void setMaxBackoffPeriod(String maxBackoffPeriod) { + } + @Override public Optional getVersionedComponentId() { return Optional.ofNullable(versionedComponentId.get()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/provenance/InternalProvenanceReporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/provenance/InternalProvenanceReporter.java index 9bb45bcbfd..6570865ae2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/provenance/InternalProvenanceReporter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/provenance/InternalProvenanceReporter.java @@ -35,6 +35,8 @@ public interface InternalProvenanceReporter extends ProvenanceReporter { void remove(ProvenanceEventRecord event); + void removeEventsForFlowFile(String uuid); + void clear(); void migrate(InternalProvenanceReporter newOwner, Collection flowFileIds); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java index 5bff311d1e..566b8c6129 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java @@ -371,6 +371,12 @@ public class StandardFlowSnippet implements FlowSnippet { procNode.setPenalizationPeriod(config.getPenaltyDuration()); procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel())); procNode.setAnnotationData(config.getAnnotationData()); + procNode.setRetryCount(config.getRetryCount()); + procNode.setRetriedRelationships(config.getRetriedRelationships()); + if (config.getBackoffMechanism() != null) { + procNode.setBackoffMechanism(BackoffMechanism.valueOf(config.getBackoffMechanism())); + } + procNode.setMaxBackoffPeriod(config.getMaxBackoffPeriod()); procNode.setStyle(processorDTO.getStyle()); if (config.getRunDurationMillis() != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java index ab15df7c9f..1d2c0e3e36 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java @@ -1232,6 +1232,13 @@ public class XmlFlowSynchronizer implements FlowSynchronizer { procNode.setPenalizationPeriod(config.getPenaltyDuration()); procNode.setYieldPeriod(config.getYieldDuration()); procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel())); + procNode.setRetryCount(config.getRetryCount()); + procNode.setRetriedRelationships(config.getRetriedRelationships()); + + if (config.getBackoffMechanism() != null) { + procNode.setBackoffMechanism(BackoffMechanism.valueOf(config.getBackoffMechanism())); + } + procNode.setMaxBackoffPeriod(config.getMaxBackoffPeriod()); updateNonFingerprintedProcessorSettings(procNode, processorDTO); if (config.getSchedulingStrategy() != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index e8594b8de6..1fb4ded672 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -499,6 +499,21 @@ public class FlowFromDOMFactory { configDto.setYieldDuration(getString(element, "yieldPeriod")); configDto.setBulletinLevel(getString(element, "bulletinLevel")); configDto.setLossTolerant(getBoolean(element, "lossTolerant")); + if (getString(element, "retryCount") != null) { + configDto.setRetryCount(getInt(element, "retryCount")); + } else { + configDto.setRetryCount(10); + } + configDto.setMaxBackoffPeriod(getString(element, "maxBackoffPeriod")); + configDto.setBackoffMechanism(getString(element, "backoffMechanism")); + + final Set retriedRelationships = new HashSet<>(); + final List retriedRelationshipList = getChildrenByTagName(element, "retriedRelationship"); + for (final Element retriedRelationship : retriedRelationshipList) { + retriedRelationships.add(retriedRelationship.getTextContent()); + } + configDto.setRetriedRelationships(retriedRelationships); + final ScheduledState scheduledState = getScheduledState(element); dto.setState(scheduledState.toString()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index 3558eac639..0e4961bf37 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -515,6 +515,13 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name()); addTextElement(element, "executionNode", processor.getExecutionNode().name()); addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS)); + addTextElement(element, "retryCount", processor.getRetryCount()); + addTextElement(element, "backoffMechanism", processor.getBackoffMechanism().name()); + addTextElement(element, "maxBackoffPeriod", processor.getMaxBackoffPeriod()); + + for (final String relationship : processor.getRetriedRelationships()) { + addTextElement(element, "retriedRelationship", relationship); + } addConfiguration(element, processor.getRawPropertyValues(), processor.getAnnotationData(), encryptor); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index ffaf032e03..abee7c26ac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -481,6 +481,12 @@ public class FingerprintFactory { appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "executionNode")); // run duration nanos appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "runDurationNanos")); + // retry count + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "retryCount")); + // backoff mechanism + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "backoffMechanism")); + // max backoff period + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "maxBackoffPeriod")); // get the temp instance of the Processor so that we know the default property values final BundleCoordinate coordinate = getCoordinate(className, bundle); @@ -504,6 +510,12 @@ public class FingerprintFactory { builder.append(autoTerminateElem.getTextContent()); } + final NodeList retriedRelationshipsElems = DomUtils.getChildNodesByTagName(processorElem, "retriedRelationships"); + final List sortedRetriedRelationshipsElems = sortElements(retriedRelationshipsElems, getElementTextComparator()); + for (final Element retriedRelationshipElem : sortedRetriedRelationshipsElems) { + builder.append(retriedRelationshipElem.getTextContent()); + } + return builder; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 03324c0660..3605b4f7ba 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -146,6 +146,15 @@ + + + + + + + + + @@ -467,6 +476,13 @@ + + + + + + + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index 54f5cff7e1..e201e772f8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -995,6 +995,10 @@ public class TestFlowController { configDTO.setSchedulingStrategy(processorNode.getSchedulingStrategy().name()); configDTO.setExecutionNode(processorNode.getExecutionNode().name()); configDTO.setAnnotationData(processorNode.getAnnotationData()); + configDTO.setRetryCount(processorNode.getRetryCount()); + configDTO.setRetriedRelationships(processorNode.getRetriedRelationships()); + configDTO.setBackoffMechanism(processorNode.getBackoffMechanism().name()); + configDTO.setMaxBackoffPeriod(processorNode.getMaxBackoffPeriod()); processorDTO.setConfig(configDTO); @@ -1049,6 +1053,10 @@ public class TestFlowController { configDTO.setSchedulingStrategy(processorNode.getSchedulingStrategy().name()); configDTO.setExecutionNode(processorNode.getExecutionNode().name()); configDTO.setAnnotationData(processorNode.getAnnotationData()); + configDTO.setRetryCount(processorNode.getRetryCount()); + configDTO.setRetriedRelationships(processorNode.getRetriedRelationships()); + configDTO.setBackoffMechanism(processorNode.getBackoffMechanism().name()); + configDTO.setMaxBackoffPeriod(processorNode.getMaxBackoffPeriod()); processorDTO.setConfig(configDTO); @@ -1105,6 +1113,10 @@ public class TestFlowController { configDTO.setSchedulingStrategy(processorNode.getSchedulingStrategy().name()); configDTO.setExecutionNode(processorNode.getExecutionNode().name()); configDTO.setAnnotationData(processorNode.getAnnotationData()); + configDTO.setRetryCount(processorNode.getRetryCount()); + configDTO.setRetriedRelationships(processorNode.getRetriedRelationships()); + configDTO.setBackoffMechanism(processorNode.getBackoffMechanism().name()); + configDTO.setMaxBackoffPeriod(processorNode.getMaxBackoffPeriod()); processorDTO.setConfig(configDTO); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java index 0867246fa2..d73b081f63 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java @@ -21,8 +21,11 @@ import org.apache.nifi.components.state.StateMap; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.BackoffMechanism; import org.apache.nifi.controller.Counter; import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.NopConnectionEventListener; import org.apache.nifi.controller.queue.PollStrategy; @@ -40,6 +43,7 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.FlowFileHandlingException; @@ -91,6 +95,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -102,11 +107,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.notNull; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -184,6 +193,8 @@ public class StandardProcessSessionIT { when(connectable.getIdentifier()).thenReturn("connectable-1"); when(connectable.getConnectableType()).thenReturn(ConnectableType.INPUT_PORT); when(connectable.getComponentType()).thenReturn("Unit Test Component"); + when(connectable.getBackoffMechanism()).thenReturn(BackoffMechanism.PENALIZE_FLOWFILE); + when(connectable.getMaxBackoffPeriod()).thenReturn("1 sec"); Mockito.doAnswer(new Answer>() { @Override @@ -2801,7 +2812,335 @@ public class StandardProcessSessionIT { assertEquals(0, contentRepo.getClaimantCount(getContentClaim(flowFile))); } + @Test + public void testWhenInRetryAttributeIsAdded() { + final Connectable processor = createProcessorConnectable(); + configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L); + StandardProcessSession session = createSessionForRetry(processor); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .build(); + + flowFileQueue.put(flowFileRecord); + + final Relationship relationship = new Relationship.Builder().name("A").build(); + + FlowFile ff1 = session.get(); + assertNotNull(ff1); + session.transfer(flowFileRecord, relationship); + session.commit(); + + FlowFile ff2 = session.get(); + assertNotNull(ff2); + assertEquals("1", ff2.getAttribute("retryCount." + connectable.getIdentifier())); + } + + @Test + public void testWhenRetryCompletedAttributeIsRemoved() { + final Connectable processor = createProcessorConnectable(); + configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L); + final StandardProcessSession session = createSessionForRetry(processor); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .build(); + + flowFileQueue.put(flowFileRecord); + + final Relationship relationship = new Relationship.Builder().name("A").build(); + + final FlowFile ff1 = session.get(); + assertNotNull(ff1); + session.transfer(flowFileRecord, relationship); + session.commit(); + + final FlowFile ff2 = session.get(); + assertNotNull(ff2); + assertEquals("1", ff2.getAttribute("retryCount." + processor.getIdentifier())); + session.transfer(flowFileRecord, relationship); + session.commit(); + + final FlowFile ff3 = session.get(); + assertNotNull(ff3); + assertNull(ff3.getAttribute("retryCount." + processor.getIdentifier())); + } + + @Test + public void testRetryParentFlowFileRemovesChildren() throws IOException { + final Connectable processor = createProcessorConnectable(); + configureRetry(processor, 1, BackoffMechanism.PENALIZE_FLOWFILE, "15 ms", 10000L); + final StandardProcessSession session = createSessionForRetry(processor); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .id(500L) + .build(); + + flowFileQueue.put(flowFileRecord); + + final Relationship relationshipA = new Relationship.Builder().name("A").build(); + final Relationship relationshipB = new Relationship.Builder().name("B").build(); + + final FlowFile original = session.get(); + assertNotNull(original); + + final List contentClaims = new ArrayList<>(); + for (int i=0; i < 3; i++) { + FlowFile child = session.create(original); + final byte[] contents = String.valueOf(i).getBytes(); + child = session.write(child, out -> out.write(contents)); + + final FlowFileRecord childRecord = (FlowFileRecord) child; + contentClaims.add(childRecord.getContentClaim()); + + session.transfer(child, relationshipB); + } + + session.transfer(original, relationshipA); + session.commit(); + + assertEquals(1, flowFileQueue.size().getObjectCount()); + final List provEvents = provenanceRepo.getEvents(0, 1000); + assertEquals(0, provEvents.size()); + + assertNull(flowFileRecord.getContentClaim()); + for (final ContentClaim claim : contentClaims) { + assertEquals(0, contentRepo.getClaimantCount(claim)); + } + } + + @Test + public void testWhenFlowFilePenalizeBackoffMechanismConfiguredFlowFileIsPenalized() throws IOException { + final Connectable processor = createProcessorConnectable(); + configureRetry(processor, 1, BackoffMechanism.PENALIZE_FLOWFILE, "15 ms", 10000L); + final StandardProcessSession session = createSessionForRetry(processor); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .build(); + + flowFileQueue.put(flowFileRecord); + + final Relationship relationship = new Relationship.Builder().name("A").build(); + + final FlowFile ff1 = session.get(); + assertNotNull(ff1); + session.transfer(flowFileRecord, relationship); + session.commit(); + + assertTrue(flowFileQueue.getFlowFile(ff1.getAttribute(CoreAttributes.UUID.key())).isPenalized()); + } + + @Test + public void testWhenYieldingBackoffMechanismConfiguredProcessorIsYielded() { + final Connectable processor = createProcessorConnectable(); + configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L); + final StandardProcessSession session = createSessionForRetry(processor); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .build(); + + flowFileQueue.put(flowFileRecord); + + final Relationship relationship = new Relationship.Builder().name("A").build(); + + final FlowFile ff1 = session.get(); + assertNotNull(ff1); + session.transfer(flowFileRecord, relationship); + session.commit(); + + verify(processor).yield(anyLong(), eq(TimeUnit.MILLISECONDS)); + + } + + @Test + public void testWhenInRetryProcessorStatisticsAdjusted() throws IOException { + final Connectable processor = createProcessorConnectable(); + configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L); + final StandardProcessSession session = createSessionForRetry(processor); + + final ContentClaim contentClaim = contentRepo.create("original".getBytes()); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .size(8L) + .contentClaim(contentClaim) + .build(); + + flowFileQueue.put(flowFileRecord); + + final Relationship relationship = new Relationship.Builder().name("A").build(); + + final FlowFile ff1 = session.get(); + assertNotNull(ff1); + session.transfer(flowFileRecord, relationship); + session.commit(); + + final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L); + + final int flowFilesIn = report.getReportEntry("connectable-1").getFlowFilesIn(); + assertEquals(0, flowFilesIn); + final int flowFilesOut = report.getReportEntry("connectable-1").getFlowFilesOut(); + assertEquals(0, flowFilesOut); + final long contentSizeIn = report.getReportEntry("connectable-1").getContentSizeIn(); + assertEquals(0, contentSizeIn); + final long contentSizeOut = report.getReportEntry("connectable-1").getContentSizeOut(); + assertEquals(0, contentSizeOut); + } + + @Test + public void testWhenInRetryEventsAreCleared() throws IOException { + final Connectable processor = createProcessorConnectable(); + configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L); + final StandardProcessSession session = createSessionForRetry(processor); + + final ContentClaim contentClaim = contentRepo.create("original".getBytes()); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .size(8L) + .contentClaim(contentClaim) + .build(); + + flowFileQueue.put(flowFileRecord); + + final Relationship relationship = new Relationship.Builder().name("A").build(); + + final FlowFile ff1 = session.get(); + assertNotNull(ff1); + final FlowFile ff2 = session.create(ff1); + session.transfer(ff1, relationship); + session.transfer(ff2, relationship); + session.commit(); + + final List provEvents = provenanceRepo.getEvents(0L, 1000); + assertEquals(0, provEvents.size()); + + } + + @Test + public void testWhenInRetryClaimsAreCleared() throws IOException { + final Connectable processor = createProcessorConnectable(); + configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L); + final StandardProcessSession session = createSessionForRetry(processor); + + final byte[] originalContent = "original".getBytes(); + final byte[] replacementContent = "modified data".getBytes(); + final ContentClaim originalClaim = contentRepo.create(originalContent); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .contentClaim(originalClaim) + .size(originalContent.length) + .build(); + + flowFileQueue.put(flowFileRecord); + + final Relationship relationship = new Relationship.Builder().name("A").build(); + + final FlowFile ff1 = session.get(); + final FlowFile modified = session.write(ff1, (in, out) -> out.write(replacementContent)); + session.transfer(modified, relationship); + session.commit(); + + assertEquals(1, contentRepo.getClaimantCount(originalClaim)); + assertEquals(0, contentRepo.getClaimantCount(((FlowFileRecord) modified).getContentClaim())); + } + + @Test + public void testWhenInRetryContentRestored() throws IOException { + final Connectable processor = createProcessorConnectable(); + configureRetry(processor, 1, BackoffMechanism.YIELD_PROCESSOR, "1 ms", 1L); + final StandardProcessSession session = createSessionForRetry(processor); + + final byte[] originalContent = "original".getBytes(); + final byte[] replacementContent = "modified data".getBytes(); + final ContentClaim originalClaim = contentRepo.create(originalContent); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .contentClaim(originalClaim) + .size(originalContent.length) + .build(); + + flowFileQueue.put(flowFileRecord); + + final Relationship relationship = new Relationship.Builder().name("A").build(); + + final FlowFile ff1 = session.get(); + final FlowFile modified = session.write(ff1, (in, out) -> out.write(replacementContent)); + session.transfer(modified, relationship); + session.commit(); + + final FlowFile ff2 = session.get(); + assertEquals(originalContent.length, ff2.getSize()); + assertEquals(originalClaim, ((FlowFileRecord) ff2).getContentClaim()); + } + + public void configureRetry(final Connectable connectable, final int retryCount, final BackoffMechanism backoffMechanism, + final String maxBackoffPeriod, final long penalizationPeriod) { + Processor proc = mock(Processor.class); + when(((ProcessorNode) connectable).getProcessor()).thenReturn( proc); + when((connectable).isRelationshipRetried(any())).thenReturn(true); + when((connectable).getRetryCount()).thenReturn(retryCount); + when((connectable).getBackoffMechanism()).thenReturn(backoffMechanism); + when((connectable).getMaxBackoffPeriod()).thenReturn(maxBackoffPeriod); + when((connectable).getRetriedRelationships()).thenReturn(Collections.singleton(FAKE_RELATIONSHIP.getName())); + when(connectable.getPenalizationPeriod(any(TimeUnit.class))).thenReturn(penalizationPeriod); + when(connectable.getYieldPeriod(any(TimeUnit.class))).thenReturn(penalizationPeriod); + } + + public Connectable createProcessorConnectable() { + Connectable connectable = mock(StandardProcessorNode.class); + final Connection connection = createConnection(); + final Connection connectionB = createConnection(); + + final List connList = new ArrayList<>(); + connList.add(connection); + + when(connectable.hasIncomingConnection()).thenReturn(true); + when(connectable.getIncomingConnections()).thenReturn(connList); + + when(connectable.getIdentifier()).thenReturn("connectable-1"); + when(connectable.getConnectableType()).thenReturn(ConnectableType.PROCESSOR); + when(connectable.getComponentType()).thenReturn("Unit Test Component"); + + Mockito.doAnswer((Answer>) invocation -> { + final Object[] arguments = invocation.getArguments(); + final Relationship relationship = (Relationship) arguments[0]; + if (relationship == Relationship.SELF) { + return Collections.emptySet(); + } else if (relationship == FAKE_RELATIONSHIP || relationship.equals(FAKE_RELATIONSHIP)) { + return null; + } else if (relationship.getName().equals("B")) { + return Collections.singleton(connectionB); + } else { + return new HashSet<>(connList); + } + }).when(connectable).getConnections(Mockito.any(Relationship.class)); + + when(connectable.getConnections()).thenReturn(new HashSet<>(connList)); + return connectable; + } + + public StandardProcessSession createSessionForRetry(final Connectable connectable) { + + StandardRepositoryContext context = new StandardRepositoryContext(connectable, + new AtomicLong(0L), + contentRepo, + flowFileRepo, + flowFileEventRepository, + counterRepository, + provenanceRepo, + stateManager); + return new StandardProcessSession(context, () -> false); + + } private static class MockFlowFileRepository implements FlowFileRepository { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java index 2e40bfb9a4..a02246acd1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java @@ -28,6 +28,7 @@ import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Positionable; import org.apache.nifi.connectable.Size; +import org.apache.nifi.controller.BackoffMechanism; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.PropertyConfiguration; @@ -459,6 +460,7 @@ public class NiFiRegistryFlowMapperTest { when(processorNode.getExecutionNode()).thenReturn(ExecutionNode.ALL); when(processorNode.getSchedulingStrategy()).thenReturn(SchedulingStrategy.TIMER_DRIVEN); when(processorNode.getBundleCoordinate()).thenReturn(mock(BundleCoordinate.class)); + when(processorNode.getBackoffMechanism()).thenReturn(BackoffMechanism.PENALIZE_FLOWFILE); final String rawPropertyValue = "propValue"; final PropertyDescriptor.Builder propertyDescriptorBuilder = diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java index ebcf002a6e..7dcb24a1dd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java @@ -129,4 +129,14 @@ public class MockProcessContext implements ProcessContext { public boolean isConnectedToCluster() { return true; } + + @Override + public boolean isRelationshipRetried(Relationship relationshipName) { + return false; + } + + @Override + public int getRetryCount() { + return 0; + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java index e502a9ada3..4345b64c4c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java @@ -80,6 +80,10 @@ public class ProcessorAuditor extends NiFiAuditor { private static final String SCHEDULING_STRATEGY = "Scheduling Strategy"; private static final String EXECUTION_NODE = "Execution Node"; private static final String EXTENSION_VERSION = "Extension Version"; + private static final String RETRY_COUNT = "Retry Count"; + private static final String RETRIED_RELATIONSHIPS = "Retried Relationships"; + private static final String BACKOFF_MECHANISM = "Backoff Mechanism"; + private static final String MAX_BACKOFF_PERIOD = "Max Backoff Period"; /** * Audits the creation of processors via createProcessor(). @@ -423,6 +427,24 @@ public class ProcessorAuditor extends NiFiAuditor { if (newConfig.getExecutionNode() != null) { values.put(EXECUTION_NODE, processor.getExecutionNode().name()); } + + if (newConfig.getRetryCount() != null) { + values.put(RETRY_COUNT, String.valueOf(processor.getRetryCount())); + } + + if (newConfig.getRetriedRelationships() != null) { + final List retriedRelationships = new ArrayList<>(processor.getRetriedRelationships()); + Collections.sort(retriedRelationships, Collator.getInstance(Locale.US)); + values.put(RETRIED_RELATIONSHIPS, StringUtils.join(retriedRelationships, ", ")); + } + + if (newConfig.getBackoffMechanism() != null) { + values.put(BACKOFF_MECHANISM, processor.getBackoffMechanism().name()); + } + + if (newConfig.getMaxBackoffPeriod() != null) { + values.put(MAX_BACKOFF_PERIOD, processor.getMaxBackoffPeriod()); + } } return values; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java index 75413de86b..49052b4051 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java @@ -32,8 +32,10 @@ import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.OperationAuthorizable; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.controller.BackoffMechanism; import org.apache.nifi.ui.extension.UiExtension; import org.apache.nifi.ui.extension.UiExtensionMapping; +import org.apache.nifi.util.FormatUtils; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.Revision; import org.apache.nifi.web.UiExtensionType; @@ -835,6 +837,25 @@ public class ProcessorResource extends ApplicationResource { } } + final ProcessorConfigDTO processorConfig = requestProcessorDTO.getConfig(); + if (processorConfig != null) { + if (processorConfig.getRetryCount() != null && processorConfig.getRetryCount() < 0) { + throw new IllegalArgumentException("Retry Count should not be less than zero."); + } + + if (processorConfig.getBackoffMechanism() != null) { + try { + BackoffMechanism.valueOf(processorConfig.getBackoffMechanism()); + } catch (Exception e) { + throw new IllegalArgumentException("Backoff Mechanism " + processorConfig.getBackoffMechanism() + " is invalid."); + } + } + + if (processorConfig.getMaxBackoffPeriod() != null && !FormatUtils.TIME_DURATION_PATTERN.matcher(processorConfig.getMaxBackoffPeriod()).matches()) { + throw new IllegalArgumentException("Max Backoff Period should be specified as time, for example 5 mins"); + } + } + if (isReplicateRequest()) { return replicate(HttpMethod.PUT, requestProcessorEntity); } else if (isDisconnectedFromCluster()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 51a580fa52..0b0d7f8177 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -3155,6 +3155,7 @@ public final class DtoFactory { relationshipDTO.setDescription(rel.getDescription()); relationshipDTO.setName(rel.getName()); relationshipDTO.setAutoTerminate(node.isAutoTerminated(rel)); + relationshipDTO.setRetry(node.isRelationshipRetried(rel)); relationships.add(relationshipDTO); } @@ -3994,6 +3995,11 @@ public final class DtoFactory { dto.setSchedulingStrategy(procNode.getSchedulingStrategy().name()); dto.setExecutionNode(procNode.getExecutionNode().name()); + dto.setBackoffMechanism(procNode.getBackoffMechanism().name()); + dto.setMaxBackoffPeriod(procNode.getMaxBackoffPeriod()); + dto.setRetriedRelationships(procNode.getRetriedRelationships()); + dto.setRetryCount(procNode.getRetryCount()); + return dto; } @@ -4224,6 +4230,10 @@ public final class DtoFactory { copy.setDefaultConcurrentTasks(original.getDefaultConcurrentTasks()); copy.setDefaultSchedulingPeriod(original.getDefaultSchedulingPeriod()); copy.setLossTolerant(original.isLossTolerant()); + copy.setBackoffMechanism(original.getBackoffMechanism()); + copy.setMaxBackoffPeriod(original.getMaxBackoffPeriod()); + copy.setRetryCount(original.getRetryCount()); + copy.setRetriedRelationships(original.getRetriedRelationships()); return copy; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java index 3ab11a9bdc..baf49f5ea1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java @@ -23,6 +23,7 @@ import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Position; +import org.apache.nifi.controller.BackoffMechanism; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; @@ -154,6 +155,10 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { final Long runDurationMillis = config.getRunDurationMillis(); final String bulletinLevel = config.getBulletinLevel(); final Set undefinedRelationshipsToTerminate = config.getAutoTerminatedRelationships(); + final Integer retryCount = config.getRetryCount(); + final Set retriedRelationships = config.getRetriedRelationships(); + final String backoffMechanism = config.getBackoffMechanism(); + final String maxBackoffPeriod = config.getMaxBackoffPeriod(); processor.pauseValidationTrigger(); // ensure that we don't trigger many validations to occur try { @@ -196,6 +201,22 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { processor.setProperties(configProperties); } + if (isNotNull(retryCount)) { + processor.setRetryCount(retryCount); + } + + if (isNotNull(retriedRelationships)) { + processor.setRetriedRelationships(retriedRelationships); + } + + if (isNotNull(backoffMechanism)) { + processor.setBackoffMechanism(BackoffMechanism.valueOf(backoffMechanism)); + } + + if (isNotNull(maxBackoffPeriod)) { + processor.setMaxBackoffPeriod(maxBackoffPeriod); + } + if (isNotNull(undefinedRelationshipsToTerminate)) { final Set relationships = new HashSet<>(); for (final String relName : undefinedRelationshipsToTerminate) { @@ -427,7 +448,11 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { configDTO.getSchedulingPeriod(), configDTO.getSchedulingStrategy(), configDTO.getExecutionNode(), - configDTO.getYieldDuration())) { + configDTO.getYieldDuration(), + configDTO.getRetryCount(), + configDTO.getBackoffMechanism(), + configDTO.getMaxBackoffPeriod(), + configDTO.getRetriedRelationships())) { modificationRequest = true; } diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/DifferenceType.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/DifferenceType.java index 14aae797ca..712c1a557e 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/DifferenceType.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/DifferenceType.java @@ -53,6 +53,26 @@ public enum DifferenceType { */ YIELD_DURATION_CHANGED("Yield Duration Changed"), + /** + * The component has a different retry count in each of the flows + */ + RETRY_COUNT_CHANGED("Retry Count Changed"), + + /** + * The component has a different retried relationship in each of the flows + */ + RETRIED_RELATIONSHIPS_CHANGED("Retried Relationships Changed"), + + /** + * The component has a different backoff mechanism in each of the flows + */ + BACKOFF_MECHANISM_CHANGED("Backoff Mechanism Changed"), + + /** + * The component has a different max backoff period in each of the flows + */ + MAX_BACKOFF_PERIOD_CHANGED("Max Backoff Period Changed"), + /** * The component has a different bulletin level in each of the flows */ diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java index 4fc579e8ca..e92a317514 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java @@ -169,6 +169,10 @@ public class StandardFlowComparator implements FlowComparator { addIfDifferent(differences, DifferenceType.SCHEDULED_STATE_CHANGED, processorA, processorB, VersionedProcessor::getScheduledState); addIfDifferent(differences, DifferenceType.STYLE_CHANGED, processorA, processorB, VersionedProcessor::getStyle); addIfDifferent(differences, DifferenceType.YIELD_DURATION_CHANGED, processorA, processorB, VersionedProcessor::getYieldDuration); + addIfDifferent(differences, DifferenceType.RETRY_COUNT_CHANGED, processorA, processorB, VersionedProcessor::getRetryCount); + addIfDifferent(differences, DifferenceType.RETRIED_RELATIONSHIPS_CHANGED, processorA, processorB, VersionedProcessor::getRetriedRelationships); + addIfDifferent(differences, DifferenceType.BACKOFF_MECHANISM_CHANGED, processorA, processorB, VersionedProcessor::getBackoffMechanism); + addIfDifferent(differences, DifferenceType.MAX_BACKOFF_PERIOD_CHANGED, processorA, processorB, VersionedProcessor::getMaxBackoffPeriod); compareProperties(processorA, processorB, processorA.getProperties(), processorB.getProperties(), processorA.getPropertyDescriptors(), processorB.getPropertyDescriptors(), differences); } diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java index defb168918..a5054c6e88 100644 --- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java +++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java @@ -144,6 +144,10 @@ public class VersionedFlowBuilder { processor.setType(type); processor.setYieldDuration("1 sec"); processor.setSchedulingStrategy("TIMER_DRIVEN"); + processor.setRetryCount(0); + processor.setBackoffMechanism("PENALIZE_FLOWFILE"); + processor.setRetriedRelationships(Collections.EMPTY_SET); + processor.setMaxBackoffPeriod("0 sec"); group.getProcessors().add(processor); return processor; diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/StatelessRetryIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/StatelessRetryIT.java new file mode 100644 index 0000000000..0a6b8115e4 --- /dev/null +++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/StatelessRetryIT.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stateless.basics; + +import org.apache.nifi.flow.VersionedControllerService; +import org.apache.nifi.flow.VersionedPort; +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.stateless.StatelessSystemIT; +import org.apache.nifi.stateless.VersionedFlowBuilder; +import org.apache.nifi.stateless.config.StatelessConfigurationException; +import org.apache.nifi.stateless.flow.DataflowTrigger; +import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.flow.TriggerResult; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StatelessRetryIT extends StatelessSystemIT { + private static final int RETRY_COUNT = 2; + private static final String EXPECTED_COUNTER = String.valueOf(RETRY_COUNT + 1); + + @Test + public void testRetryHappensTwiceThenFinishes() throws StatelessConfigurationException, IOException, InterruptedException { + final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder(); + + //Create a GenerateFlowFile processor + final VersionedProcessor generateFlowFile = flowBuilder.createSimpleProcessor("GenerateFlowFile"); + + //Create a CountFlowFiles processor and configure 2 retries + final VersionedProcessor countFlowFiles = flowBuilder.createSimpleProcessor("CountFlowFiles"); + countFlowFiles.setMaxBackoffPeriod("1 ms"); + countFlowFiles.setRetryCount(RETRY_COUNT); + countFlowFiles.setBackoffMechanism("PENALIZE_FLOWFILE"); + countFlowFiles.setRetriedRelationships(Collections.singleton("success")); + countFlowFiles.setPenaltyDuration("1 ms"); + + //Create a CountService and add it to the CountFlowFiles processor + final VersionedControllerService countService = flowBuilder.createSimpleControllerService("StandardCountService", "CountService"); + flowBuilder.addControllerServiceReference(countFlowFiles, countService, "Count Service"); + + //Create an Output port + final VersionedPort outPort = flowBuilder.createOutputPort("Out"); + + //Connect GenerateFlowFile and CountFlowFiles processors + flowBuilder.createConnection(generateFlowFile, countFlowFiles, "success"); + + //Connect Output port and CountFlowFiles processor + flowBuilder.createConnection(countFlowFiles, outPort, "success"); + + // Startup the dataflow + final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList()); + + // Enqueue data and trigger + final DataflowTrigger trigger = dataflow.trigger(); + final TriggerResult result = trigger.getResult(); + assertTrue(result.isSuccessful()); + + //Assert that CountService has been called 3 times (2 retries and 1 final run) + final List flowFiles = result.getOutputFlowFiles("Out"); + assertEquals(1, flowFiles.size()); + assertEquals(EXPECTED_COUNTER, flowFiles.get(0).getAttribute("count")); + + result.acknowledge(); + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitTextByLine.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitTextByLine.java new file mode 100644 index 0000000000..9b31aca4a6 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitTextByLine.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.tests.system; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class SplitTextByLine extends AbstractProcessor { + + static final Relationship ORIGINAL = new Relationship.Builder() + .name("original") + .build(); + static final Relationship SPLITS = new Relationship.Builder() + .name("splits") + .build(); + static final Relationship FAILURE = new Relationship.Builder() + .name("failure") + .build(); + + @Override + public Set getRelationships() { + return new HashSet<>(Arrays.asList(ORIGINAL, SPLITS, FAILURE)); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + + if (flowFile == null) { + return; + } + + final List splits = new ArrayList<>(); + + try (final InputStream in = session.read(flowFile); + final Reader streamReader = new InputStreamReader(in); + final BufferedReader reader = new BufferedReader(streamReader)) { + + String line; + while ((line = reader.readLine()) != null) { + splits.add(line); + } + } catch (IOException ioe) { + session.transfer(flowFile, FAILURE); + } + + long offset = 0; + for (String splitText : splits) { + FlowFile splitFlowFile = session.clone(flowFile, offset, splitText.getBytes(StandardCharsets.UTF_8).length); + offset = offset + splitText.getBytes(StandardCharsets.UTF_8).length; + splitFlowFile = session.write(splitFlowFile, out -> out.write(splitText.getBytes(StandardCharsets.UTF_8))); + session.transfer(splitFlowFile, SPLITS); + } + session.transfer(flowFile, ORIGINAL); + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index b661c11660..66fc3133bd 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -45,3 +45,4 @@ org.apache.nifi.processors.tests.system.VerifyContents org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile org.apache.nifi.processors.tests.system.WriteLifecycleEvents org.apache.nifi.processors.tests.system.WriteToFile +org.apache.nifi.processors.tests.system.SplitTextByLine diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RetryIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RetryIT.java new file mode 100644 index 0000000000..70787eafb0 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RetryIT.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.tests.system.processor; + +import org.apache.nifi.controller.BackoffMechanism; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.dto.ProcessorConfigDTO; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class RetryIT extends NiFiSystemIT { + private static final int RETRY_COUNT = 2; + + @Test + public void testRetryHappensTwiceThenFinishes() throws NiFiClientException, IOException, InterruptedException { + //Create a GenerateFlowFile processor + final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile"); + + //Create a PassThrough processor + final ProcessorEntity passThrough = getClientUtil().createProcessor("PassThrough"); + + //Configure the processor's success relationship to be retried twice with flow file penalization for a maximum of 1 ms + enableRetries(passThrough, Collections.singleton("success")); + + //Create a connection between GenerateFlowFile and PassThrough processors + final ConnectionEntity generateFlowFileAndPassThroughConnection = getClientUtil().createConnection(generateFlowFile, passThrough, "success"); + + //Create a CountEvents processor with autoterminated relationship + final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents"); + getClientUtil().setAutoTerminatedRelationships(countEvents, "success"); + + //Create a connection between PassThrough and CountEvents processors + final ConnectionEntity passThroughAndCountEventsConnection = getClientUtil().createConnection(passThrough, countEvents, "success"); + + + // Wait for processors to be valid + getClientUtil().waitForValidProcessor(generateFlowFile.getId()); + getClientUtil().waitForValidProcessor(passThrough.getId()); + getClientUtil().waitForValidProcessor(countEvents.getId()); + + //Generate flow file + runProcessorOnce(generateFlowFile); + getClientUtil().waitForStoppedProcessor(generateFlowFile.getId()); + waitForQueueCount(generateFlowFileAndPassThroughConnection.getId(), 1); + + assertEquals(0, getRetryCount(generateFlowFileAndPassThroughConnection)); + + //Run PassThrough processor, this will trigger the first retry + runProcessorOnce(passThrough); + getClientUtil().waitForStoppedProcessor(passThrough.getId()); + waitForQueueCount(generateFlowFileAndPassThroughConnection.getId(), 1); + + assertEquals(1, getRetryCount(generateFlowFileAndPassThroughConnection)); + + //Running PassThrough processor again, will trigger another retry + runProcessorOnce(passThrough); + getClientUtil().waitForStoppedProcessor(passThrough.getId()); + waitForQueueCount(generateFlowFileAndPassThroughConnection.getId(), 1); + + assertEquals(2, getRetryCount(generateFlowFileAndPassThroughConnection)); + + //Running PassThrough processor again should finish retry and transfer flow file + runProcessorOnce(passThrough); + getClientUtil().waitForStoppedProcessor(passThrough.getId()); + waitForQueueCount(passThroughAndCountEventsConnection.getId(), 1); + + assertEquals(0, getRetryCount(passThroughAndCountEventsConnection)); + assertEquals(0, getConnectionQueueSize(generateFlowFileAndPassThroughConnection.getId())); + } + + @Test + public void testNoRetryHappensWithSourceTypeProcessors() throws NiFiClientException, IOException, InterruptedException { + //Create a GenerateFlowFile processor + final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile"); + + //Configure the processor's success relationship to be retried twice with flow file penalization for a maximum of 1 ms + enableRetries(generateFlowFile, Collections.singleton("success")); + + //Create a CountEvents processor with autoterminated relationship + final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents"); + getClientUtil().setAutoTerminatedRelationships(countEvents, "success"); + + //Create a connection between GenerateFlowFile and CountEvents processors + final ConnectionEntity connection = getClientUtil().createConnection(generateFlowFile, countEvents, "success"); + + // Wait for processors to be valid + getClientUtil().waitForValidProcessor(generateFlowFile.getId()); + getClientUtil().waitForValidProcessor(countEvents.getId()); + + runProcessorOnce(generateFlowFile); + waitForQueueCount(connection.getId(), 1); + + assertEquals(0, getRetryCount(connection)); + } + + private void runProcessorOnce(final ProcessorEntity processorEntity) throws NiFiClientException, IOException, InterruptedException { + getNifiClient().getProcessorClient().runProcessorOnce(processorEntity); + getClientUtil().waitForStoppedProcessor(processorEntity.getId()); + } + + @Test + public void testRetryHappensWhenOnlyChildRelationshipHasRetryConfigured() throws NiFiClientException, IOException, InterruptedException { + //Create a GenerateFlowFile processor + final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile"); + Map properties = new HashMap<>(); + properties.put("Text", "abc\n123\nxyz\n321"); + properties.put("Batch Size", "1"); + getClientUtil().updateProcessorProperties(generateFlowFile, properties); + + //Create a SplitTextByLine processor + final ProcessorEntity splitTextByLine = getClientUtil().createProcessor("SplitTextByLine"); + + //Configure the processor's splits relationship to be retried twice with flow file penalization for a maximum of 1 ms + enableRetries(splitTextByLine, Collections.singleton("splits")); + + //Create a connection between GenerateFlowFile and SplitTextByLine processors + final ConnectionEntity generateFlowFileAndSplitTextByLineConnection = getClientUtil().createConnection(generateFlowFile, splitTextByLine, "success"); + + //Create a CountEvents processor with autoterminated relationship + final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents"); + getClientUtil().setAutoTerminatedRelationships(countEvents, "success"); + + //Create a connection between SplitTextByLine and CountEvents processors + final ConnectionEntity splitTextByLineAndCountEventsOriginalConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "original"); + final ConnectionEntity splitTextByLineAndCountEventsSplitsConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "splits"); + final ConnectionEntity splitTextByLineAndCountEventsFailureConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "failure"); + + + // Wait for processors to be valid + getClientUtil().waitForValidProcessor(generateFlowFile.getId()); + getClientUtil().waitForValidProcessor(splitTextByLine.getId()); + getClientUtil().waitForValidProcessor(countEvents.getId()); + + //Generate flow file + runProcessorOnce(generateFlowFile); + waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1); + + assertEquals(0, getRetryCount(generateFlowFileAndSplitTextByLineConnection)); + + //Run SplitTextByLine processor, this will trigger the first retry + runProcessorOnce(splitTextByLine); + waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1); + + assertEquals(1, getRetryCount(generateFlowFileAndSplitTextByLineConnection)); + + //Running SplitTextByLine processor again, will trigger another retry + runProcessorOnce(splitTextByLine); + waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1); + + assertEquals(2, getRetryCount(generateFlowFileAndSplitTextByLineConnection)); + + //Running SplitTextByLine processor again should finish retry and transfer flow file + runProcessorOnce(splitTextByLine); + waitForQueueCount(splitTextByLineAndCountEventsSplitsConnection.getId(), 4); + + assertEquals(0, getRetryCount(splitTextByLineAndCountEventsSplitsConnection)); + assertEquals(0, getConnectionQueueSize(generateFlowFileAndSplitTextByLineConnection.getId())); + assertEquals(0, getConnectionQueueSize(splitTextByLineAndCountEventsFailureConnection.getId())); + assertEquals(1, getConnectionQueueSize(splitTextByLineAndCountEventsOriginalConnection.getId())); + } + + @Test + public void testRetryHappensWhenOnlyParentRelationshipHasRetryConfigured() throws NiFiClientException, IOException, InterruptedException { + //Create a GenerateFlowFile processor + final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile"); + Map properties = new HashMap<>(); + properties.put("Text", "abc\n123\nxyz"); + properties.put("Batch Size", "1"); + getClientUtil().updateProcessorProperties(generateFlowFile, properties); + + //Create a SplitTextByLine processor + final ProcessorEntity splitTextByLine = getClientUtil().createProcessor("SplitTextByLine"); + + //Configure the processor's original relationship to be retried twice with flow file penalization for a maximum of 1 ms + enableRetries(splitTextByLine, Collections.singleton("original")); + + //Create a connection between GenerateFlowFile and SplitTextByLine processors + final ConnectionEntity generateFlowFileAndSplitTextByLineConnection = getClientUtil().createConnection(generateFlowFile, splitTextByLine, "success"); + + //Create a CountEvents processor with autoterminated relationship + final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents"); + getClientUtil().setAutoTerminatedRelationships(countEvents, "success"); + + //Create a connection between SplitTextByLine and CountEvents processors + final ConnectionEntity splitTextByLineAndCountEventsOriginalConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "original"); + final ConnectionEntity splitTextByLineAndCountEventsSplitsConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "splits"); + final ConnectionEntity splitTextByLineAndCountEventsFailureConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "failure"); + + + // Wait for processors to be valid + getClientUtil().waitForValidProcessor(generateFlowFile.getId()); + getClientUtil().waitForValidProcessor(splitTextByLine.getId()); + getClientUtil().waitForValidProcessor(countEvents.getId()); + + //Generate flow file + runProcessorOnce(generateFlowFile); + waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1); + + assertEquals(0, getRetryCount(generateFlowFileAndSplitTextByLineConnection)); + + //Run SplitTextByLine processor, this will trigger the first retry + runProcessorOnce(splitTextByLine); + waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1); + + assertEquals(1, getRetryCount(generateFlowFileAndSplitTextByLineConnection)); + + //Running SplitTextByLine processor again, will trigger another retry + runProcessorOnce(splitTextByLine); + waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1); + + assertEquals(2, getRetryCount(generateFlowFileAndSplitTextByLineConnection)); + + //Running SplitTextByLine processor again should finish retry and transfer flow file + runProcessorOnce(splitTextByLine); + waitForQueueCount(splitTextByLineAndCountEventsSplitsConnection.getId(), 3); + + assertEquals(0, getRetryCount(splitTextByLineAndCountEventsSplitsConnection)); + assertEquals(0, getConnectionQueueSize(generateFlowFileAndSplitTextByLineConnection.getId())); + assertEquals(0, getConnectionQueueSize(splitTextByLineAndCountEventsFailureConnection.getId())); + assertEquals(1, getConnectionQueueSize(splitTextByLineAndCountEventsOriginalConnection.getId())); + } + + @Test + public void testRetryHappensOnlyOnceWhenBothChildAndParentRelationshipHasRetryConfigured() throws NiFiClientException, IOException, InterruptedException { + //Create a GenerateFlowFile processor + final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile"); + Map properties = new HashMap<>(); + properties.put("Text", "abc\n123\nxyz\n321\ndfg"); + properties.put("Batch Size", "1"); + getClientUtil().updateProcessorProperties(generateFlowFile, properties); + + //Create a SplitTextByLine processor + final ProcessorEntity splitTextByLine = getClientUtil().createProcessor("SplitTextByLine"); + + //Configure the processor's original and splits relationships to be retried twice with flow file penalization for a maximum of 1 ms + enableRetries(splitTextByLine, new HashSet<>(Arrays.asList("original", "splits"))); + + //Create a connection between GenerateFlowFile and SplitTextByLine processors + final ConnectionEntity generateFlowFileAndSplitTextByLineConnection = getClientUtil().createConnection(generateFlowFile, splitTextByLine, "success"); + + //Create a CountEvents processor with autoterminated relationship + final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents"); + getClientUtil().setAutoTerminatedRelationships(countEvents, "success"); + + //Create a connection between SplitTextByLine and CountEvents processors + final ConnectionEntity splitTextByLineAndCountEventsOriginalConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "original"); + final ConnectionEntity splitTextByLineAndCountEventsSplitsConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "splits"); + final ConnectionEntity splitTextByLineAndCountEventsFailureConnection = getClientUtil().createConnection(splitTextByLine, countEvents, "failure"); + + + // Wait for processors to be valid + getClientUtil().waitForValidProcessor(generateFlowFile.getId()); + getClientUtil().waitForValidProcessor(splitTextByLine.getId()); + getClientUtil().waitForValidProcessor(countEvents.getId()); + + //Generate flow file + runProcessorOnce(generateFlowFile); + waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1); + + assertEquals(0, getRetryCount(generateFlowFileAndSplitTextByLineConnection)); + + //Run SplitTextByLine processor, this will trigger the first retry + runProcessorOnce(splitTextByLine); + waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1); + + assertEquals(1, getRetryCount(generateFlowFileAndSplitTextByLineConnection)); + + //Running SplitTextByLine processor again, will trigger another retry + runProcessorOnce(splitTextByLine); + waitForQueueCount(generateFlowFileAndSplitTextByLineConnection.getId(), 1); + + assertEquals(2, getRetryCount(generateFlowFileAndSplitTextByLineConnection)); + + //Running SplitTextByLine processor again should finish retry and transfer flow file + runProcessorOnce(splitTextByLine); + waitForQueueCount(splitTextByLineAndCountEventsSplitsConnection.getId(), 5); + + assertEquals(0, getRetryCount(splitTextByLineAndCountEventsSplitsConnection)); + assertEquals(0, getConnectionQueueSize(generateFlowFileAndSplitTextByLineConnection.getId())); + assertEquals(0, getConnectionQueueSize(splitTextByLineAndCountEventsFailureConnection.getId())); + assertEquals(1, getConnectionQueueSize(splitTextByLineAndCountEventsOriginalConnection.getId())); + } + + private int getRetryCount(final ConnectionEntity connection) throws NiFiClientException, IOException { + return getClientUtil().getQueueFlowFile(connection.getId(), 0).getFlowFile().getAttributes() + .entrySet().stream() + .filter(entry -> entry.getKey().startsWith("retryCount.")) + .map(Map.Entry::getValue) + .findFirst() + .map(Integer::parseInt) + .orElse(0); + } + + private void enableRetries(final ProcessorEntity processorEntity, final Set relationships) throws NiFiClientException, IOException { + final ProcessorConfigDTO config = new ProcessorConfigDTO(); + config.setRetryCount(RETRY_COUNT); + config.setMaxBackoffPeriod("1 ms"); + config.setBackoffMechanism(BackoffMechanism.PENALIZE_FLOWFILE.name()); + config.setRetriedRelationships(relationships); + config.setPenaltyDuration("1 ms"); + getClientUtil().updateProcessorConfig(processorEntity, config); + } +}