mirror of https://github.com/apache/nifi.git
NIFI-10203: Ensure that when a FlowFile is transferred and is not retried that we remove the retryCount.<uuid> attribute
This closes #6201 Signed-off-by: Paul Grey <greyp@apache.org>
This commit is contained in:
parent
2a9139c57a
commit
4c0555a476
|
@ -348,7 +348,14 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
final FlowFileRecord currRec = record.getCurrent();
|
FlowFileRecord currRec = record.getCurrent();
|
||||||
|
|
||||||
|
// If there's a retry attribute present, remove it. The attribute should only live while the FlowFile is being processed by the current component
|
||||||
|
if (currRec.getAttribute(retryAttribute) != null) {
|
||||||
|
currRec = new StandardFlowFileRecord.Builder().fromFlowFile(currRec).removeAttributes(retryAttribute).build();
|
||||||
|
record.setWorking(currRec, retryAttribute, null, false);
|
||||||
|
}
|
||||||
|
|
||||||
final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
|
final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
|
||||||
record.setDestination(finalDestination.getFlowFileQueue());
|
record.setDestination(finalDestination.getFlowFileQueue());
|
||||||
incrementConnectionInputCounts(finalDestination, record);
|
incrementConnectionInputCounts(finalDestination, record);
|
||||||
|
@ -357,7 +364,6 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
|
||||||
incrementConnectionInputCounts(destination, record);
|
incrementConnectionInputCounts(destination, record);
|
||||||
|
|
||||||
final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
|
final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
|
||||||
builder.removeAttributes(retryAttribute);
|
|
||||||
builder.id(context.getNextFlowFileSequence());
|
builder.id(context.getNextFlowFileSequence());
|
||||||
|
|
||||||
final String newUuid = UUID.randomUUID().toString();
|
final String newUuid = UUID.randomUUID().toString();
|
||||||
|
|
|
@ -2968,7 +2968,8 @@ public class StandardProcessSessionIT {
|
||||||
StandardProcessSession session = createSessionForRetry(processor);
|
StandardProcessSession session = createSessionForRetry(processor);
|
||||||
|
|
||||||
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
|
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
|
||||||
.build();
|
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
|
||||||
|
.build();
|
||||||
|
|
||||||
flowFileQueue.put(flowFileRecord);
|
flowFileQueue.put(flowFileRecord);
|
||||||
|
|
||||||
|
@ -2991,7 +2992,8 @@ public class StandardProcessSessionIT {
|
||||||
final StandardProcessSession session = createSessionForRetry(processor);
|
final StandardProcessSession session = createSessionForRetry(processor);
|
||||||
|
|
||||||
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
|
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
|
||||||
.build();
|
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
|
||||||
|
.build();
|
||||||
|
|
||||||
flowFileQueue.put(flowFileRecord);
|
flowFileQueue.put(flowFileRecord);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue