NIFI-4633: This closes #2327. Ensure that everywhere that a FlowFile is passed into ProcessSession that we used the most up-to-date version of it

Ensure that when ProcessSession.clone(FlowFile) is called, we obtain the most recent version of the FlowFile before attempting to obtain FlowFile size.

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2017-12-07 14:44:10 -05:00 committed by joewitt
parent 113ad5ecfa
commit 3b74d2ddad
2 changed files with 111 additions and 7 deletions

View File

@ -1168,8 +1168,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
migrate((StandardProcessSession) newOwner, flowFiles); migrate((StandardProcessSession) newOwner, flowFiles);
} }
private void migrate(final StandardProcessSession newOwner, final Collection<FlowFile> flowFiles) { private void migrate(final StandardProcessSession newOwner, Collection<FlowFile> flowFiles) {
// We don't call validateRecordState() here because we want to allow migration of FlowFiles that have already been marked as removed or transferred, etc. // We don't call validateRecordState() here because we want to allow migration of FlowFiles that have already been marked as removed or transferred, etc.
flowFiles = flowFiles.stream().map(this::getMostRecent).collect(Collectors.toList());
for (final FlowFile flowFile : flowFiles) { for (final FlowFile flowFile : flowFiles) {
if (openInputStreams.containsKey(flowFile)) { if (openInputStreams.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently " throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
@ -1580,7 +1582,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
registerDequeuedRecord(flowFile, connection); registerDequeuedRecord(flowFile, connection);
} }
return new ArrayList<FlowFile>(newlySelected); return new ArrayList<>(newlySelected);
} finally { } finally {
if (lockQueue) { if (lockQueue) {
connection.unlock(); connection.unlock();
@ -1615,7 +1617,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
registerDequeuedRecord(flowFile, conn); registerDequeuedRecord(flowFile, conn);
} }
return new ArrayList<FlowFile>(newlySelected); return new ArrayList<>(newlySelected);
} }
return new ArrayList<>(); return new ArrayList<>();
@ -1658,7 +1660,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
@Override @Override
public FlowFile clone(final FlowFile example) { public FlowFile clone(FlowFile example) {
example = validateRecordState(example);
return clone(example, 0L, example.getSize()); return clone(example, 0L, example.getSize());
} }
@ -3098,8 +3101,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return records.containsKey(flowFile); return records.containsKey(flowFile);
} }
private FlowFile getMostRecent(final FlowFile flowFile) {
final StandardRepositoryRecord existingRecord = records.get(flowFile);
return existingRecord == null ? flowFile : existingRecord.getCurrent();
}
@Override @Override
public FlowFile create(final FlowFile parent) { public FlowFile create(FlowFile parent) {
parent = getMostRecent(parent);
final Map<String, String> newAttributes = new HashMap<>(3); final Map<String, String> newAttributes = new HashMap<>(3);
newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime())); newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH); newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
@ -3135,7 +3145,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
@Override @Override
public FlowFile create(final Collection<FlowFile> parents) { public FlowFile create(Collection<FlowFile> parents) {
parents = parents.stream().map(this::getMostRecent).collect(Collectors.toList());
final Map<String, String> newAttributes = intersectAttributes(parents); final Map<String, String> newAttributes = intersectAttributes(parents);
newAttributes.remove(CoreAttributes.UUID.key()); newAttributes.remove(CoreAttributes.UUID.key());
newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key()); newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key());

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.controller.repository; package org.apache.nifi.controller.repository;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
@ -270,6 +271,80 @@ public class TestStandardProcessSession {
verify(conn2, times(1)).poll(any(Set.class)); verify(conn2, times(1)).poll(any(Set.class));
} }
@Test
public void testCloneOriginalDataSmaller() throws IOException {
final byte[] originalContent = "hello".getBytes();
final byte[] replacementContent = "NEW DATA".getBytes();
final Connection conn1 = createConnection();
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(contentRepo.create(originalContent))
.size(originalContent.length)
.build();
flowFileQueue.put(flowFileRecord);
when(connectable.getIncomingConnections()).thenReturn(Collections.singletonList(conn1));
final FlowFile input = session.get();
assertEquals(originalContent.length, input.getSize());
final FlowFile modified = session.write(input, (in, out) -> out.write(replacementContent));
assertEquals(replacementContent.length, modified.getSize());
// Clone 'input', not 'modified' because we want to ensure that we use the outdated reference to ensure
// that the framework uses the most current reference.
final FlowFile clone = session.clone(input);
assertEquals(replacementContent.length, clone.getSize());
final byte[] buffer = new byte[replacementContent.length];
try (final InputStream in = session.read(clone)) {
StreamUtils.fillBuffer(in, buffer);
}
assertArrayEquals(replacementContent, buffer);
}
@Test
public void testCloneOriginalDataLarger() throws IOException {
final byte[] originalContent = "hello there 12345".getBytes();
final byte[] replacementContent = "NEW DATA".getBytes();
final Connection conn1 = createConnection();
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(contentRepo.create(originalContent))
.size(originalContent.length)
.build();
flowFileQueue.put(flowFileRecord);
when(connectable.getIncomingConnections()).thenReturn(Collections.singletonList(conn1));
final FlowFile input = session.get();
assertEquals(originalContent.length, input.getSize());
final FlowFile modified = session.write(input, (in, out) -> out.write(replacementContent));
assertEquals(replacementContent.length, modified.getSize());
// Clone 'input', not 'modified' because we want to ensure that we use the outdated reference to ensure
// that the framework uses the most current reference.
final FlowFile clone = session.clone(input);
assertEquals(replacementContent.length, clone.getSize());
final byte[] buffer = new byte[replacementContent.length];
try (final InputStream in = session.read(clone)) {
StreamUtils.fillBuffer(in, buffer);
}
assertArrayEquals(replacementContent, buffer);
}
@Test @Test
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testRoundRobinOnSessionGetWithCount() { public void testRoundRobinOnSessionGetWithCount() {
@ -1909,6 +1984,23 @@ public class TestStandardProcessSession {
return contentClaim; return contentClaim;
} }
public ContentClaim create(byte[] content) throws IOException {
final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false, false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
claimantCounts.put(contentClaim, new AtomicInteger(1));
final Path path = getPath(contentClaim);
final Path parent = path.getParent();
if (Files.exists(parent) == false) {
Files.createDirectories(parent);
}
try (final OutputStream out = new FileOutputStream(getPath(contentClaim).toFile())) {
out.write(content);
}
return contentClaim;
}
@Override @Override
public int incrementClaimaintCount(ContentClaim claim) { public int incrementClaimaintCount(ContentClaim claim) {
AtomicInteger count = claimantCounts.get(claim); AtomicInteger count = claimantCounts.get(claim);
@ -1938,7 +2030,7 @@ public class TestStandardProcessSession {
@Override @Override
public Set<String> getContainerNames() { public Set<String> getContainerNames() {
return new HashSet<String>(); return new HashSet<>();
} }
@Override @Override