1
0
mirror of https://github.com/apache/nifi.git synced 2025-02-06 18:18:27 +00:00

NIFI-11783: Fixed a bug in which a FlowFile split into multiple and then removed did not properly handle retries when one of the outputs was retried.

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes 
This commit is contained in:
Mark Payne 2023-07-19 11:04:43 -04:00 committed by Matt Burgess
parent 7405dcb510
commit 323f148d27
4 changed files with 68 additions and 30 deletions
nifi-nar-bundles/nifi-framework-bundle/nifi-framework
nifi-framework-components/src/main/java/org/apache/nifi/controller/repository
nifi-repository-models/src/main/java/org/apache/nifi/controller/repository
nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system

@ -345,7 +345,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
dropEvent = provenanceReporter.generateDropEvent(record.getCurrent(), "Auto-Terminated by " + relationship.getName() + " Relationship");
autoTerminatedEvents.add(dropEvent);
} catch (final Exception e) {
LOG.warn("Unable to generate Provenance Event for {} on behalf of {} due to {}", record.getCurrent(), connectableDescription, e);
LOG.warn("Unable to generate Provenance Event for {} on behalf of {}", record.getCurrent(), connectableDescription, e);
if (LOG.isDebugEnabled()) {
LOG.warn("", e);
}
@ -428,12 +428,14 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
// Account for any statistics that have been added to for FlowFiles/Bytes In/Out
final Relationship relationship = record.getTransferRelationship();
final int numDestinations = context.getConnections(relationship).size();
final int multiplier = Math.max(1, numDestinations);
final boolean autoTerminated = connectable.isAutoTerminated(relationship);
if (!autoTerminated) {
flowFilesOut-= multiplier;
contentSizeOut -= record.getCurrent().getSize() * multiplier;
if (relationship != null) {
final int numDestinations = context.getConnections(relationship).size();
final int multiplier = Math.max(1, numDestinations);
final boolean autoTerminated = connectable.isAutoTerminated(relationship);
if (!autoTerminated) {
flowFilesOut -= multiplier;
contentSizeOut -= record.getCurrent().getSize() * multiplier;
}
}
final FlowFileRecord original = record.getOriginal();
@ -1584,10 +1586,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
final StandardRepositoryRecord repoRecord = this.records.remove(flowFileId);
newOwner.records.put(flowFileId, repoRecord);
final Collection<Long> linkedIds = this.flowFileLinkage.remove(flowFileId);
if (linkedIds != null) {
linkedIds.forEach(linkedId -> newOwner.flowFileLinkage.addLink(flowFileId, linkedId));
}
final Collection<Long> linkedIds = this.flowFileLinkage.getLinkedIds(flowFileId);
linkedIds.forEach(linkedId -> newOwner.flowFileLinkage.addLink(flowFileId, linkedId));
// Adjust the counts for Connections for each FlowFile that was pulled from a Connection.
// We do not have to worry about accounting for 'input counts' on connections because those
@ -2458,8 +2458,6 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
removedBytes += flowFile.getSize();
provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key()));
}
flowFileLinkage.remove(flowFile.getId());
}
@Override
@ -2482,8 +2480,6 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
removedBytes += flowFile.getSize();
provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key()));
}
flowFileLinkage.remove(flowFile.getId());
}
}
@ -4144,23 +4140,14 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
for (final Long linkedId : linked) {
final List<Long> onceRemoved = linkedIds.get(linkedId);
allLinked.addAll(onceRemoved);
if (onceRemoved != null) {
allLinked.addAll(onceRemoved);
}
}
}
return allLinked;
}
public Collection<Long> remove(final long id) {
final List<Long> linked = linkedIds.remove(id);
if (linked != null) {
for (final Long otherId : linked) {
linkedIds.get(otherId).remove(id);
}
}
return linked;
}
public void clear() {
linkedIds.clear();

@ -181,6 +181,12 @@ public class StandardRepositoryRecord implements RepositoryRecord {
public void setTransferRelationship(final Relationship relationship) {
transferRelationship = relationship;
// If we're changing from DELETE to transferring to SELF, this means we're rolling the FlowFile back. In this case,
// we need to set the type to UPDATE.
if (relationship == Relationship.SELF && isMarkedForDelete() && originalFlowFileRecord != null && originalQueue != null) {
setType(RepositoryRecordType.UPDATE);
}
}
public Relationship getTransferRelationship() {

@ -448,7 +448,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
}
}
private void logQueueSizes() throws NiFiClientException, IOException {
protected void logQueueSizes() throws NiFiClientException, IOException {
final ProcessGroupStatusEntity groupStatusEntity = getNifiClient().getFlowClient().getProcessGroupStatus("root", true);
final ProcessGroupStatusSnapshotDTO groupStatusDto = groupStatusEntity.getProcessGroupStatus().getAggregateSnapshot();

@ -20,7 +20,9 @@ import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;
@ -37,6 +39,45 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class RetryIT extends NiFiSystemIT {
private static final int RETRY_COUNT = 2;
@Test
public void testSplitInputIntoTwoRemoveParentRetryChild() throws NiFiClientException, IOException, InterruptedException {
// Create a GenerateFlowFile processor and a SplitByLine Processor
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity splitByLine = getClientUtil().createProcessor("SplitByLine");
final ProcessorEntity terminateFlowFile = getClientUtil().createProcessor("TerminateFlowFile");
// Configure split to retry once. Set backoff/penalty to 60 seconds to ensure that it is not re-processed before having a chance to verify the rollback
enableRetries(splitByLine, Collections.singleton("success"), 60_000L);
getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Text", "abc\nxyz"));
final ConnectionEntity generateToSplit = getClientUtil().createConnection(generateFlowFile, splitByLine, "success");
final ConnectionEntity splitToTerminate = getClientUtil().createConnection(splitByLine, terminateFlowFile, "success");
getClientUtil().startProcessor(generateFlowFile);
waitForQueueCount(generateToSplit.getId(), 1);
getClientUtil().startProcessor(splitByLine);
waitFor(() -> {
if (getConnectionQueueSize(generateToSplit.getId()) != 1) {
return false;
}
final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(generateToSplit.getId(), 0);
if (!Boolean.TRUE.equals(flowFile.getFlowFile().getPenalized())) {
return false;
}
return getConnectionQueueSize(splitToTerminate.getId()) == 0;
});
final ProcessorStatusDTO statusDto = getNifiClient().getProcessorClient().getProcessor(splitByLine.getId()).getStatus();
assertEquals(0, statusDto.getAggregateSnapshot().getFlowFilesIn());
assertEquals(0L, statusDto.getAggregateSnapshot().getBytesIn());
assertEquals(0, statusDto.getAggregateSnapshot().getFlowFilesOut());
assertEquals(0L, statusDto.getAggregateSnapshot().getBytesOut());
}
@Test
public void testRetryHappensTwiceThenFinishes() throws NiFiClientException, IOException, InterruptedException {
//Create a GenerateFlowFile processor
@ -318,12 +359,16 @@ public class RetryIT extends NiFiSystemIT {
}
private void enableRetries(final ProcessorEntity processorEntity, final Set<String> relationships) throws NiFiClientException, IOException {
enableRetries(processorEntity, relationships, 1);
}
private void enableRetries(final ProcessorEntity processorEntity, final Set<String> relationships, final long backoffMillis) throws NiFiClientException, IOException {
final ProcessorConfigDTO config = new ProcessorConfigDTO();
config.setRetryCount(RETRY_COUNT);
config.setMaxBackoffPeriod("1 ms");
config.setMaxBackoffPeriod(backoffMillis + " ms");
config.setBackoffMechanism(BackoffMechanism.PENALIZE_FLOWFILE.name());
config.setRetriedRelationships(relationships);
config.setPenaltyDuration("1 ms");
config.setPenaltyDuration(backoffMillis + " ms");
getClientUtil().updateProcessorConfig(processorEntity, config);
}
}