NIFI-1877, NIFI-1306: Add fields to FlowFile for FIFO Prioritizer, Oldest/Newest FlowFile first prioritizers to work properly

This closes #546
This commit is contained in:
Mark Payne 2016-07-05 14:56:22 -04:00 committed by Matt Burgess
parent 7e63b00364
commit 32b8a9b9f4
14 changed files with 193 additions and 16 deletions

View File

@ -52,6 +52,19 @@ public interface FlowFile extends Comparable<FlowFile> {
*/
long getLineageStartDate();
/**
* Returns a 64-bit integer that indicates the order in which the FlowFile was added to the
* flow with respect to other FlowFiles that have the same last lineage start date.
* I.e., if two FlowFiles return the same value for {@link #getLineageStartDate()}, the order
* in which those FlowFiles were added to the flow can be determined by looking at the result of
* this method. However, no guarantee is made by this method about the ordering of FlowFiles
* that have different values for the {@link #getLineageStartDate()} method.
*
* @return the index that can be used to compare two FlowFiles with the same lineage start date
* to understand the order in which the two FlowFiles were enqueued.
*/
long getLineageStartIndex();
/**
* @return the time at which the FlowFile was most recently added to a
* FlowFile queue, or {@code null} if the FlowFile has never been enqueued.
@ -60,6 +73,19 @@ public interface FlowFile extends Comparable<FlowFile> {
*/
Long getLastQueueDate();
/**
* Returns a 64-bit integer that indicates the order in which the FlowFile was added to the
* FlowFile queue with respect to other FlowFiles that have the same last queue date.
* I.e., if two FlowFiles return the same value for {@link #getLastQueueDate()}, the order
* in which those FlowFiles were enqueued can be determined by looking at the result of
* this method. However, no guarantee is made by this method about the ordering of FlowFiles
* that have different values for the {@link #getLastQueueDate()} method.
*
* @return the index that can be used to compare two FlowFiles with the same last queue date
* to understand the order in which the two FlowFiles were enqueued.
*/
long getQueueDateIndex();
/**
* <p>
* If a FlowFile is derived from multiple "parent" FlowFiles, all of the

View File

@ -290,4 +290,14 @@ public class MockFlowFile implements FlowFileRecord {
public long getContentClaimOffset() {
return 0;
}
@Override
public long getLineageStartIndex() {
return 0;
}
@Override
public long getQueueDateIndex() {
return 0;
}
}

View File

@ -74,7 +74,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
public static final int SWAP_ENCODING_VERSION = 8;
public static final int SWAP_ENCODING_VERSION = 9;
public static final String EVENT_CATEGORY = "Swap FlowFiles";
private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
@ -328,7 +328,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
out.writeLong(flowFile.getLineageStartDate());
out.writeLong(flowFile.getLineageStartIndex());
out.writeLong(flowFile.getLastQueueDate());
out.writeLong(flowFile.getQueueDateIndex());
out.writeLong(flowFile.getSize());
final ContentClaim claim = flowFile.getContentClaim();
@ -447,10 +449,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
lineageIdentifiers.add(in.readUTF());
}
ffBuilder.lineageIdentifiers(lineageIdentifiers);
ffBuilder.lineageStartDate(in.readLong());
// version 9 adds in a 'lineage start index'
final long lineageStartDate = in.readLong();
final long lineageStartIndex;
if (serializationVersion > 8) {
lineageStartIndex = in.readLong();
} else {
lineageStartIndex = 0L;
}
ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
if (serializationVersion > 5) {
ffBuilder.lastQueueDate(in.readLong());
// Version 9 adds in a 'queue date index'
final long lastQueueDate = in.readLong();
final long queueDateIndex;
if (serializationVersion > 8) {
queueDateIndex = in.readLong();
} else {
queueDateIndex = 0L;
}
ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
}
}

View File

@ -3671,7 +3671,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
.entryDate(System.currentTimeMillis())
.id(flowFileRepository.getNextFlowFileSequence())
.lineageIdentifiers(lineageIdentifiers)
.lineageStartDate(event.getLineageStartDate())
.lineageStart(event.getLineageStartDate(), 0L)
.size(contentSize.longValue())
// Create a new UUID and add attributes indicating that this is a replay
.addAttribute("flowfile.replay", "true")

View File

@ -48,6 +48,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
private final long id;
private final long entryDate;
private final long lineageStartDate;
private final long lineageStartIndex;
private final Set<String> lineageIdentifiers;
private final long size;
private final long penaltyExpirationMs;
@ -55,18 +56,21 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
private final ContentClaim claim;
private final long claimOffset;
private final long lastQueueDate;
private final long queueDateIndex;
private StandardFlowFileRecord(final Builder builder) {
this.id = builder.bId;
this.attributes = builder.bAttributes;
this.entryDate = builder.bEntryDate;
this.lineageStartDate = builder.bLineageStartDate;
this.lineageStartIndex = builder.bLineageStartIndex;
this.lineageIdentifiers = builder.bLineageIdentifiers;
this.penaltyExpirationMs = builder.bPenaltyExpirationMs;
this.size = builder.bSize;
this.claim = builder.bClaim;
this.claimOffset = builder.bClaimOffset;
this.lastQueueDate = builder.bLastQueueDate;
this.queueDateIndex = builder.bQueueDateIndex;
}
@Override
@ -124,6 +128,16 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
return this.claimOffset;
}
@Override
public long getLineageStartIndex() {
return lineageStartIndex;
}
@Override
public long getQueueDateIndex() {
return queueDateIndex;
}
/**
* Provides the natural ordering for FlowFile objects which is based on their identifier.
*
@ -167,6 +181,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
private long bId;
private long bEntryDate = System.currentTimeMillis();
private long bLineageStartDate = bEntryDate;
private long bLineageStartIndex = 0L;
private final Set<String> bLineageIdentifiers = new HashSet<>();
private long bPenaltyExpirationMs = -1L;
private long bSize = 0L;
@ -174,6 +189,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
private ContentClaim bClaim = null;
private long bClaimOffset = 0L;
private long bLastQueueDate = System.currentTimeMillis();
private long bQueueDateIndex = 0L;
public Builder id(final long id) {
bId = id;
@ -204,8 +220,9 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
return this;
}
public Builder lineageStartDate(final long lineageStartDate) {
public Builder lineageStart(final long lineageStartDate, final long lineageStartIndex) {
bLineageStartDate = lineageStartDate;
bLineageStartIndex = lineageStartIndex;
return this;
}
@ -298,8 +315,9 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
return this;
}
public Builder lastQueueDate(final long lastQueueDate) {
public Builder lastQueued(final long lastQueueDate, final long queueDateIndex) {
this.bLastQueueDate = lastQueueDate;
this.bQueueDateIndex = queueDateIndex;
return this;
}
@ -310,6 +328,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
bId = specFlowFile.getId();
bEntryDate = specFlowFile.getEntryDate();
bLineageStartDate = specFlowFile.getLineageStartDate();
bLineageStartIndex = specFlowFile.getLineageStartIndex();
bLineageIdentifiers.clear();
bLineageIdentifiers.addAll(specFlowFile.getLineageIdentifiers());
bPenaltyExpirationMs = specFlowFile.getPenaltyExpirationMillis();
@ -317,6 +336,8 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
bAttributes.putAll(specFlowFile.getAttributes());
bClaim = specFlowFile.getContentClaim();
bClaimOffset = specFlowFile.getContentClaimOffset();
bLastQueueDate = specFlowFile.getLastQueueDate();
bQueueDateIndex = specFlowFile.getQueueDateIndex();
return this;
}

View File

@ -92,6 +92,7 @@ import org.slf4j.LoggerFactory;
public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
private static final AtomicLong idGenerator = new AtomicLong(0L);
private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
// determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback
public static final int VERBOSE_LOG_THRESHOLD = 10;
@ -1526,11 +1527,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return newFile;
}
private void updateLastQueuedDate(final StandardRepositoryRecord record) {
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
.lastQueued(System.currentTimeMillis(), enqueuedIndex.getAndIncrement()).build();
record.setWorking(newFile);
}
@Override
public void transfer(final FlowFile flowFile, final Relationship relationship) {
validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
record.setTransferRelationship(relationship);
updateLastQueuedDate(record);
final int numDestinations = context.getConnections(relationship).size();
final int multiplier = Math.max(1, numDestinations);
@ -1560,6 +1568,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self");
}
record.setTransferRelationship(Relationship.SELF);
updateLastQueuedDate(record);
}
@Override
@ -1589,6 +1598,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
for (final FlowFile flowFile : flowFiles) {
final StandardRepositoryRecord record = records.get(flowFile);
record.setTransferRelationship(relationship);
updateLastQueuedDate(record);
contentSize += flowFile.getSize() * multiplier;
}
@ -2482,7 +2493,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Set<String> lineageIdentifiers = new HashSet<>(parent.getLineageIdentifiers());
lineageIdentifiers.add(parent.getAttribute(CoreAttributes.UUID.key()));
fFileBuilder.lineageIdentifiers(lineageIdentifiers);
fFileBuilder.lineageStartDate(parent.getLineageStartDate());
fFileBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex());
fFileBuilder.addAttributes(newAttributes);
final FlowFileRecord fFile = fFileBuilder.build();
@ -2516,6 +2527,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
// find the smallest lineage start index that has the same lineage start date as the one we've chosen.
long lineageStartIndex = 0L;
for (final FlowFile parent : parents) {
if (parent.getLineageStartDate() == lineageStartDate && parent.getLineageStartIndex() < lineageStartIndex) {
lineageStartIndex = parent.getLineageStartIndex();
}
}
newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
@ -2523,7 +2542,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
.addAttributes(newAttributes)
.lineageIdentifiers(lineageIdentifiers)
.lineageStartDate(lineageStartDate)
.lineageStart(lineageStartDate, lineageStartIndex)
.build();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);

View File

@ -388,6 +388,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
private static class WriteAheadRecordSerde implements SerDe<RepositoryRecord> {
private static final int CURRENT_ENCODING_VERSION = 8;
public static final byte ACTION_CREATE = 0;
public static final byte ACTION_UPDATE = 1;
@ -474,9 +475,11 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
out.writeLong(flowFile.getLineageStartDate());
out.writeLong(flowFile.getLineageStartIndex());
final Long queueDate = flowFile.getLastQueueDate();
out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate);
out.writeLong(flowFile.getQueueDateIndex());
out.writeLong(flowFile.getSize());
if (associatedQueue == null) {
@ -552,10 +555,26 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
lineageIdentifiers.add(in.readUTF());
}
ffBuilder.lineageIdentifiers(lineageIdentifiers);
ffBuilder.lineageStartDate(in.readLong());
final long lineageStartDate = in.readLong();
final long lineageStartIndex;
if (version > 7) {
lineageStartIndex = in.readLong();
} else {
lineageStartIndex = 0L;
}
ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
if (version > 5) {
ffBuilder.lastQueueDate(in.readLong());
final long lastQueueDate = in.readLong();
final long queueDateIndex;
if (version > 7) {
queueDateIndex = in.readLong();
} else {
queueDateIndex = 0L;
}
ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
}
}
@ -648,10 +667,26 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
lineageIdentifiers.add(in.readUTF());
}
ffBuilder.lineageIdentifiers(lineageIdentifiers);
ffBuilder.lineageStartDate(in.readLong());
final long lineageStartDate = in.readLong();
final long lineageStartIndex;
if (version > 7) {
lineageStartIndex = in.readLong();
} else {
lineageStartIndex = 0L;
}
ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
if (version > 5) {
ffBuilder.lastQueueDate(in.readLong());
final long lastQueueDate = in.readLong();
final long queueDateIndex;
if (version > 7) {
queueDateIndex = in.readLong();
} else {
queueDateIndex = 0L;
}
ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
}
}
@ -872,7 +907,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
@Override
public int getVersion() {
return 7;
return CURRENT_ENCODING_VERSION;
}
@Override

View File

@ -242,5 +242,15 @@ public class TestFileSystemSwapManager {
public long getContentClaimOffset() {
return 0;
}
@Override
public long getLineageStartIndex() {
return 0;
}
@Override
public long getQueueDateIndex() {
return 0;
}
}
}

View File

@ -692,6 +692,16 @@ public class TestStandardFlowFileQueue {
public long getContentClaimOffset() {
return 0;
}
@Override
public long getLineageStartIndex() {
return 0;
}
@Override
public long getQueueDateIndex() {
return 0;
}
}
private static class FlowFileSizePrioritizer implements FlowFilePrioritizer {

View File

@ -77,6 +77,16 @@ public class TestUtil {
public int compareTo(final FlowFile o) {
return 0;
}
@Override
public long getLineageStartIndex() {
return 0;
}
@Override
public long getQueueDateIndex() {
return 0;
}
};
}
}

View File

@ -172,6 +172,16 @@ public class TestVolatileProvenanceRepository {
public Long getLastQueueDate() {
return System.currentTimeMillis();
}
@Override
public long getLineageStartIndex() {
return 0;
}
@Override
public long getQueueDateIndex() {
return 0;
}
};
}

View File

@ -31,7 +31,12 @@ public class FirstInFirstOutPrioritizer implements FlowFilePrioritizer {
return 1;
}
return o1.getLastQueueDate().compareTo(o2.getLastQueueDate());
final int dateComparison = o1.getLastQueueDate().compareTo(o2.getLastQueueDate());
if (dateComparison != 0) {
return dateComparison;
}
return Long.compare(o1.getQueueDateIndex(), o2.getQueueDateIndex());
}
}

View File

@ -36,7 +36,7 @@ public class NewestFlowFileFirstPrioritizer implements FlowFilePrioritizer {
return lineageDateCompare;
}
return Long.compare(o2.getId(), o1.getId());
return Long.compare(o2.getLineageStartIndex(), o1.getLineageStartIndex());
}
}

View File

@ -36,7 +36,7 @@ public class OldestFlowFileFirstPrioritizer implements FlowFilePrioritizer {
return lineageDateCompare;
}
return Long.compare(o1.getId(), o2.getId());
return Long.compare(o1.getLineageStartIndex(), o2.getLineageStartIndex());
}
}