From 1fc25db47cdecef404548ac011de2c2564574932 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 9 Jul 2020 12:29:49 -0400 Subject: [PATCH] NIFI-7469: Updated RepositoryRecord to include flag indicating whether or not the content of a FlowFile was modified. This allows us to explicitly keep track of this state rather than implying it (potentially incorrectly). This closes #4399. --- .../repository/RepositoryRecord.java | 5 ++ .../nifi/controller/FlowController.java | 2 +- .../queue/DropFlowFileRepositoryRecord.java | 5 ++ .../repository/StandardProcessSession.java | 61 +++++++++++-------- .../TransientClaimRepositoryRecord.java | 5 ++ .../WriteAheadFlowFileRepository.java | 3 +- ...edSequentialAccessWriteAheadLogTest.groovy | 2 +- .../SchemaRepositoryRecordSerdeTest.java | 2 +- .../TestRocksDBFlowFileRepository.java | 12 ++-- .../TestWriteAheadFlowFileRepository.java | 12 ++-- .../repository/StandardRepositoryRecord.java | 22 +++---- .../TestStandardRepositoryRecord.java | 11 +++- 12 files changed, 86 insertions(+), 56 deletions(-) diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java index 3b27d959aa..82bb3eb1fd 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java @@ -87,4 +87,9 @@ public interface RepositoryRecord { * life of the Process Session in which they were created and should not be persisted. */ List getTransientClaims(); + + /** + * @return true if the content of the FlowFile has been modified + */ + boolean isContentModified(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 9a686581c0..97b4c13507 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2784,7 +2784,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node // Update the FlowFile Repository to indicate that we have added the FlowFile to the flow final StandardRepositoryRecord record = new StandardRepositoryRecord(queue); - record.setWorking(flowFileRecord); + record.setWorking(flowFileRecord, false); record.setDestination(queue); flowFileRepository.updateRepository(Collections.singleton(record)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java index f47b4eb88b..64ed827251 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java @@ -88,4 +88,9 @@ public class DropFlowFileRepositoryRecord implements RepositoryRecord { public List getTransientClaims() { return Collections.emptyList(); } + + @Override + public boolean isContentModified() { + return false; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 089ac90daa..ba49b27dfb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -321,7 +321,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (claim != null) { context.getContentRepository().incrementClaimaintCount(claim); } - newRecord.setWorking(clone, Collections. emptyMap()); + newRecord.setWorking(clone, Collections. emptyMap(), false); newRecord.setDestination(destination.getFlowFileQueue()); newRecord.setTransferRelationship(record.getTransferRelationship()); @@ -1691,7 +1691,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE .addAttributes(attrs) .build(); final StandardRepositoryRecord record = new StandardRepositoryRecord(null); - record.setWorking(fFile, attrs); + record.setWorking(fFile, attrs, false); records.put(fFile.getId(), record); createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); return fFile; @@ -1730,7 +1730,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final FlowFileRecord fFile = fFileBuilder.build(); final StandardRepositoryRecord record = new StandardRepositoryRecord(null); - record.setWorking(fFile, newAttributes); + record.setWorking(fFile, newAttributes, false); records.put(fFile.getId(), record); createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); @@ -1779,7 +1779,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE .build(); final StandardRepositoryRecord record = new StandardRepositoryRecord(null); - record.setWorking(fFile, newAttributes); + record.setWorking(fFile, newAttributes, false); records.put(fFile.getId(), record); createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); @@ -1820,7 +1820,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE context.getContentRepository().incrementClaimaintCount(claim); } final StandardRepositoryRecord record = new StandardRepositoryRecord(null); - record.setWorking(clone, clone.getAttributes()); + record.setWorking(clone, clone.getAttributes(), false); records.put(clone.getId(), record); if (offset == 0L && size == example.getSize()) { @@ -1870,7 +1870,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final StandardRepositoryRecord record = getRecord(flowFile); final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build(); - record.setWorking(newFile); + record.setWorking(newFile, false); return newFile; } @@ -1885,7 +1885,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final StandardRepositoryRecord record = getRecord(flowFile); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttribute(key, value).build(); - record.setWorking(newFile, key, value); + record.setWorking(newFile, key, value, false); return newFile; } @@ -1908,7 +1908,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttributes(updatedAttributes); final FlowFileRecord newFile = ffBuilder.build(); - record.setWorking(newFile, updatedAttributes); + record.setWorking(newFile, updatedAttributes, false); return newFile; } @@ -1924,7 +1924,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final StandardRepositoryRecord record = getRecord(flowFile); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(key).build(); - record.setWorking(newFile, key, null); + record.setWorking(newFile, key, null, false); return newFile; } @@ -1949,7 +1949,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE updatedAttrs.put(key, null); } - record.setWorking(newFile, updatedAttrs); + record.setWorking(newFile, updatedAttrs, false); return newFile; } @@ -1962,7 +1962,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build(); if (keyPattern == null) { - record.setWorking(newFile); + record.setWorking(newFile, false); } else { final Map curAttrs = record.getCurrent().getAttributes(); @@ -1977,7 +1977,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } - record.setWorking(newFile, removed); + record.setWorking(newFile, removed, false); } return newFile; @@ -1986,7 +1986,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private void updateLastQueuedDate(final StandardRepositoryRecord record, final Long lastQueueDate) { final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()) .lastQueued(lastQueueDate, enqueuedIndex.getAndIncrement()).build(); - record.setWorking(newFile); + record.setWorking(newFile, false); } private void updateLastQueuedDate(final StandardRepositoryRecord record) { @@ -2582,8 +2582,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } removeTemporaryClaim(destinationRecord); - final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(destinationRecord.getCurrent()).contentClaim(newClaim).contentClaimOffset(0L).size(writtenCount).build(); - destinationRecord.setWorking(newFile); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder() + .fromFlowFile(destinationRecord.getCurrent()) + .contentClaim(newClaim) + .contentClaimOffset(0L) + .size(writtenCount) + .build(); + destinationRecord.setWorking(newFile, true); return newFile; } @@ -2697,7 +2702,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE .size(bytesWritten) .build(); - record.setWorking(newFile); + record.setWorking(newFile, true); } }; @@ -2777,7 +2782,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE .size(writtenToFlowFile) .build(); - record.setWorking(newFile); + record.setWorking(newFile, true); return newFile; } @@ -2888,8 +2893,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE removeTemporaryClaim(record); } - final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(0).size(newSize).build(); - record.setWorking(newFile); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder() + .fromFlowFile(record.getCurrent()) + .contentClaim(newClaim) + .contentClaimOffset(0) + .size(newSize) + .build(); + record.setWorking(newFile, true); return newFile; } @@ -2905,10 +2915,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE * @param record record */ private void removeTemporaryClaim(final StandardRepositoryRecord record) { - final boolean contentModified = record.getWorkingClaim() != null && record.getWorkingClaim() != record.getOriginalClaim(); - - // If the working claim is not the same as the original claim, we have modified the content of - // the FlowFile, and we need to remove the newly created content (the working claim). However, if + // If the content of the FlowFile has already been modified, we need to remove the newly created content (the working claim). However, if // they are the same, we cannot just remove the claim because record.getWorkingClaim() will return // the original claim if the record is "working" but the content has not been modified // (e.g., in the case of attributes only were updated) @@ -2919,7 +2926,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do // because we will do that later, in the session.commit() and that would result in decrementing the count for // the original claim twice. - if (contentModified) { + if (record.isContentModified()) { // In this case, it's ok to decrement the claimant count for the content because we know that the working claim is going to be // updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim). // Therefore, we need to decrement the claimant count, and since the Working Claim is being changed, that means that @@ -3041,7 +3048,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE .size(writtenToFlowFile) .build(); - record.setWorking(newFile); + record.setWorking(newFile, true); return newFile; } @@ -3089,7 +3096,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE .size(newSize) .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName()) .build(); - record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName()); + record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName(), true); if (!keepSourceFile) { deleteOnCommit.put(newFile, source); @@ -3133,7 +3140,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE .contentClaimOffset(claimOffset) .size(newSize) .build(); - record.setWorking(newFile); + record.setWorking(newFile, true); return newFile; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java index 8cf6952527..937874ee60 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java @@ -86,4 +86,9 @@ public class TransientClaimRepositoryRecord implements RepositoryRecord { public List getTransientClaims() { return claimsToCleanUp; } + + @Override + public boolean isContentModified() { + return false; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 4b095ed0d7..be1ada7c9f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -565,13 +565,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis private void updateClaimCounts(final RepositoryRecord record) { final ContentClaim currentClaim = record.getCurrentClaim(); final ContentClaim originalClaim = record.getOriginalClaim(); - final boolean claimChanged = !Objects.equals(currentClaim, originalClaim); if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.CONTENTMISSING) { decrementClaimCount(currentClaim); } - if (claimChanged) { + if (record.isContentModified()) { // records which have been updated - remove original if exists decrementClaimCount(originalClaim); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy index 07d88bab61..8bc91dbd03 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy @@ -133,7 +133,7 @@ class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase { StandardRepositoryRecord record = new StandardRepositoryRecord(queue) StandardFlowFileRecord.Builder ffrb = new StandardFlowFileRecord.Builder().id(System.nanoTime()) ffrb.addAttributes([uuid: getMockUUID()] + attributes as Map) - record.setWorking(ffrb.build()) + record.setWorking(ffrb.build(), false) return new LiveSerializedRepositoryRecord(record); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java index 96d595a6ca..c306db1fb1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java @@ -253,7 +253,7 @@ public class SchemaRepositoryRecordSerdeTest { StandardRepositoryRecord standardRepositoryRecord = new StandardRepositoryRecord(flowFileQueue); StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder(); flowFileRecordBuilder.addAttributes(attributes); - standardRepositoryRecord.setWorking(flowFileRecordBuilder.build()); + standardRepositoryRecord.setWorking(flowFileRecordBuilder.build(), false); return standardRepositoryRecord; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java index cebf2cdcfb..4f41de12c6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java @@ -231,7 +231,7 @@ public class TestRocksDBFlowFileRepository { .contentClaim(claim1) .build(); final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue); - rec1.setWorking(flowFile1); + rec1.setWorking(flowFile1, false); rec1.setDestination(queue); // Create a Record that we can swap out @@ -242,7 +242,7 @@ public class TestRocksDBFlowFileRepository { .build(); final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue); - rec2.setWorking(flowFile2); + rec2.setWorking(flowFile2, false); rec2.setDestination(queue); final List records = new ArrayList<>(); @@ -311,7 +311,7 @@ public class TestRocksDBFlowFileRepository { final List records = new ArrayList<>(); final StandardRepositoryRecord record = new StandardRepositoryRecord(null); - record.setWorking(flowFileRecord); + record.setWorking(flowFileRecord, false); record.setDestination(connection.getFlowFileQueue()); records.add(record); @@ -320,13 +320,13 @@ public class TestRocksDBFlowFileRepository { // update to add new attribute ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord).addAttribute("hello", "world"); final FlowFileRecord flowFileRecord2 = ffBuilder.build(); - record.setWorking(flowFileRecord2); + record.setWorking(flowFileRecord2, false); repo.updateRepository(records); // update size but no attribute ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord2).size(40L); final FlowFileRecord flowFileRecord3 = ffBuilder.build(); - record.setWorking(flowFileRecord3); + record.setWorking(flowFileRecord3, false); repo.updateRepository(records); repo.close(); @@ -679,7 +679,7 @@ public class TestRocksDBFlowFileRepository { private List getRepositoryRecord(final FlowFileRecord flowFileRecord) { final StandardRepositoryRecord record = new StandardRepositoryRecord(null); - record.setWorking(flowFileRecord); + record.setWorking(flowFileRecord, false); record.setDestination(connection.getFlowFileQueue()); return Collections.singletonList(record); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index a6edbf5f82..42d84c11f3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -352,7 +352,7 @@ public class TestWriteAheadFlowFileRepository { final StandardRepositoryRecord record = new StandardRepositoryRecord(null, flowFile); record.setDestination(queue); final Map updatedAttrs = Collections.singletonMap("uuid", uuid); - record.setWorking(flowFile, updatedAttrs); + record.setWorking(flowFile, updatedAttrs, false); records.add(new LiveSerializedRepositoryRecord(record)); } @@ -535,7 +535,7 @@ public class TestWriteAheadFlowFileRepository { .contentClaim(claim1) .build(); final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue); - rec1.setWorking(flowFile1); + rec1.setWorking(flowFile1, false); rec1.setDestination(queue); // Create a Record that we can swap out @@ -546,7 +546,7 @@ public class TestWriteAheadFlowFileRepository { .build(); final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue); - rec2.setWorking(flowFile2); + rec2.setWorking(flowFile2, true); rec2.setDestination(queue); final List records = new ArrayList<>(); @@ -623,7 +623,7 @@ public class TestWriteAheadFlowFileRepository { final List records = new ArrayList<>(); final StandardRepositoryRecord record = new StandardRepositoryRecord(null); - record.setWorking(flowFileRecord); + record.setWorking(flowFileRecord, false); record.setDestination(connection.getFlowFileQueue()); records.add(record); @@ -632,13 +632,13 @@ public class TestWriteAheadFlowFileRepository { // update to add new attribute ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord).addAttribute("hello", "world"); final FlowFileRecord flowFileRecord2 = ffBuilder.build(); - record.setWorking(flowFileRecord2); + record.setWorking(flowFileRecord2, false); repo.updateRepository(records); // update size but no attribute ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord2).size(40L); final FlowFileRecord flowFileRecord3 = ffBuilder.build(); - record.setWorking(flowFileRecord3); + record.setWorking(flowFileRecord3, true); repo.updateRepository(records); repo.close(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java index 4aeb473081..a3c724b818 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java @@ -39,6 +39,7 @@ public class StandardRepositoryRecord implements RepositoryRecord { private Map updatedAttributes = null; private List transientClaims; private final long startNanos = System.nanoTime(); + private boolean contentModified; /** @@ -110,8 +111,9 @@ public class StandardRepositoryRecord implements RepositoryRecord { return originalQueue; } - public void setWorking(final FlowFileRecord flowFile) { + public void setWorking(final FlowFileRecord flowFile, final boolean contentModified) { workingFlowFileRecord = flowFile; + this.contentModified |= contentModified; } private Map initializeUpdatedAttributes() { @@ -122,8 +124,9 @@ public class StandardRepositoryRecord implements RepositoryRecord { return updatedAttributes; } - public void setWorking(final FlowFileRecord flowFile, final String attributeKey, final String attributeValue) { + public void setWorking(final FlowFileRecord flowFile, final String attributeKey, final String attributeValue, final boolean contentModified) { workingFlowFileRecord = flowFile; + this.contentModified |= contentModified; // In the case that the type is CREATE, we know that all attributes are updated attributes, so no need to store them. if (type == RepositoryRecordType.CREATE) { @@ -137,8 +140,9 @@ public class StandardRepositoryRecord implements RepositoryRecord { } } - public void setWorking(final FlowFileRecord flowFile, final Map updatedAttribs) { + public void setWorking(final FlowFileRecord flowFile, final Map updatedAttribs, final boolean contentModified) { workingFlowFileRecord = flowFile; + this.contentModified |= contentModified; // In the case that the type is CREATE, we know that all attributes are updated attributes, so no need to store them. if (type == RepositoryRecordType.CREATE) { @@ -183,10 +187,6 @@ public class StandardRepositoryRecord implements RepositoryRecord { return transferRelationship; } - FlowFileRecord getWorking() { - return workingFlowFileRecord; - } - ContentClaim getWorkingClaim() { return (workingFlowFileRecord == null) ? null : workingFlowFileRecord.getContentClaim(); } @@ -206,10 +206,6 @@ public class StandardRepositoryRecord implements RepositoryRecord { return (getCurrent() == null) ? 0L : getCurrent().getContentClaimOffset(); } - boolean isWorking() { - return (workingFlowFileRecord != null); - } - Map getOriginalAttributes() { return originalAttributes; } @@ -260,4 +256,8 @@ public class StandardRepositoryRecord implements RepositoryRecord { public long getStartNanos() { return startNanos; } + + public boolean isContentModified() { + return contentModified; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java index 61e23fe36b..7c0807ca36 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.UUID; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class TestStandardRepositoryRecord { @@ -40,12 +42,19 @@ public class TestStandardRepositoryRecord { .addAttributes(updatedAttributes) .build(); - record.setWorking(flowFileRecord, updatedAttributes); + record.setWorking(flowFileRecord, updatedAttributes, false); final Map updatedWithId = new HashMap<>(updatedAttributes); updatedWithId.put("uuid", uuid); assertEquals(updatedWithId, record.getUpdatedAttributes()); + assertFalse(record.isContentModified()); + + record.setWorking(flowFileRecord, true); + assertTrue(record.isContentModified()); + + record.setWorking(flowFileRecord, false); + assertTrue(record.isContentModified()); // Should still be true because it was modified previously record.markForDelete();