From 3b74d2ddad10de9105f38db3f555ec6f024afcd4 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 7 Dec 2017 14:44:10 -0500 Subject: [PATCH] NIFI-4633: This closes #2327. Ensure that everywhere that a FlowFile is passed into ProcessSession that we used the most up-to-date version of it Ensure that when ProcessSession.clone(FlowFile) is called, we obtain the most recent version of the FlowFile before attempting to obtain FlowFile size. Signed-off-by: joewitt --- .../repository/StandardProcessSession.java | 24 +++-- .../TestStandardProcessSession.java | 94 ++++++++++++++++++- 2 files changed, 111 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 0e2e9b5061..c4ea132b50 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1168,8 +1168,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE migrate((StandardProcessSession) newOwner, flowFiles); } - private void migrate(final StandardProcessSession newOwner, final Collection flowFiles) { + private void migrate(final StandardProcessSession newOwner, Collection flowFiles) { // We don't call validateRecordState() here because we want to allow migration of FlowFiles that have already been marked as removed or transferred, etc. + flowFiles = flowFiles.stream().map(this::getMostRecent).collect(Collectors.toList()); + for (final FlowFile flowFile : flowFiles) { if (openInputStreams.containsKey(flowFile)) { throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently " @@ -1580,7 +1582,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE registerDequeuedRecord(flowFile, connection); } - return new ArrayList(newlySelected); + return new ArrayList<>(newlySelected); } finally { if (lockQueue) { connection.unlock(); @@ -1615,7 +1617,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE registerDequeuedRecord(flowFile, conn); } - return new ArrayList(newlySelected); + return new ArrayList<>(newlySelected); } return new ArrayList<>(); @@ -1658,7 +1660,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public FlowFile clone(final FlowFile example) { + public FlowFile clone(FlowFile example) { + example = validateRecordState(example); return clone(example, 0L, example.getSize()); } @@ -3098,8 +3101,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return records.containsKey(flowFile); } + private FlowFile getMostRecent(final FlowFile flowFile) { + final StandardRepositoryRecord existingRecord = records.get(flowFile); + return existingRecord == null ? flowFile : existingRecord.getCurrent(); + } + @Override - public FlowFile create(final FlowFile parent) { + public FlowFile create(FlowFile parent) { + parent = getMostRecent(parent); + final Map newAttributes = new HashMap<>(3); newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime())); newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH); @@ -3135,7 +3145,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public FlowFile create(final Collection parents) { + public FlowFile create(Collection parents) { + parents = parents.stream().map(this::getMostRecent).collect(Collectors.toList()); + final Map newAttributes = intersectAttributes(parents); newAttributes.remove(CoreAttributes.UUID.key()); newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 5a939aecbe..68d13e322e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller.repository; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -270,6 +271,80 @@ public class TestStandardProcessSession { verify(conn2, times(1)).poll(any(Set.class)); } + @Test + public void testCloneOriginalDataSmaller() throws IOException { + final byte[] originalContent = "hello".getBytes(); + final byte[] replacementContent = "NEW DATA".getBytes(); + + final Connection conn1 = createConnection(); + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(contentRepo.create(originalContent)) + .size(originalContent.length) + .build(); + + flowFileQueue.put(flowFileRecord); + + when(connectable.getIncomingConnections()).thenReturn(Collections.singletonList(conn1)); + + final FlowFile input = session.get(); + assertEquals(originalContent.length, input.getSize()); + + final FlowFile modified = session.write(input, (in, out) -> out.write(replacementContent)); + assertEquals(replacementContent.length, modified.getSize()); + + // Clone 'input', not 'modified' because we want to ensure that we use the outdated reference to ensure + // that the framework uses the most current reference. + final FlowFile clone = session.clone(input); + assertEquals(replacementContent.length, clone.getSize()); + + final byte[] buffer = new byte[replacementContent.length]; + try (final InputStream in = session.read(clone)) { + StreamUtils.fillBuffer(in, buffer); + } + + assertArrayEquals(replacementContent, buffer); + } + + @Test + public void testCloneOriginalDataLarger() throws IOException { + final byte[] originalContent = "hello there 12345".getBytes(); + final byte[] replacementContent = "NEW DATA".getBytes(); + + final Connection conn1 = createConnection(); + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(contentRepo.create(originalContent)) + .size(originalContent.length) + .build(); + + flowFileQueue.put(flowFileRecord); + + when(connectable.getIncomingConnections()).thenReturn(Collections.singletonList(conn1)); + + final FlowFile input = session.get(); + assertEquals(originalContent.length, input.getSize()); + + final FlowFile modified = session.write(input, (in, out) -> out.write(replacementContent)); + assertEquals(replacementContent.length, modified.getSize()); + + // Clone 'input', not 'modified' because we want to ensure that we use the outdated reference to ensure + // that the framework uses the most current reference. + final FlowFile clone = session.clone(input); + assertEquals(replacementContent.length, clone.getSize()); + + final byte[] buffer = new byte[replacementContent.length]; + try (final InputStream in = session.read(clone)) { + StreamUtils.fillBuffer(in, buffer); + } + + assertArrayEquals(replacementContent, buffer); + } + @Test @SuppressWarnings("unchecked") public void testRoundRobinOnSessionGetWithCount() { @@ -1909,6 +1984,23 @@ public class TestStandardProcessSession { return contentClaim; } + public ContentClaim create(byte[] content) throws IOException { + final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false, false); + final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L); + + claimantCounts.put(contentClaim, new AtomicInteger(1)); + final Path path = getPath(contentClaim); + final Path parent = path.getParent(); + if (Files.exists(parent) == false) { + Files.createDirectories(parent); + } + + try (final OutputStream out = new FileOutputStream(getPath(contentClaim).toFile())) { + out.write(content); + } + return contentClaim; + } + @Override public int incrementClaimaintCount(ContentClaim claim) { AtomicInteger count = claimantCounts.get(claim); @@ -1938,7 +2030,7 @@ public class TestStandardProcessSession { @Override public Set getContainerNames() { - return new HashSet(); + return new HashSet<>(); } @Override