From 32b8a9b9f4c07b4f5d3049c33880b547481cbb24 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 5 Jul 2016 14:56:22 -0400 Subject: [PATCH] NIFI-1877, NIFI-1306: Add fields to FlowFile for FIFO Prioritizer, Oldest/Newest FlowFile first prioritizers to work properly This closes #546 --- .../org/apache/nifi/flowfile/FlowFile.java | 26 +++++++++++ .../org/apache/nifi/util/MockFlowFile.java | 10 +++++ .../controller/FileSystemSwapManager.java | 27 +++++++++-- .../nifi/controller/FlowController.java | 2 +- .../repository/StandardFlowFileRecord.java | 25 ++++++++++- .../repository/StandardProcessSession.java | 23 +++++++++- .../WriteAheadFlowFileRepository.java | 45 ++++++++++++++++--- .../controller/TestFileSystemSwapManager.java | 10 +++++ .../controller/TestStandardFlowFileQueue.java | 10 +++++ .../org/apache/nifi/provenance/TestUtil.java | 10 +++++ .../TestVolatileProvenanceRepository.java | 10 +++++ .../FirstInFirstOutPrioritizer.java | 7 ++- .../NewestFlowFileFirstPrioritizer.java | 2 +- .../OldestFlowFileFirstPrioritizer.java | 2 +- 14 files changed, 193 insertions(+), 16 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java index 1288d212fb..43fbf20054 100644 --- a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java +++ b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java @@ -52,6 +52,19 @@ public interface FlowFile extends Comparable { */ long getLineageStartDate(); + /** + * Returns a 64-bit integer that indicates the order in which the FlowFile was added to the + * flow with respect to other FlowFiles that have the same last lineage start date. + * I.e., if two FlowFiles return the same value for {@link #getLineageStartDate()}, the order + * in which those FlowFiles were added to the flow can be determined by looking at the result of + * this method. However, no guarantee is made by this method about the ordering of FlowFiles + * that have different values for the {@link #getLineageStartDate()} method. + * + * @return the index that can be used to compare two FlowFiles with the same lineage start date + * to understand the order in which the two FlowFiles were enqueued. + */ + long getLineageStartIndex(); + /** * @return the time at which the FlowFile was most recently added to a * FlowFile queue, or {@code null} if the FlowFile has never been enqueued. @@ -60,6 +73,19 @@ public interface FlowFile extends Comparable { */ Long getLastQueueDate(); + /** + * Returns a 64-bit integer that indicates the order in which the FlowFile was added to the + * FlowFile queue with respect to other FlowFiles that have the same last queue date. + * I.e., if two FlowFiles return the same value for {@link #getLastQueueDate()}, the order + * in which those FlowFiles were enqueued can be determined by looking at the result of + * this method. However, no guarantee is made by this method about the ordering of FlowFiles + * that have different values for the {@link #getLastQueueDate()} method. + * + * @return the index that can be used to compare two FlowFiles with the same last queue date + * to understand the order in which the two FlowFiles were enqueued. + */ + long getQueueDateIndex(); + /** *

