NIFI-5384 FlowFile's queued in batches should all have the same Queue time

This closes #2849

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
patricker 2018-07-06 10:38:04 -06:00 committed by Mike Thomsen
parent 0c5e159ebc
commit d50e3f1747
2 changed files with 33 additions and 3 deletions

View File

@ -1861,12 +1861,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return newFile; return newFile;
} }
private void updateLastQueuedDate(final StandardRepositoryRecord record) { private void updateLastQueuedDate(final StandardRepositoryRecord record, final Long lastQueueDate) {
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()) final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
.lastQueued(System.currentTimeMillis(), enqueuedIndex.getAndIncrement()).build(); .lastQueued(lastQueueDate, enqueuedIndex.getAndIncrement()).build();
record.setWorking(newFile); record.setWorking(newFile);
} }
private void updateLastQueuedDate(final StandardRepositoryRecord record) {
updateLastQueuedDate(record, System.currentTimeMillis());
}
@Override @Override
public void transfer(FlowFile flowFile, final Relationship relationship) { public void transfer(FlowFile flowFile, final Relationship relationship) {
verifyTaskActive(); verifyTaskActive();
@ -1938,11 +1942,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final int multiplier = Math.max(1, numDestinations); final int multiplier = Math.max(1, numDestinations);
final long queuedTime = System.currentTimeMillis();
long contentSize = 0L; long contentSize = 0L;
for (final FlowFile flowFile : flowFiles) { for (final FlowFile flowFile : flowFiles) {
final StandardRepositoryRecord record = records.get(flowFile); final StandardRepositoryRecord record = records.get(flowFile);
record.setTransferRelationship(relationship); record.setTransferRelationship(relationship);
updateLastQueuedDate(record); updateLastQueuedDate(record, queuedTime);
contentSize += flowFile.getSize(); contentSize += flowFile.getSize();
} }

View File

@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
@ -1650,6 +1651,30 @@ public class TestStandardProcessSession {
assertEquals(7, flowFiles.size()); assertEquals(7, flowFiles.size());
} }
@Test
public void testBatchQueuedHaveSameQueuedTime() {
for (int i = 0; i < 100; i++) {
final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
.id(i)
.addAttribute("uuid", "000000000000-0000-0000-0000-0000000" + i)
.build();
this.flowFileQueue.put(flowFile);
}
final List<FlowFile> flowFiles = session.get(100);
// FlowFile Queued times should not match yet
assertNotEquals("Queued times should not be equal.", flowFiles.get(0).getLastQueueDate(), flowFiles.get(99).getLastQueueDate());
session.transfer(flowFiles, new Relationship.Builder().name("A").build());
session.commit();
final List<FlowFile> flowFilesUpdated = session.get(100);
// FlowFile Queued times should match
assertEquals("Queued times should be equal.", flowFilesUpdated.get(0).getLastQueueDate(), flowFilesUpdated.get(99).getLastQueueDate());
}
@Test @Test
public void testAttributesModifiedEmitted() throws IOException { public void testAttributesModifiedEmitted() throws IOException {
final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()