NIFI-8727: This closes #5418. Addressed bug in which ProcessSession doesn't properly decrement claimant count when a FlowFile is cloned and then the clone written to. Added automated tests to ensure that we are properly handling cases where a FlowFile is cloned and then the contents modified

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2021-09-28 13:23:44 -04:00 committed by Joe Witt
parent 6ef638d13d
commit efc1cb012f
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
2 changed files with 210 additions and 4 deletions

View File

@ -3075,7 +3075,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
// If the content of the FlowFile has already been modified, we need to remove the newly created content (the working claim). However, if
// they are the same, we cannot just remove the claim because record.getWorkingClaim() will return
// the original claim if the record is "working" but the content has not been modified
// (e.g., in the case of attributes only were updated)
// (e.g., in the case of attributes only were updated).
//
// In other words:
// If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will
@ -3083,7 +3083,14 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
// that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do
// because we will do that later, in the session.commit() and that would result in decrementing the count for
// the original claim twice.
if (record.isContentModified()) {
//
// Additionally, if the Record Type is CREATE, that means any content that exists is a temporary claim.
// This will happen, for example, if a FlowFile is created and then written to multiple times or a FlowFile is cloned and then the clone's contents
// are overwritten. In such a case, we can confidently decrement the count/remove the claim because either the claim is null (in which case calling
// decrementClaimantCount/addTransientClaim will have no effect), or the claim points to the previously written content that is no longer relevant because
// the FlowFile's content is being overwritten.
if (record.isContentModified() || record.getType() == RepositoryRecordType.CREATE) {
// In this case, it's ok to decrement the claimant count for the content because we know that the working claim is going to be
// updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim).
// Therefore, we need to decrement the claimant count, and since the Working Claim is being changed, that means that

View File

@ -111,6 +111,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class StandardProcessSessionIT {
private static final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
private StandardProcessSession session;
private MockContentRepository contentRepo;
@ -123,8 +124,7 @@ public class StandardProcessSessionIT {
private MockFlowFileRepository flowFileRepo;
private CounterRepository counterRepository;
private FlowFileEventRepository flowFileEventRepository;
private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
private static StandardResourceClaimManager resourceClaimManager;
private ResourceClaimManager resourceClaimManager;
@After
public void cleanup() {
@ -2603,6 +2603,205 @@ public class StandardProcessSessionIT {
stateManager.assertStateNotSet();
}
/**
 * Clones a queued FlowFile and immediately rolls the session back, then checks that the
 * claimant count of the FlowFile's content claim is back at 1.
 */
@Test
public void testCloneThenRollbackCountsClaimReferencesProperly() throws IOException {
    // Seed the content repository with one claim holding the 12 bytes of "hello, world".
    final ContentClaim seededClaim = contentRepo.create(false);
    try (final OutputStream claimStream = contentRepo.write(seededClaim)) {
        claimStream.write("hello, world".getBytes());
    }
    assertEquals(1, contentRepo.getClaimantCount(seededClaim));

    // Enqueue a FlowFile that references the seeded claim.
    final FlowFileRecord queuedRecord = new StandardFlowFileRecord.Builder()
        .contentClaim(seededClaim)
        .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
        .entryDate(System.currentTimeMillis())
        .size(12L)
        .build();
    flowFileQueue.put(queuedRecord);

    // The clone itself is never inspected; only its effect on the claimant count matters here.
    final FlowFile parent = session.get();
    session.clone(parent);

    session.rollback();
    assertEquals(1, contentRepo.getClaimantCount(seededClaim));
}
/**
 * Clones a queued FlowFile, overwrites content via {@code session.write}, rolls back, and checks
 * that the seeded claim has one claimant while the newly written claim has none.
 */
@Test
public void testCloneThenWriteThenRollbackCountsClaimReferencesProperly() throws IOException {
    // Seed the content repository with one claim holding the 12 bytes of "hello, world".
    final ContentClaim seededClaim = contentRepo.create(false);
    try (final OutputStream claimStream = contentRepo.write(seededClaim)) {
        claimStream.write("hello, world".getBytes());
    }
    assertEquals(1, contentRepo.getClaimantCount(seededClaim));

    // Enqueue a FlowFile that references the seeded claim.
    final FlowFileRecord queuedRecord = new StandardFlowFileRecord.Builder()
        .contentClaim(seededClaim)
        .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
        .entryDate(System.currentTimeMillis())
        .size(12L)
        .build();
    flowFileQueue.put(queuedRecord);

    final FlowFile parent = session.get();
    session.clone(parent);

    // NOTE(review): the write below targets the FlowFile pulled from the queue rather than the
    // clone created above, although the test name suggests the clone is written to -- confirm
    // which FlowFile was intended before changing it.
    final FlowFile written = session.write(parent, stream -> stream.write("Bye".getBytes()));
    assertEquals(1, contentRepo.getClaimantCount(getContentClaim(written)));

    session.rollback();
    assertEquals(1, contentRepo.getClaimantCount(seededClaim));
    assertEquals(0, contentRepo.getClaimantCount(getContentClaim(written)));
}
/**
 * Clones a queued FlowFile, appends content via {@code session.append}, rolls back, and checks
 * that the seeded claim has one claimant while the claim produced by the append has none.
 */
@Test
public void testCloneThenAppendThenRollbackCountsClaimReferencesProperly() throws IOException {
    // Seed the content repository with one claim holding the 12 bytes of "hello, world".
    final ContentClaim seededClaim = contentRepo.create(false);
    try (final OutputStream claimStream = contentRepo.write(seededClaim)) {
        claimStream.write("hello, world".getBytes());
    }
    assertEquals(1, contentRepo.getClaimantCount(seededClaim));

    // Enqueue a FlowFile that references the seeded claim.
    final FlowFileRecord queuedRecord = new StandardFlowFileRecord.Builder()
        .contentClaim(seededClaim)
        .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
        .entryDate(System.currentTimeMillis())
        .size(12L)
        .build();
    flowFileQueue.put(queuedRecord);

    final FlowFile parent = session.get();
    session.clone(parent);

    // NOTE(review): the append below targets the FlowFile pulled from the queue rather than the
    // clone created above, although the test name suggests the clone is appended to -- confirm
    // which FlowFile was intended before changing it.
    final FlowFile appended = session.append(parent, stream -> stream.write("Bye".getBytes()));
    assertEquals(1, contentRepo.getClaimantCount(getContentClaim(appended)));

    session.rollback();
    assertEquals(1, contentRepo.getClaimantCount(seededClaim));
    assertEquals(0, contentRepo.getClaimantCount(getContentClaim(appended)));
}
/**
 * Clones a queued FlowFile and then overwrites the clone's content ten times, asserting after
 * every write that the seeded claim, the clone's claim before the write, and the clone's claim
 * after the write each report a claimant count of 1.
 */
@Test
public void testCloneThenWriteCountsClaimReferencesProperly() throws IOException {
    // Seed the content repository with one claim holding the 12 bytes of "hello, world".
    final ContentClaim seededClaim = contentRepo.create(false);
    try (final OutputStream claimStream = contentRepo.write(seededClaim)) {
        claimStream.write("hello, world".getBytes());
    }
    assertEquals(1, contentRepo.getClaimantCount(seededClaim));

    // Enqueue a FlowFile that references the seeded claim.
    final FlowFileRecord queuedRecord = new StandardFlowFileRecord.Builder()
        .contentClaim(seededClaim)
        .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
        .entryDate(System.currentTimeMillis())
        .size(12L)
        .build();
    flowFileQueue.put(queuedRecord);

    final FlowFile parent = session.get();
    FlowFile copy = session.clone(parent);

    // The clone points at the same content claim as its parent, so two claimants are expected.
    assertEquals(2, contentRepo.getClaimantCount(seededClaim));

    // However many times the clone is rewritten, the seeded claim should be left with a single
    // claimant (the clone no longer points at it) and the freshly written claim with a single claimant.
    for (int round = 0; round < 10; round++) {
        final ContentClaim claimBeforeWrite = getContentClaim(copy);
        copy = session.write(copy, stream -> stream.write("bye".getBytes()));
        final ContentClaim claimAfterWrite = getContentClaim(copy);

        assertEquals(1, contentRepo.getClaimantCount(claimAfterWrite));
        assertEquals(1, contentRepo.getClaimantCount(seededClaim));
        assertEquals(1, contentRepo.getClaimantCount(claimBeforeWrite));
    }
}
/**
 * Returns the content claim of the given FlowFile by viewing it as a {@link FlowFileRecord}.
 *
 * @param flowFile the FlowFile to inspect; it is cast to {@code FlowFileRecord}
 * @return the value of {@code getContentClaim()} on that record
 */
private ContentClaim getContentClaim(final FlowFile flowFile) {
    final FlowFileRecord record = (FlowFileRecord) flowFile;
    return record.getContentClaim();
}
/**
 * Creates a child FlowFile from a queued parent, writes to the child once, and checks that both
 * the parent's claim and the child's new claim report a claimant count of 1.
 */
@Test
public void testCreateChildThenWriteCountsClaimReferencesProperly() throws IOException {
    // Seed the content repository with one claim holding the 12 bytes of "hello, world".
    final ContentClaim parentClaim = contentRepo.create(false);
    try (final OutputStream claimStream = contentRepo.write(parentClaim)) {
        claimStream.write("hello, world".getBytes());
    }
    assertEquals(1, contentRepo.getClaimantCount(parentClaim));

    // Enqueue a FlowFile that references the seeded claim.
    final FlowFileRecord queuedRecord = new StandardFlowFileRecord.Builder()
        .contentClaim(parentClaim)
        .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
        .entryDate(System.currentTimeMillis())
        .size(12L)
        .build();
    flowFileQueue.put(queuedRecord);

    final FlowFile parent = session.get();
    FlowFile child = session.create(parent);

    // Creating the child is expected to leave the parent's claimant count unchanged.
    assertEquals(1, contentRepo.getClaimantCount(parentClaim));

    child = session.write(child, stream -> stream.write("bye".getBytes()));
    final ContentClaim childClaim = getContentClaim(child);

    assertEquals(1, contentRepo.getClaimantCount(childClaim));
    assertEquals(1, contentRepo.getClaimantCount(parentClaim));
}
/**
 * Creates a child FlowFile from a queued parent and rewrites the child 100 times, asserting after
 * every write that the parent's claim and the child's current claim each have a claimant count of 1.
 */
@Test
public void testCreateChildThenMultipleWriteCountsClaimReferencesProperly() throws IOException {
    // Seed the content repository with one claim holding the 12 bytes of "hello, world".
    final ContentClaim parentClaim = contentRepo.create(false);
    try (final OutputStream claimStream = contentRepo.write(parentClaim)) {
        claimStream.write("hello, world".getBytes());
    }
    assertEquals(1, contentRepo.getClaimantCount(parentClaim));

    // Enqueue a FlowFile that references the seeded claim.
    final FlowFileRecord queuedRecord = new StandardFlowFileRecord.Builder()
        .contentClaim(parentClaim)
        .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
        .entryDate(System.currentTimeMillis())
        .size(12L)
        .build();
    flowFileQueue.put(queuedRecord);

    final FlowFile parent = session.get();
    FlowFile child = session.create(parent);

    // Creating the child is expected to leave the parent's claimant count unchanged.
    assertEquals(1, contentRepo.getClaimantCount(parentClaim));

    for (int round = 0; round < 100; round++) {
        child = session.write(child, stream -> stream.write("bye".getBytes()));
        final ContentClaim childClaim = getContentClaim(child);

        assertEquals(1, contentRepo.getClaimantCount(childClaim));
        assertEquals(1, contentRepo.getClaimantCount(parentClaim));
    }
}
/**
 * Creates a parentless FlowFile, rewrites it 100 times while asserting that its current claim
 * always has a claimant count of 1, then rolls back and checks that the last claim has no claimants.
 */
@Test
public void testCreateNewFlowFileWithoutParentThenMultipleWritesCountsClaimReferencesProperly() {
    FlowFile created = session.create();

    for (int round = 0; round < 100; round++) {
        created = session.write(created, stream -> stream.write("bye".getBytes()));
        final ContentClaim currentClaim = getContentClaim(created);
        assertEquals(1, contentRepo.getClaimantCount(currentClaim));
    }

    session.rollback();

    // After the rollback, the claim from the final write should no longer be referenced.
    final ContentClaim lastClaim = getContentClaim(created);
    assertEquals(0, contentRepo.getClaimantCount(lastClaim));
}
private static class MockFlowFileRepository implements FlowFileRepository {
private boolean failOnUpdate = false;