* If a FlowFile is derived from multiple "parent" FlowFiles, all of the diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java index 41bcc741ae..049c65b052 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java @@ -290,4 +290,14 @@ public class MockFlowFile implements FlowFileRecord { public long getContentClaimOffset() { return 0; } + + @Override + public long getLineageStartIndex() { + return 0; + } + + @Override + public long getQueueDateIndex() { + return 0; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index 156389bbe8..5234f0ea95 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -74,7 +74,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap"); private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part"); - public static final int SWAP_ENCODING_VERSION = 8; + public static final int SWAP_ENCODING_VERSION = 9; public static final String EVENT_CATEGORY = "Swap FlowFiles"; private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class); @@ -328,7 +328,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } out.writeLong(flowFile.getLineageStartDate()); + out.writeLong(flowFile.getLineageStartIndex()); out.writeLong(flowFile.getLastQueueDate()); + out.writeLong(flowFile.getQueueDateIndex()); out.writeLong(flowFile.getSize()); final ContentClaim claim = flowFile.getContentClaim(); @@ -447,10 +449,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager { lineageIdentifiers.add(in.readUTF()); } ffBuilder.lineageIdentifiers(lineageIdentifiers); - ffBuilder.lineageStartDate(in.readLong()); + + // version 9 adds in a 'lineage start index' + final long lineageStartDate = in.readLong(); + final long lineageStartIndex; + if (serializationVersion > 8) { + lineageStartIndex = in.readLong(); + } else { + lineageStartIndex = 0L; + } + + ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); if (serializationVersion > 5) { - ffBuilder.lastQueueDate(in.readLong()); + // Version 9 adds in a 'queue date index' + final long lastQueueDate = in.readLong(); + final long queueDateIndex; + if (serializationVersion > 8) { + queueDateIndex = in.readLong(); + } else { + queueDateIndex = 0L; + } + + ffBuilder.lastQueued(lastQueueDate, queueDateIndex); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 3d2eca2936..8c710fa064 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3671,7 +3671,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R .entryDate(System.currentTimeMillis()) .id(flowFileRepository.getNextFlowFileSequence()) .lineageIdentifiers(lineageIdentifiers) - .lineageStartDate(event.getLineageStartDate()) + .lineageStart(event.getLineageStartDate(), 0L) .size(contentSize.longValue()) // Create a new UUID and add attributes indicating that this is a replay .addAttribute("flowfile.replay", "true") diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java index d3d8d40117..cd1ba14109 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java @@ -48,6 +48,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { private final long id; private final long entryDate; private final long lineageStartDate; + private final long lineageStartIndex; private final Set lineageIdentifiers; private final long size; private final long penaltyExpirationMs; @@ -55,18 +56,21 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { private final ContentClaim claim; private final long claimOffset; private final long lastQueueDate; + private final long queueDateIndex; private StandardFlowFileRecord(final Builder builder) { this.id = builder.bId; this.attributes = builder.bAttributes; this.entryDate = builder.bEntryDate; this.lineageStartDate = builder.bLineageStartDate; + this.lineageStartIndex = builder.bLineageStartIndex; this.lineageIdentifiers = builder.bLineageIdentifiers; this.penaltyExpirationMs = builder.bPenaltyExpirationMs; this.size = builder.bSize; this.claim = builder.bClaim; this.claimOffset = builder.bClaimOffset; this.lastQueueDate = builder.bLastQueueDate; + this.queueDateIndex = builder.bQueueDateIndex; } @Override @@ -124,6 +128,16 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { return this.claimOffset; } + @Override + public long getLineageStartIndex() { + return lineageStartIndex; + } + + @Override + public long getQueueDateIndex() { + return queueDateIndex; + } + /** * Provides the natural ordering for FlowFile objects which is based on their identifier. * @@ -167,6 +181,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { private long bId; private long bEntryDate = System.currentTimeMillis(); private long bLineageStartDate = bEntryDate; + private long bLineageStartIndex = 0L; private final Set bLineageIdentifiers = new HashSet<>(); private long bPenaltyExpirationMs = -1L; private long bSize = 0L; @@ -174,6 +189,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { private ContentClaim bClaim = null; private long bClaimOffset = 0L; private long bLastQueueDate = System.currentTimeMillis(); + private long bQueueDateIndex = 0L; public Builder id(final long id) { bId = id; @@ -204,8 +220,9 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { return this; } - public Builder lineageStartDate(final long lineageStartDate) { + public Builder lineageStart(final long lineageStartDate, final long lineageStartIndex) { bLineageStartDate = lineageStartDate; + bLineageStartIndex = lineageStartIndex; return this; } @@ -298,8 +315,9 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { return this; } - public Builder lastQueueDate(final long lastQueueDate) { + public Builder lastQueued(final long lastQueueDate, final long queueDateIndex) { this.bLastQueueDate = lastQueueDate; + this.bQueueDateIndex = queueDateIndex; return this; } @@ -310,6 +328,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { bId = specFlowFile.getId(); bEntryDate = specFlowFile.getEntryDate(); bLineageStartDate = specFlowFile.getLineageStartDate(); + bLineageStartIndex = specFlowFile.getLineageStartIndex(); bLineageIdentifiers.clear(); bLineageIdentifiers.addAll(specFlowFile.getLineageIdentifiers()); bPenaltyExpirationMs = specFlowFile.getPenaltyExpirationMillis(); @@ -317,6 +336,8 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { bAttributes.putAll(specFlowFile.getAttributes()); bClaim = specFlowFile.getContentClaim(); bClaimOffset = specFlowFile.getContentClaimOffset(); + bLastQueueDate = specFlowFile.getLastQueueDate(); + bQueueDateIndex = specFlowFile.getQueueDateIndex(); return this; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 062e515906..f6eca716fb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -92,6 +92,7 @@ import org.slf4j.LoggerFactory; public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher { private static final AtomicLong idGenerator = new AtomicLong(0L); + private static final AtomicLong enqueuedIndex = new AtomicLong(0L); // determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback public static final int VERBOSE_LOG_THRESHOLD = 10; @@ -1526,11 +1527,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return newFile; } + private void updateLastQueuedDate(final StandardRepositoryRecord record) { + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()) + .lastQueued(System.currentTimeMillis(), enqueuedIndex.getAndIncrement()).build(); + record.setWorking(newFile); + } + @Override public void transfer(final FlowFile flowFile, final Relationship relationship) { validateRecordState(flowFile); final StandardRepositoryRecord record = records.get(flowFile); record.setTransferRelationship(relationship); + updateLastQueuedDate(record); final int numDestinations = context.getConnections(relationship).size(); final int multiplier = Math.max(1, numDestinations); @@ -1560,6 +1568,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self"); } record.setTransferRelationship(Relationship.SELF); + updateLastQueuedDate(record); } @Override @@ -1589,6 +1598,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE for (final FlowFile flowFile : flowFiles) { final StandardRepositoryRecord record = records.get(flowFile); record.setTransferRelationship(relationship); + updateLastQueuedDate(record); + contentSize += flowFile.getSize() * multiplier; } @@ -2482,7 +2493,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final Set lineageIdentifiers = new HashSet<>(parent.getLineageIdentifiers()); lineageIdentifiers.add(parent.getAttribute(CoreAttributes.UUID.key())); fFileBuilder.lineageIdentifiers(lineageIdentifiers); - fFileBuilder.lineageStartDate(parent.getLineageStartDate()); + fFileBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex()); fFileBuilder.addAttributes(newAttributes); final FlowFileRecord fFile = fFileBuilder.build(); @@ -2516,6 +2527,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } + // find the smallest lineage start index that has the same lineage start date as the one we've chosen. + long lineageStartIndex = 0L; + for (final FlowFile parent : parents) { + if (parent.getLineageStartDate() == lineageStartDate && parent.getLineageStartIndex() < lineageStartIndex) { + lineageStartIndex = parent.getLineageStartIndex(); + } + } + newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime())); newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH); newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); @@ -2523,7 +2542,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) .addAttributes(newAttributes) .lineageIdentifiers(lineageIdentifiers) - .lineageStartDate(lineageStartDate) + .lineageStart(lineageStartDate, lineageStartIndex) .build(); final StandardRepositoryRecord record = new StandardRepositoryRecord(null); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 211baa70ef..0f40cbb612 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -388,6 +388,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis } private static class WriteAheadRecordSerde implements SerDe { + private static final int CURRENT_ENCODING_VERSION = 8; public static final byte ACTION_CREATE = 0; public static final byte ACTION_UPDATE = 1; @@ -474,9 +475,11 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis } out.writeLong(flowFile.getLineageStartDate()); + out.writeLong(flowFile.getLineageStartIndex()); final Long queueDate = flowFile.getLastQueueDate(); out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate); + out.writeLong(flowFile.getQueueDateIndex()); out.writeLong(flowFile.getSize()); if (associatedQueue == null) { @@ -552,10 +555,26 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis lineageIdentifiers.add(in.readUTF()); } ffBuilder.lineageIdentifiers(lineageIdentifiers); - ffBuilder.lineageStartDate(in.readLong()); + + final long lineageStartDate = in.readLong(); + final long lineageStartIndex; + if (version > 7) { + lineageStartIndex = in.readLong(); + } else { + lineageStartIndex = 0L; + } + ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); if (version > 5) { - ffBuilder.lastQueueDate(in.readLong()); + final long lastQueueDate = in.readLong(); + final long queueDateIndex; + if (version > 7) { + queueDateIndex = in.readLong(); + } else { + queueDateIndex = 0L; + } + + ffBuilder.lastQueued(lastQueueDate, queueDateIndex); } } @@ -648,10 +667,26 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis lineageIdentifiers.add(in.readUTF()); } ffBuilder.lineageIdentifiers(lineageIdentifiers); - ffBuilder.lineageStartDate(in.readLong()); + + final long lineageStartDate = in.readLong(); + final long lineageStartIndex; + if (version > 7) { + lineageStartIndex = in.readLong(); + } else { + lineageStartIndex = 0L; + } + ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); if (version > 5) { - ffBuilder.lastQueueDate(in.readLong()); + final long lastQueueDate = in.readLong(); + final long queueDateIndex; + if (version > 7) { + queueDateIndex = in.readLong(); + } else { + queueDateIndex = 0L; + } + + ffBuilder.lastQueued(lastQueueDate, queueDateIndex); } } @@ -872,7 +907,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis @Override public int getVersion() { - return 7; + return CURRENT_ENCODING_VERSION; } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java index fcfd524ef4..c27fad3b23 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java @@ -242,5 +242,15 @@ public class TestFileSystemSwapManager { public long getContentClaimOffset() { return 0; } + + @Override + public long getLineageStartIndex() { + return 0; + } + + @Override + public long getQueueDateIndex() { + return 0; + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 18c55c646d..85d983800e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -692,6 +692,16 @@ public class TestStandardFlowFileQueue { public long getContentClaimOffset() { return 0; } + + @Override + public long getLineageStartIndex() { + return 0; + } + + @Override + public long getQueueDateIndex() { + return 0; + } } private static class FlowFileSizePrioritizer implements FlowFilePrioritizer { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java index 26766d68e7..a34d78b861 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java @@ -77,6 +77,16 @@ public class TestUtil { public int compareTo(final FlowFile o) { return 0; } + + @Override + public long getLineageStartIndex() { + return 0; + } + + @Override + public long getQueueDateIndex() { + return 0; + } }; } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java index 7a7a334a2c..4daa31cc47 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java @@ -172,6 +172,16 @@ public class TestVolatileProvenanceRepository { public Long getLastQueueDate() { return System.currentTimeMillis(); } + + @Override + public long getLineageStartIndex() { + return 0; + } + + @Override + public long getQueueDateIndex() { + return 0; + } }; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java index 6107e87bc6..08437c76ba 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java @@ -31,7 +31,12 @@ public class FirstInFirstOutPrioritizer implements FlowFilePrioritizer { return 1; } - return o1.getLastQueueDate().compareTo(o2.getLastQueueDate()); + final int dateComparison = o1.getLastQueueDate().compareTo(o2.getLastQueueDate()); + if (dateComparison != 0) { + return dateComparison; + } + + return Long.compare(o1.getQueueDateIndex(), o2.getQueueDateIndex()); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java index 515993ea42..4893cf0589 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java @@ -36,7 +36,7 @@ public class NewestFlowFileFirstPrioritizer implements FlowFilePrioritizer { return lineageDateCompare; } - return Long.compare(o2.getId(), o1.getId()); + return Long.compare(o2.getLineageStartIndex(), o1.getLineageStartIndex()); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java index b4781f3051..386d91238b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java @@ -36,7 +36,7 @@ public class OldestFlowFileFirstPrioritizer implements FlowFilePrioritizer { return lineageDateCompare; } - return Long.compare(o1.getId(), o2.getId()); + return Long.compare(o1.getLineageStartIndex(), o2.getLineageStartIndex()); } }