mirror of https://github.com/apache/nifi.git
NIFI-7469: Updated RepositoryRecord to include flag indicating whether or not the content of a FlowFile was modified. This allows us to explicitly keep track of this state rather than implying it (potentially incorrectly).
This closes #4399.
This commit is contained in:
parent
ee91341ec3
commit
1fc25db47c
|
@ -87,4 +87,9 @@ public interface RepositoryRecord {
|
||||||
* life of the Process Session in which they were created and should not be persisted.
|
* life of the Process Session in which they were created and should not be persisted.
|
||||||
*/
|
*/
|
||||||
List<ContentClaim> getTransientClaims();
|
List<ContentClaim> getTransientClaims();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the content of the FlowFile has been modified
|
||||||
|
*/
|
||||||
|
boolean isContentModified();
|
||||||
}
|
}
|
||||||
|
|
|
@ -2784,7 +2784,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
||||||
|
|
||||||
// Update the FlowFile Repository to indicate that we have added the FlowFile to the flow
|
// Update the FlowFile Repository to indicate that we have added the FlowFile to the flow
|
||||||
final StandardRepositoryRecord record = new StandardRepositoryRecord(queue);
|
final StandardRepositoryRecord record = new StandardRepositoryRecord(queue);
|
||||||
record.setWorking(flowFileRecord);
|
record.setWorking(flowFileRecord, false);
|
||||||
record.setDestination(queue);
|
record.setDestination(queue);
|
||||||
flowFileRepository.updateRepository(Collections.singleton(record));
|
flowFileRepository.updateRepository(Collections.singleton(record));
|
||||||
|
|
||||||
|
|
|
@ -88,4 +88,9 @@ public class DropFlowFileRepositoryRecord implements RepositoryRecord {
|
||||||
public List<ContentClaim> getTransientClaims() {
|
public List<ContentClaim> getTransientClaims() {
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isContentModified() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -321,7 +321,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
if (claim != null) {
|
if (claim != null) {
|
||||||
context.getContentRepository().incrementClaimaintCount(claim);
|
context.getContentRepository().incrementClaimaintCount(claim);
|
||||||
}
|
}
|
||||||
newRecord.setWorking(clone, Collections.<String, String> emptyMap());
|
newRecord.setWorking(clone, Collections.<String, String> emptyMap(), false);
|
||||||
|
|
||||||
newRecord.setDestination(destination.getFlowFileQueue());
|
newRecord.setDestination(destination.getFlowFileQueue());
|
||||||
newRecord.setTransferRelationship(record.getTransferRelationship());
|
newRecord.setTransferRelationship(record.getTransferRelationship());
|
||||||
|
@ -1691,7 +1691,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
.addAttributes(attrs)
|
.addAttributes(attrs)
|
||||||
.build();
|
.build();
|
||||||
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
||||||
record.setWorking(fFile, attrs);
|
record.setWorking(fFile, attrs, false);
|
||||||
records.put(fFile.getId(), record);
|
records.put(fFile.getId(), record);
|
||||||
createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
|
createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
|
||||||
return fFile;
|
return fFile;
|
||||||
|
@ -1730,7 +1730,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
|
|
||||||
final FlowFileRecord fFile = fFileBuilder.build();
|
final FlowFileRecord fFile = fFileBuilder.build();
|
||||||
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
||||||
record.setWorking(fFile, newAttributes);
|
record.setWorking(fFile, newAttributes, false);
|
||||||
records.put(fFile.getId(), record);
|
records.put(fFile.getId(), record);
|
||||||
createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
|
createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
|
||||||
|
|
||||||
|
@ -1779,7 +1779,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
||||||
record.setWorking(fFile, newAttributes);
|
record.setWorking(fFile, newAttributes, false);
|
||||||
records.put(fFile.getId(), record);
|
records.put(fFile.getId(), record);
|
||||||
createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
|
createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
|
||||||
|
|
||||||
|
@ -1820,7 +1820,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
context.getContentRepository().incrementClaimaintCount(claim);
|
context.getContentRepository().incrementClaimaintCount(claim);
|
||||||
}
|
}
|
||||||
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
||||||
record.setWorking(clone, clone.getAttributes());
|
record.setWorking(clone, clone.getAttributes(), false);
|
||||||
records.put(clone.getId(), record);
|
records.put(clone.getId(), record);
|
||||||
|
|
||||||
if (offset == 0L && size == example.getSize()) {
|
if (offset == 0L && size == example.getSize()) {
|
||||||
|
@ -1870,7 +1870,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
final StandardRepositoryRecord record = getRecord(flowFile);
|
final StandardRepositoryRecord record = getRecord(flowFile);
|
||||||
final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
|
final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
|
||||||
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build();
|
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build();
|
||||||
record.setWorking(newFile);
|
record.setWorking(newFile, false);
|
||||||
return newFile;
|
return newFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1885,7 +1885,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
|
|
||||||
final StandardRepositoryRecord record = getRecord(flowFile);
|
final StandardRepositoryRecord record = getRecord(flowFile);
|
||||||
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttribute(key, value).build();
|
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttribute(key, value).build();
|
||||||
record.setWorking(newFile, key, value);
|
record.setWorking(newFile, key, value, false);
|
||||||
|
|
||||||
return newFile;
|
return newFile;
|
||||||
}
|
}
|
||||||
|
@ -1908,7 +1908,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttributes(updatedAttributes);
|
final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttributes(updatedAttributes);
|
||||||
final FlowFileRecord newFile = ffBuilder.build();
|
final FlowFileRecord newFile = ffBuilder.build();
|
||||||
|
|
||||||
record.setWorking(newFile, updatedAttributes);
|
record.setWorking(newFile, updatedAttributes, false);
|
||||||
|
|
||||||
return newFile;
|
return newFile;
|
||||||
}
|
}
|
||||||
|
@ -1924,7 +1924,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
|
|
||||||
final StandardRepositoryRecord record = getRecord(flowFile);
|
final StandardRepositoryRecord record = getRecord(flowFile);
|
||||||
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(key).build();
|
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(key).build();
|
||||||
record.setWorking(newFile, key, null);
|
record.setWorking(newFile, key, null, false);
|
||||||
return newFile;
|
return newFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1949,7 +1949,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
updatedAttrs.put(key, null);
|
updatedAttrs.put(key, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
record.setWorking(newFile, updatedAttrs);
|
record.setWorking(newFile, updatedAttrs, false);
|
||||||
return newFile;
|
return newFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1962,7 +1962,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build();
|
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build();
|
||||||
|
|
||||||
if (keyPattern == null) {
|
if (keyPattern == null) {
|
||||||
record.setWorking(newFile);
|
record.setWorking(newFile, false);
|
||||||
} else {
|
} else {
|
||||||
final Map<String, String> curAttrs = record.getCurrent().getAttributes();
|
final Map<String, String> curAttrs = record.getCurrent().getAttributes();
|
||||||
|
|
||||||
|
@ -1977,7 +1977,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
record.setWorking(newFile, removed);
|
record.setWorking(newFile, removed, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
return newFile;
|
return newFile;
|
||||||
|
@ -1986,7 +1986,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
private void updateLastQueuedDate(final StandardRepositoryRecord record, final Long lastQueueDate) {
|
private void updateLastQueuedDate(final StandardRepositoryRecord record, final Long lastQueueDate) {
|
||||||
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
|
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
|
||||||
.lastQueued(lastQueueDate, enqueuedIndex.getAndIncrement()).build();
|
.lastQueued(lastQueueDate, enqueuedIndex.getAndIncrement()).build();
|
||||||
record.setWorking(newFile);
|
record.setWorking(newFile, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateLastQueuedDate(final StandardRepositoryRecord record) {
|
private void updateLastQueuedDate(final StandardRepositoryRecord record) {
|
||||||
|
@ -2582,8 +2582,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
}
|
}
|
||||||
|
|
||||||
removeTemporaryClaim(destinationRecord);
|
removeTemporaryClaim(destinationRecord);
|
||||||
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(destinationRecord.getCurrent()).contentClaim(newClaim).contentClaimOffset(0L).size(writtenCount).build();
|
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
|
||||||
destinationRecord.setWorking(newFile);
|
.fromFlowFile(destinationRecord.getCurrent())
|
||||||
|
.contentClaim(newClaim)
|
||||||
|
.contentClaimOffset(0L)
|
||||||
|
.size(writtenCount)
|
||||||
|
.build();
|
||||||
|
destinationRecord.setWorking(newFile, true);
|
||||||
return newFile;
|
return newFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2697,7 +2702,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
.size(bytesWritten)
|
.size(bytesWritten)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
record.setWorking(newFile);
|
record.setWorking(newFile, true);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -2777,7 +2782,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
.size(writtenToFlowFile)
|
.size(writtenToFlowFile)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
record.setWorking(newFile);
|
record.setWorking(newFile, true);
|
||||||
return newFile;
|
return newFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2888,8 +2893,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
removeTemporaryClaim(record);
|
removeTemporaryClaim(record);
|
||||||
}
|
}
|
||||||
|
|
||||||
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(0).size(newSize).build();
|
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
|
||||||
record.setWorking(newFile);
|
.fromFlowFile(record.getCurrent())
|
||||||
|
.contentClaim(newClaim)
|
||||||
|
.contentClaimOffset(0)
|
||||||
|
.size(newSize)
|
||||||
|
.build();
|
||||||
|
record.setWorking(newFile, true);
|
||||||
return newFile;
|
return newFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2905,10 +2915,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
* @param record record
|
* @param record record
|
||||||
*/
|
*/
|
||||||
private void removeTemporaryClaim(final StandardRepositoryRecord record) {
|
private void removeTemporaryClaim(final StandardRepositoryRecord record) {
|
||||||
final boolean contentModified = record.getWorkingClaim() != null && record.getWorkingClaim() != record.getOriginalClaim();
|
// If the content of the FlowFile has already been modified, we need to remove the newly created content (the working claim). However, if
|
||||||
|
|
||||||
// If the working claim is not the same as the original claim, we have modified the content of
|
|
||||||
// the FlowFile, and we need to remove the newly created content (the working claim). However, if
|
|
||||||
// they are the same, we cannot just remove the claim because record.getWorkingClaim() will return
|
// they are the same, we cannot just remove the claim because record.getWorkingClaim() will return
|
||||||
// the original claim if the record is "working" but the content has not been modified
|
// the original claim if the record is "working" but the content has not been modified
|
||||||
// (e.g., in the case of attributes only were updated)
|
// (e.g., in the case of attributes only were updated)
|
||||||
|
@ -2919,7 +2926,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
// that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do
|
// that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do
|
||||||
// because we will do that later, in the session.commit() and that would result in decrementing the count for
|
// because we will do that later, in the session.commit() and that would result in decrementing the count for
|
||||||
// the original claim twice.
|
// the original claim twice.
|
||||||
if (contentModified) {
|
if (record.isContentModified()) {
|
||||||
// In this case, it's ok to decrement the claimant count for the content because we know that the working claim is going to be
|
// In this case, it's ok to decrement the claimant count for the content because we know that the working claim is going to be
|
||||||
// updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim).
|
// updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim).
|
||||||
// Therefore, we need to decrement the claimant count, and since the Working Claim is being changed, that means that
|
// Therefore, we need to decrement the claimant count, and since the Working Claim is being changed, that means that
|
||||||
|
@ -3041,7 +3048,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
.size(writtenToFlowFile)
|
.size(writtenToFlowFile)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
record.setWorking(newFile);
|
record.setWorking(newFile, true);
|
||||||
|
|
||||||
return newFile;
|
return newFile;
|
||||||
}
|
}
|
||||||
|
@ -3089,7 +3096,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
.size(newSize)
|
.size(newSize)
|
||||||
.addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
|
.addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
|
||||||
.build();
|
.build();
|
||||||
record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName());
|
record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName(), true);
|
||||||
|
|
||||||
if (!keepSourceFile) {
|
if (!keepSourceFile) {
|
||||||
deleteOnCommit.put(newFile, source);
|
deleteOnCommit.put(newFile, source);
|
||||||
|
@ -3133,7 +3140,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
.contentClaimOffset(claimOffset)
|
.contentClaimOffset(claimOffset)
|
||||||
.size(newSize)
|
.size(newSize)
|
||||||
.build();
|
.build();
|
||||||
record.setWorking(newFile);
|
record.setWorking(newFile, true);
|
||||||
return newFile;
|
return newFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -86,4 +86,9 @@ public class TransientClaimRepositoryRecord implements RepositoryRecord {
|
||||||
public List<ContentClaim> getTransientClaims() {
|
public List<ContentClaim> getTransientClaims() {
|
||||||
return claimsToCleanUp;
|
return claimsToCleanUp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isContentModified() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -565,13 +565,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
|
||||||
private void updateClaimCounts(final RepositoryRecord record) {
|
private void updateClaimCounts(final RepositoryRecord record) {
|
||||||
final ContentClaim currentClaim = record.getCurrentClaim();
|
final ContentClaim currentClaim = record.getCurrentClaim();
|
||||||
final ContentClaim originalClaim = record.getOriginalClaim();
|
final ContentClaim originalClaim = record.getOriginalClaim();
|
||||||
final boolean claimChanged = !Objects.equals(currentClaim, originalClaim);
|
|
||||||
|
|
||||||
if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.CONTENTMISSING) {
|
if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.CONTENTMISSING) {
|
||||||
decrementClaimCount(currentClaim);
|
decrementClaimCount(currentClaim);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (claimChanged) {
|
if (record.isContentModified()) {
|
||||||
// records which have been updated - remove original if exists
|
// records which have been updated - remove original if exists
|
||||||
decrementClaimCount(originalClaim);
|
decrementClaimCount(originalClaim);
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,7 +133,7 @@ class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase {
|
||||||
StandardRepositoryRecord record = new StandardRepositoryRecord(queue)
|
StandardRepositoryRecord record = new StandardRepositoryRecord(queue)
|
||||||
StandardFlowFileRecord.Builder ffrb = new StandardFlowFileRecord.Builder().id(System.nanoTime())
|
StandardFlowFileRecord.Builder ffrb = new StandardFlowFileRecord.Builder().id(System.nanoTime())
|
||||||
ffrb.addAttributes([uuid: getMockUUID()] + attributes as Map<String, String>)
|
ffrb.addAttributes([uuid: getMockUUID()] + attributes as Map<String, String>)
|
||||||
record.setWorking(ffrb.build())
|
record.setWorking(ffrb.build(), false)
|
||||||
|
|
||||||
return new LiveSerializedRepositoryRecord(record);
|
return new LiveSerializedRepositoryRecord(record);
|
||||||
}
|
}
|
||||||
|
|
|
@ -253,7 +253,7 @@ public class SchemaRepositoryRecordSerdeTest {
|
||||||
StandardRepositoryRecord standardRepositoryRecord = new StandardRepositoryRecord(flowFileQueue);
|
StandardRepositoryRecord standardRepositoryRecord = new StandardRepositoryRecord(flowFileQueue);
|
||||||
StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder();
|
StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder();
|
||||||
flowFileRecordBuilder.addAttributes(attributes);
|
flowFileRecordBuilder.addAttributes(attributes);
|
||||||
standardRepositoryRecord.setWorking(flowFileRecordBuilder.build());
|
standardRepositoryRecord.setWorking(flowFileRecordBuilder.build(), false);
|
||||||
return standardRepositoryRecord;
|
return standardRepositoryRecord;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -231,7 +231,7 @@ public class TestRocksDBFlowFileRepository {
|
||||||
.contentClaim(claim1)
|
.contentClaim(claim1)
|
||||||
.build();
|
.build();
|
||||||
final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue);
|
final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue);
|
||||||
rec1.setWorking(flowFile1);
|
rec1.setWorking(flowFile1, false);
|
||||||
rec1.setDestination(queue);
|
rec1.setDestination(queue);
|
||||||
|
|
||||||
// Create a Record that we can swap out
|
// Create a Record that we can swap out
|
||||||
|
@ -242,7 +242,7 @@ public class TestRocksDBFlowFileRepository {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue);
|
final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue);
|
||||||
rec2.setWorking(flowFile2);
|
rec2.setWorking(flowFile2, false);
|
||||||
rec2.setDestination(queue);
|
rec2.setDestination(queue);
|
||||||
|
|
||||||
final List<RepositoryRecord> records = new ArrayList<>();
|
final List<RepositoryRecord> records = new ArrayList<>();
|
||||||
|
@ -311,7 +311,7 @@ public class TestRocksDBFlowFileRepository {
|
||||||
|
|
||||||
final List<RepositoryRecord> records = new ArrayList<>();
|
final List<RepositoryRecord> records = new ArrayList<>();
|
||||||
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
||||||
record.setWorking(flowFileRecord);
|
record.setWorking(flowFileRecord, false);
|
||||||
record.setDestination(connection.getFlowFileQueue());
|
record.setDestination(connection.getFlowFileQueue());
|
||||||
records.add(record);
|
records.add(record);
|
||||||
|
|
||||||
|
@ -320,13 +320,13 @@ public class TestRocksDBFlowFileRepository {
|
||||||
// update to add new attribute
|
// update to add new attribute
|
||||||
ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord).addAttribute("hello", "world");
|
ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord).addAttribute("hello", "world");
|
||||||
final FlowFileRecord flowFileRecord2 = ffBuilder.build();
|
final FlowFileRecord flowFileRecord2 = ffBuilder.build();
|
||||||
record.setWorking(flowFileRecord2);
|
record.setWorking(flowFileRecord2, false);
|
||||||
repo.updateRepository(records);
|
repo.updateRepository(records);
|
||||||
|
|
||||||
// update size but no attribute
|
// update size but no attribute
|
||||||
ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord2).size(40L);
|
ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord2).size(40L);
|
||||||
final FlowFileRecord flowFileRecord3 = ffBuilder.build();
|
final FlowFileRecord flowFileRecord3 = ffBuilder.build();
|
||||||
record.setWorking(flowFileRecord3);
|
record.setWorking(flowFileRecord3, false);
|
||||||
repo.updateRepository(records);
|
repo.updateRepository(records);
|
||||||
|
|
||||||
repo.close();
|
repo.close();
|
||||||
|
@ -679,7 +679,7 @@ public class TestRocksDBFlowFileRepository {
|
||||||
|
|
||||||
private List<RepositoryRecord> getRepositoryRecord(final FlowFileRecord flowFileRecord) {
|
private List<RepositoryRecord> getRepositoryRecord(final FlowFileRecord flowFileRecord) {
|
||||||
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
||||||
record.setWorking(flowFileRecord);
|
record.setWorking(flowFileRecord, false);
|
||||||
record.setDestination(connection.getFlowFileQueue());
|
record.setDestination(connection.getFlowFileQueue());
|
||||||
return Collections.singletonList(record);
|
return Collections.singletonList(record);
|
||||||
}
|
}
|
||||||
|
|
|
@ -352,7 +352,7 @@ public class TestWriteAheadFlowFileRepository {
|
||||||
final StandardRepositoryRecord record = new StandardRepositoryRecord(null, flowFile);
|
final StandardRepositoryRecord record = new StandardRepositoryRecord(null, flowFile);
|
||||||
record.setDestination(queue);
|
record.setDestination(queue);
|
||||||
final Map<String, String> updatedAttrs = Collections.singletonMap("uuid", uuid);
|
final Map<String, String> updatedAttrs = Collections.singletonMap("uuid", uuid);
|
||||||
record.setWorking(flowFile, updatedAttrs);
|
record.setWorking(flowFile, updatedAttrs, false);
|
||||||
|
|
||||||
records.add(new LiveSerializedRepositoryRecord(record));
|
records.add(new LiveSerializedRepositoryRecord(record));
|
||||||
}
|
}
|
||||||
|
@ -535,7 +535,7 @@ public class TestWriteAheadFlowFileRepository {
|
||||||
.contentClaim(claim1)
|
.contentClaim(claim1)
|
||||||
.build();
|
.build();
|
||||||
final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue);
|
final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue);
|
||||||
rec1.setWorking(flowFile1);
|
rec1.setWorking(flowFile1, false);
|
||||||
rec1.setDestination(queue);
|
rec1.setDestination(queue);
|
||||||
|
|
||||||
// Create a Record that we can swap out
|
// Create a Record that we can swap out
|
||||||
|
@ -546,7 +546,7 @@ public class TestWriteAheadFlowFileRepository {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue);
|
final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue);
|
||||||
rec2.setWorking(flowFile2);
|
rec2.setWorking(flowFile2, true);
|
||||||
rec2.setDestination(queue);
|
rec2.setDestination(queue);
|
||||||
|
|
||||||
final List<RepositoryRecord> records = new ArrayList<>();
|
final List<RepositoryRecord> records = new ArrayList<>();
|
||||||
|
@ -623,7 +623,7 @@ public class TestWriteAheadFlowFileRepository {
|
||||||
|
|
||||||
final List<RepositoryRecord> records = new ArrayList<>();
|
final List<RepositoryRecord> records = new ArrayList<>();
|
||||||
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
||||||
record.setWorking(flowFileRecord);
|
record.setWorking(flowFileRecord, false);
|
||||||
record.setDestination(connection.getFlowFileQueue());
|
record.setDestination(connection.getFlowFileQueue());
|
||||||
records.add(record);
|
records.add(record);
|
||||||
|
|
||||||
|
@ -632,13 +632,13 @@ public class TestWriteAheadFlowFileRepository {
|
||||||
// update to add new attribute
|
// update to add new attribute
|
||||||
ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord).addAttribute("hello", "world");
|
ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord).addAttribute("hello", "world");
|
||||||
final FlowFileRecord flowFileRecord2 = ffBuilder.build();
|
final FlowFileRecord flowFileRecord2 = ffBuilder.build();
|
||||||
record.setWorking(flowFileRecord2);
|
record.setWorking(flowFileRecord2, false);
|
||||||
repo.updateRepository(records);
|
repo.updateRepository(records);
|
||||||
|
|
||||||
// update size but no attribute
|
// update size but no attribute
|
||||||
ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord2).size(40L);
|
ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord2).size(40L);
|
||||||
final FlowFileRecord flowFileRecord3 = ffBuilder.build();
|
final FlowFileRecord flowFileRecord3 = ffBuilder.build();
|
||||||
record.setWorking(flowFileRecord3);
|
record.setWorking(flowFileRecord3, true);
|
||||||
repo.updateRepository(records);
|
repo.updateRepository(records);
|
||||||
|
|
||||||
repo.close();
|
repo.close();
|
||||||
|
|
|
@ -39,6 +39,7 @@ public class StandardRepositoryRecord implements RepositoryRecord {
|
||||||
private Map<String, String> updatedAttributes = null;
|
private Map<String, String> updatedAttributes = null;
|
||||||
private List<ContentClaim> transientClaims;
|
private List<ContentClaim> transientClaims;
|
||||||
private final long startNanos = System.nanoTime();
|
private final long startNanos = System.nanoTime();
|
||||||
|
private boolean contentModified;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -110,8 +111,9 @@ public class StandardRepositoryRecord implements RepositoryRecord {
|
||||||
return originalQueue;
|
return originalQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setWorking(final FlowFileRecord flowFile) {
|
public void setWorking(final FlowFileRecord flowFile, final boolean contentModified) {
|
||||||
workingFlowFileRecord = flowFile;
|
workingFlowFileRecord = flowFile;
|
||||||
|
this.contentModified |= contentModified;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, String> initializeUpdatedAttributes() {
|
private Map<String, String> initializeUpdatedAttributes() {
|
||||||
|
@ -122,8 +124,9 @@ public class StandardRepositoryRecord implements RepositoryRecord {
|
||||||
return updatedAttributes;
|
return updatedAttributes;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setWorking(final FlowFileRecord flowFile, final String attributeKey, final String attributeValue) {
|
public void setWorking(final FlowFileRecord flowFile, final String attributeKey, final String attributeValue, final boolean contentModified) {
|
||||||
workingFlowFileRecord = flowFile;
|
workingFlowFileRecord = flowFile;
|
||||||
|
this.contentModified |= contentModified;
|
||||||
|
|
||||||
// In the case that the type is CREATE, we know that all attributes are updated attributes, so no need to store them.
|
// In the case that the type is CREATE, we know that all attributes are updated attributes, so no need to store them.
|
||||||
if (type == RepositoryRecordType.CREATE) {
|
if (type == RepositoryRecordType.CREATE) {
|
||||||
|
@ -137,8 +140,9 @@ public class StandardRepositoryRecord implements RepositoryRecord {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setWorking(final FlowFileRecord flowFile, final Map<String, String> updatedAttribs) {
|
public void setWorking(final FlowFileRecord flowFile, final Map<String, String> updatedAttribs, final boolean contentModified) {
|
||||||
workingFlowFileRecord = flowFile;
|
workingFlowFileRecord = flowFile;
|
||||||
|
this.contentModified |= contentModified;
|
||||||
|
|
||||||
// In the case that the type is CREATE, we know that all attributes are updated attributes, so no need to store them.
|
// In the case that the type is CREATE, we know that all attributes are updated attributes, so no need to store them.
|
||||||
if (type == RepositoryRecordType.CREATE) {
|
if (type == RepositoryRecordType.CREATE) {
|
||||||
|
@ -183,10 +187,6 @@ public class StandardRepositoryRecord implements RepositoryRecord {
|
||||||
return transferRelationship;
|
return transferRelationship;
|
||||||
}
|
}
|
||||||
|
|
||||||
FlowFileRecord getWorking() {
|
|
||||||
return workingFlowFileRecord;
|
|
||||||
}
|
|
||||||
|
|
||||||
ContentClaim getWorkingClaim() {
|
ContentClaim getWorkingClaim() {
|
||||||
return (workingFlowFileRecord == null) ? null : workingFlowFileRecord.getContentClaim();
|
return (workingFlowFileRecord == null) ? null : workingFlowFileRecord.getContentClaim();
|
||||||
}
|
}
|
||||||
|
@ -206,10 +206,6 @@ public class StandardRepositoryRecord implements RepositoryRecord {
|
||||||
return (getCurrent() == null) ? 0L : getCurrent().getContentClaimOffset();
|
return (getCurrent() == null) ? 0L : getCurrent().getContentClaimOffset();
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isWorking() {
|
|
||||||
return (workingFlowFileRecord != null);
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, String> getOriginalAttributes() {
|
Map<String, String> getOriginalAttributes() {
|
||||||
return originalAttributes;
|
return originalAttributes;
|
||||||
}
|
}
|
||||||
|
@ -260,4 +256,8 @@ public class StandardRepositoryRecord implements RepositoryRecord {
|
||||||
public long getStartNanos() {
|
public long getStartNanos() {
|
||||||
return startNanos;
|
return startNanos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isContentModified() {
|
||||||
|
return contentModified;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,8 @@ import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class TestStandardRepositoryRecord {
|
public class TestStandardRepositoryRecord {
|
||||||
|
|
||||||
|
@ -40,12 +42,19 @@ public class TestStandardRepositoryRecord {
|
||||||
.addAttributes(updatedAttributes)
|
.addAttributes(updatedAttributes)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
record.setWorking(flowFileRecord, updatedAttributes);
|
record.setWorking(flowFileRecord, updatedAttributes, false);
|
||||||
|
|
||||||
final Map<String, String> updatedWithId = new HashMap<>(updatedAttributes);
|
final Map<String, String> updatedWithId = new HashMap<>(updatedAttributes);
|
||||||
updatedWithId.put("uuid", uuid);
|
updatedWithId.put("uuid", uuid);
|
||||||
|
|
||||||
assertEquals(updatedWithId, record.getUpdatedAttributes());
|
assertEquals(updatedWithId, record.getUpdatedAttributes());
|
||||||
|
assertFalse(record.isContentModified());
|
||||||
|
|
||||||
|
record.setWorking(flowFileRecord, true);
|
||||||
|
assertTrue(record.isContentModified());
|
||||||
|
|
||||||
|
record.setWorking(flowFileRecord, false);
|
||||||
|
assertTrue(record.isContentModified()); // Should still be true because it was modified previously
|
||||||
|
|
||||||
record.markForDelete();
|
record.markForDelete();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue