diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 12bcafdd2e..ea4969cdb8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1861,12 +1861,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return newFile; } - private void updateLastQueuedDate(final StandardRepositoryRecord record) { + private void updateLastQueuedDate(final StandardRepositoryRecord record, final Long lastQueueDate) { final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()) - .lastQueued(System.currentTimeMillis(), enqueuedIndex.getAndIncrement()).build(); + .lastQueued(lastQueueDate, enqueuedIndex.getAndIncrement()).build(); record.setWorking(newFile); } + private void updateLastQueuedDate(final StandardRepositoryRecord record) { + updateLastQueuedDate(record, System.currentTimeMillis()); + } + @Override public void transfer(FlowFile flowFile, final Relationship relationship) { verifyTaskActive(); @@ -1938,11 +1942,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final int multiplier = Math.max(1, numDestinations); + final long queuedTime = System.currentTimeMillis(); long contentSize = 0L; for (final FlowFile flowFile : flowFiles) { final StandardRepositoryRecord record = records.get(flowFile); record.setTransferRelationship(relationship); - updateLastQueuedDate(record); + updateLastQueuedDate(record, queuedTime); contentSize += flowFile.getSize(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index f47be4d922..4059557948 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -1650,6 +1651,30 @@ public class TestStandardProcessSession { assertEquals(7, flowFiles.size()); } + @Test + public void testBatchQueuedHaveSameQueuedTime() { + for (int i = 0; i < 100; i++) { + final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() + .id(i) + .addAttribute("uuid", "000000000000-0000-0000-0000-0000000" + i) + .build(); + this.flowFileQueue.put(flowFile); + } + + final List flowFiles = session.get(100); + + // FlowFile Queued times should not match yet + assertNotEquals("Queued times should not be equal.", flowFiles.get(0).getLastQueueDate(), flowFiles.get(99).getLastQueueDate()); + + session.transfer(flowFiles, new Relationship.Builder().name("A").build()); + session.commit(); + + final List flowFilesUpdated = session.get(100); + + // FlowFile Queued times should match + assertEquals("Queued times should be equal.", flowFilesUpdated.get(0).getLastQueueDate(), flowFilesUpdated.get(99).getLastQueueDate()); + } + @Test public void testAttributesModifiedEmitted() throws IOException { final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()