From cf8ca3dc2c9ee9220c0b83b6c003ef20c67fbd33 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 28 Oct 2015 14:45:13 -0400 Subject: [PATCH] NIFI-1082: Ensure that events returned from the provenance repository are ordered such that newest events are provided first --- .../StandardProvenanceEventRecord.java | 36 ++-- .../nifi/provenance/StandardQueryResult.java | 25 ++- .../nifi/provenance/IndexConfiguration.java | 47 +++-- .../PersistentProvenanceRepository.java | 124 ++++++------- .../nifi/provenance/lucene/DocsReader.java | 49 ++---- .../nifi/provenance/lucene/IndexSearch.java | 35 ++-- .../nifi/provenance/lucene/LineageQuery.java | 7 +- .../TestPersistentProvenanceRepository.java | 163 +++++++++++++++++- .../VolatileProvenanceRepository.java | 16 +- 9 files changed, 327 insertions(+), 175 deletions(-) diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java index 4eb7001a7a..892a8f8b81 100644 --- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java @@ -71,7 +71,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor private final Map previousAttributes; private final Map updatedAttributes; - private volatile long eventId; + private volatile long eventId = -1L; private StandardProvenanceEventRecord(final Builder builder) { this.eventTime = builder.eventTime; @@ -105,8 +105,8 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor contentClaimOffset = builder.contentClaimOffset; contentSize = builder.contentSize; - previousAttributes = builder.previousAttributes == null ? Collections.emptyMap() : Collections.unmodifiableMap(builder.previousAttributes); - updatedAttributes = builder.updatedAttributes == null ? Collections.emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes); + previousAttributes = builder.previousAttributes == null ? Collections. emptyMap() : Collections.unmodifiableMap(builder.previousAttributes); + updatedAttributes = builder.updatedAttributes == null ? Collections. emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes); sourceQueueIdentifier = builder.sourceQueueIdentifier; @@ -198,12 +198,12 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor @Override public List getParentUuids() { - return parentUuids == null ? Collections.emptyList() : parentUuids; + return parentUuids == null ? Collections. emptyList() : parentUuids; } @Override public List getChildUuids() { - return childrenUuids == null ? Collections.emptyList() : childrenUuids; + return childrenUuids == null ? Collections. emptyList() : childrenUuids; } @Override @@ -298,7 +298,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor } return -37423 + 3 * componentId.hashCode() + (transitUri == null ? 0 : 41 * transitUri.hashCode()) - + (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode; + + (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode; } @Override @@ -316,7 +316,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor final StandardProvenanceEventRecord other = (StandardProvenanceEventRecord) obj; // If event ID's are populated and not equal, return false. If they have not yet been populated, do not // use them in the comparison. - if (eventId > 0L && other.getEventId() > 0L && eventId != other.getEventId()) { + if (eventId >= 0L && other.getEventId() >= 0L && eventId != other.getEventId()) { return false; } if (eventType != other.eventType) { @@ -397,16 +397,16 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor @Override public String toString() { return "ProvenanceEventRecord [" - + "eventId=" + eventId - + ", eventType=" + eventType - + ", eventTime=" + new Date(eventTime) - + ", uuid=" + uuid - + ", fileSize=" + contentSize - + ", componentId=" + componentId - + ", transitUri=" + transitUri - + ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier - + ", parentUuids=" + parentUuids - + ", alternateIdentifierUri=" + alternateIdentifierUri + "]"; + + "eventId=" + eventId + + ", eventType=" + eventType + + ", eventTime=" + new Date(eventTime) + + ", uuid=" + uuid + + ", fileSize=" + contentSize + + ", componentId=" + componentId + + ", transitUri=" + transitUri + + ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier + + ", parentUuids=" + parentUuids + + ", alternateIdentifierUri=" + alternateIdentifierUri + "]"; } public static class Builder implements ProvenanceEventBuilder { @@ -663,7 +663,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor setFlowFileEntryDate(flowFile.getEntryDate()); setLineageIdentifiers(flowFile.getLineageIdentifiers()); setLineageStartDate(flowFile.getLineageStartDate()); - setAttributes(Collections.emptyMap(), flowFile.getAttributes()); + setAttributes(Collections. emptyMap(), flowFile.getAttributes()); uuid = flowFile.getAttribute(CoreAttributes.UUID.key()); this.contentSize = flowFile.getSize(); return this; diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java index 9a9a27d79a..bef63e1774 100644 --- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java @@ -18,6 +18,7 @@ package org.apache.nifi.provenance; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.concurrent.TimeUnit; @@ -40,7 +41,7 @@ public class StandardQueryResult implements QueryResult { private final Lock writeLock = rwLock.writeLock(); // guarded by writeLock - private final List matchingRecords = new ArrayList<>(); + private final List> matchingRecords; private long totalHitCount; private int numCompletedSteps = 0; private Date expirationDate; @@ -53,6 +54,11 @@ public class StandardQueryResult implements QueryResult { this.query = query; this.numSteps = numSteps; this.creationNanos = System.nanoTime(); + this.matchingRecords = new ArrayList<>(numSteps); + + for (int i = 0; i < Math.max(1, numSteps); i++) { + matchingRecords.add(Collections. emptyList()); + } updateExpiration(); } @@ -61,13 +67,14 @@ public class StandardQueryResult implements QueryResult { public List getMatchingEvents() { readLock.lock(); try { - if (matchingRecords.size() <= query.getMaxResults()) { - return new ArrayList<>(matchingRecords); - } - final List copy = new ArrayList<>(query.getMaxResults()); - for (int i = 0; i < query.getMaxResults(); i++) { - copy.add(matchingRecords.get(i)); + for (final List recordList : matchingRecords) { + if (copy.size() + recordList.size() > query.getMaxResults()) { + copy.addAll(recordList.subList(0, query.getMaxResults() - copy.size())); + return copy; + } else { + copy.addAll(recordList); + } } return copy; @@ -141,10 +148,10 @@ public class StandardQueryResult implements QueryResult { } } - public void update(final Collection matchingRecords, final long totalHits) { + public void update(final Collection matchingRecords, final long totalHits, final int indexId) { writeLock.lock(); try { - this.matchingRecords.addAll(matchingRecords); + this.matchingRecords.set(indexId, new ArrayList<>(matchingRecords)); this.totalHitCount += totalHits; numCompletedSteps++; diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java index 4e808111d5..a6e6d5d790 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java @@ -36,7 +36,6 @@ import java.util.regex.Pattern; import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,6 +172,9 @@ public class IndexConfiguration { for (final List list : indexDirectoryMap.values()) { files.addAll(list); } + + Collections.sort(files, new IndexDirectoryComparator()); + return files; } finally { lock.unlock(); @@ -198,11 +200,11 @@ public class IndexConfiguration { * span (times inclusive). * * @param startTime the start time of the query for which the indices are - * desired + * desired * @param endTime the end time of the query for which the indices are - * desired + * desired * @return the index directories that are applicable only for the given time - * span (times inclusive). + * span (times inclusive). */ public List getIndexDirectories(final Long startTime, final Long endTime) { if (startTime == null && endTime == null) { @@ -213,14 +215,7 @@ public class IndexConfiguration { lock.lock(); try { final List sortedIndexDirectories = getIndexDirectories(); - Collections.sort(sortedIndexDirectories, new Comparator() { - @Override - public int compare(final File o1, final File o2) { - final long epochTimestamp1 = getIndexStartTime(o1); - final long epochTimestamp2 = getIndexStartTime(o2); - return Long.compare(epochTimestamp1, epochTimestamp2); - } - }); + Collections.sort(sortedIndexDirectories, new IndexDirectoryComparator()); for (final File indexDir : sortedIndexDirectories) { // If the index was last modified before the start time, we know that it doesn't @@ -252,9 +247,9 @@ public class IndexConfiguration { * event log * * @param provenanceLogFile the provenance log file for which the index - * directories are desired + * directories are desired * @return the index directories that are applicable only for the given - * event log + * event log */ public List getIndexDirectories(final File provenanceLogFile) { final List dirs = new ArrayList<>(); @@ -262,23 +257,16 @@ public class IndexConfiguration { try { final List indices = indexDirectoryMap.get(provenanceLogFile.getParentFile()); if (indices == null) { - return Collections.emptyList(); + return Collections. emptyList(); } final List sortedIndexDirectories = new ArrayList<>(indices); - Collections.sort(sortedIndexDirectories, new Comparator() { - @Override - public int compare(final File o1, final File o2) { - final long epochTimestamp1 = getIndexStartTime(o1); - final long epochTimestamp2 = getIndexStartTime(o2); - return Long.compare(epochTimestamp1, epochTimestamp2); - } - }); + Collections.sort(sortedIndexDirectories, new IndexDirectoryComparator()); final Long firstEntryTime = getFirstEntryTime(provenanceLogFile); if (firstEntryTime == null) { logger.debug("Found no records in {} so returning no Indices for it", provenanceLogFile); - return Collections.emptyList(); + return Collections. emptyList(); } boolean foundIndexCreatedLater = false; @@ -376,7 +364,7 @@ public class IndexConfiguration { lock.lock(); try { if (minIndexedId == null || id > minIndexedId) { - if (maxIndexedId == null || id > maxIndexedId) { // id will be > maxIndexedId if all records were expired + if (maxIndexedId == null || id > maxIndexedId) { // id will be > maxIndexedId if all records were expired minIndexedId = maxIndexedId; } else { minIndexedId = id; @@ -395,4 +383,13 @@ public class IndexConfiguration { lock.unlock(); } } + + private class IndexDirectoryComparator implements Comparator { + @Override + public int compare(final File o1, final File o2) { + final long epochTimestamp1 = getIndexStartTime(o1); + final long epochTimestamp2 = getIndexStartTime(o2); + return -Long.compare(epochTimestamp1, epochTimestamp2); + } + } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 89e1419869..7f1bf8ce2d 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -108,7 +108,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository public static final String EVENT_CATEGORY = "Provenance Repository"; private static final String FILE_EXTENSION = ".prov"; private static final String TEMP_FILE_SUFFIX = ".prov.part"; - private static final long PURGE_EVENT_MILLISECONDS = 2500L; //Determines the frequency over which the task to delete old events will occur + private static final long PURGE_EVENT_MILLISECONDS = 2500L; // Determines the frequency over which the task to delete old events will occur public static final int SERIALIZATION_VERSION = 8; public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+"); @@ -404,9 +404,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // if this is the first record, try to find out the block index and jump directly to // the block index. This avoids having to read through a lot of data that we don't care about // just to get to the first record that we want. - if ( records.isEmpty() ) { + if (records.isEmpty()) { final TocReader tocReader = reader.getTocReader(); - if ( tocReader != null ) { + if (tocReader != null) { final Integer blockIndex = tocReader.getBlockIndexForEventId(firstRecordId); if (blockIndex != null) { reader.skipToBlock(blockIndex); @@ -641,7 +641,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository indexManager.close(); - if ( writers != null ) { + if (writers != null) { for (final RecordWriter writer : writers) { writer.close(); } @@ -700,7 +700,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // journal will result in corruption! writer.markDirty(); dirtyWriterCount.incrementAndGet(); - streamStartTime.set(0L); // force rollover to happen soon. + streamStartTime.set(0L); // force rollover to happen soon. throw t; } finally { writer.unlock(); @@ -912,15 +912,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository removed.add(baseName); } catch (final FileNotFoundException fnf) { logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not " - + "perform additional Expiration Actions on this file", currentAction, file); + + "perform additional Expiration Actions on this file", currentAction, file); removed.add(baseName); } catch (final Throwable t) { logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional " - + "Expiration Actions on this file at this time", currentAction, file, t.toString()); + + "Expiration Actions on this file at this time", currentAction, file, t.toString()); logger.warn("", t); eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction + - " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " + - "on this file at this time"); + " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " + + "on this file at this time"); } } @@ -1131,10 +1131,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository protected int getJournalCount() { // determine how many 'journals' we have in the journals directories int journalFileCount = 0; - for ( final File storageDir : configuration.getStorageDirectories() ) { + for (final File storageDir : configuration.getStorageDirectories()) { final File journalsDir = new File(storageDir, "journals"); final File[] journalFiles = journalsDir.listFiles(); - if ( journalFiles != null ) { + if (journalFiles != null) { journalFileCount += journalFiles.length; } } @@ -1169,12 +1169,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository writer.close(); } catch (final IOException ioe) { logger.warn("Failed to close {} due to {}", writer, ioe.toString()); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.warn("", ioe); } } } - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), ".")); } @@ -1263,10 +1263,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // that is no longer the case. if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) { logger.warn("The rate of the dataflow is exceeding the provenance recording rate. " - + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and " - + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold); + + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and " + + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold); eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is " - + "exceeding the provenance recording rate. Slowing down flow to accommodate"); + + "exceeding the provenance recording rate. Slowing down flow to accommodate"); while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) { // if a shutdown happens while we are in this loop, kill the rollover thread and break @@ -1293,15 +1293,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } logger.debug("Provenance Repository is still behind. Keeping flow slowed down " - + "to accommodate. Currently, there are {} journal files ({} bytes) and " - + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold); + + "to accommodate. Currently, there are {} journal files ({} bytes) and " + + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold); journalFileCount = getJournalCount(); repoSize = getSize(getLogFiles(), 0L); } logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of " - + "journal files to be rolled over is {}", journalFileCount); + + "journal files to be rolled over is {}", journalFileCount); } // we've finished rolling over successfully. Create new writers and reset state. @@ -1335,7 +1335,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } for (final File journalFile : journalFiles) { - if ( journalFile.isDirectory() ) { + if (journalFile.isDirectory()) { continue; } @@ -1403,7 +1403,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository */ File mergeJournals(final List journalFiles, final File suggestedMergeFile, final EventReporter eventReporter) throws IOException { logger.debug("Merging {} to {}", journalFiles, suggestedMergeFile); - if ( this.closed.get() ) { + if (this.closed.get()) { logger.info("Provenance Repository has been closed; will not merge journal files to {}", suggestedMergeFile); return null; } @@ -1439,14 +1439,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // we have all "partial" files and there is already a merged file. Delete the data from the index // because the merge file may not be fully merged. We will re-merge. logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist " - + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency."); + + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency."); final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager); try { deleteAction.execute(suggestedMergeFile); } catch (final Exception e) { logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", suggestedMergeFile, e.toString()); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.warn("", e); } } @@ -1460,18 +1460,18 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } final File tocFile = TocUtil.getTocFile(suggestedMergeFile); - if ( tocFile.exists() && !tocFile.delete() ) { + if (tocFile.exists() && !tocFile.delete()) { logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. " + "This can be corrected by manually deleting the {} file", tocFile, suggestedMergeFile, tocFile); } } } else { logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' " - + "but it did not; assuming that the files were already merged but only some finished deletion " - + "before restart. Deleting remaining partial journal files.", journalFiles); + + "but it did not; assuming that the files were already merged but only some finished deletion " + + "before restart. Deleting remaining partial journal files.", journalFiles); - for ( final File file : journalFiles ) { - if ( !file.delete() && file.exists() ) { + for (final File file : journalFiles) { + if (!file.delete() && file.exists()) { logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file); } } @@ -1529,7 +1529,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } catch (final EOFException eof) { } catch (final Exception e) { logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't " - + "completely written to the file. This record will be skipped."); + + "completely written to the file. This record will be skipped."); if (logger.isDebugEnabled()) { logger.warn("", e); } @@ -1544,11 +1544,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository continue; } - if ( record.getEventTime() < earliestTimestamp ) { + if (record.getEventTime() < earliestTimestamp) { earliestTimestamp = record.getEventTime(); } - if ( record.getEventId() < minEventId ) { + if (record.getEventId() < minEventId) { minEventId = record.getEventId(); } @@ -1799,7 +1799,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final int numQueries = querySubmissionMap.size(); if (numQueries > MAX_UNDELETED_QUERY_RESULTS) { throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not " - + "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later."); + + "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later."); } if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) { @@ -1820,7 +1820,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository Long maxEventId = getMaxEventId(); if (maxEventId == null) { - result.getResult().update(Collections.emptyList(), 0L); + result.getResult().update(Collections. emptyList(), 0L, 0); maxEventId = 0L; } Long minIndexedId = indexConfig.getMinIdIndexed(); @@ -1830,7 +1830,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final long totalNumDocs = maxEventId - minIndexedId; - result.getResult().update(trimmed, totalNumDocs); + result.getResult().update(trimmed, totalNumDocs, 0); } else { queryExecService.submit(new GetMostRecentRunnable(query, result)); } @@ -1839,18 +1839,18 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return result; } - final AtomicInteger retrievalCount = new AtomicInteger(0); final List indexDirectories = indexConfig.getIndexDirectories( - query.getStartDate() == null ? null : query.getStartDate().getTime(), - query.getEndDate() == null ? null : query.getEndDate().getTime()); + query.getStartDate() == null ? null : query.getStartDate().getTime(), + query.getEndDate() == null ? null : query.getEndDate().getTime()); final AsyncQuerySubmission result = new AsyncQuerySubmission(query, indexDirectories.size()); querySubmissionMap.put(query.getIdentifier(), result); if (indexDirectories.isEmpty()) { - result.getResult().update(Collections.emptyList(), 0L); + result.getResult().update(Collections. emptyList(), 0L, 0); } else { + int indexId = 0; for (final File indexDir : indexDirectories) { - queryExecService.submit(new QueryRunnable(query, result, indexDir, retrievalCount)); + queryExecService.submit(new QueryRunnable(query, result, indexDir, indexId++)); } } @@ -2024,11 +2024,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } Lineage computeLineage(final String flowFileUuid) throws IOException { - return computeLineage(Collections.singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE); + return computeLineage(Collections. singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE); } private Lineage computeLineage(final Collection flowFileUuids, final LineageComputationType computationType, final Long eventId, final Long startTimestamp, - final Long endTimestamp) throws IOException { + final Long endTimestamp) throws IOException { final AsyncLineageSubmission submission = submitLineageComputation(flowFileUuids, computationType, eventId, startTimestamp, endTimestamp); final StandardLineageResult result = submission.getResult(); while (!result.isFinished()) { @@ -2051,7 +2051,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } private AsyncLineageSubmission submitLineageComputation(final Collection flowFileUuids, final LineageComputationType computationType, - final Long eventId, final long startTimestamp, final long endTimestamp) { + final Long eventId, final long startTimestamp, final long endTimestamp) { final List indexDirs = indexConfig.getIndexDirectories(startTimestamp, endTimestamp); final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexDirs.size()); lineageSubmissionMap.put(result.getLineageIdentifier(), result); @@ -2068,9 +2068,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository try { final ProvenanceEventRecord event = getEvent(eventId); if (event == null) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections. emptyList(), 1); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().update(Collections.emptyList()); + submission.getResult().update(Collections. emptyList()); return submission; } @@ -2081,13 +2081,13 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository case REPLAY: return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE); default: - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections. emptyList(), 1); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); return submission; } } catch (final IOException ioe) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections. emptyList(), 1); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); if (ioe.getMessage() == null) { @@ -2105,9 +2105,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository try { final ProvenanceEventRecord event = getEvent(eventId); if (event == null) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections. emptyList(), 1); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().update(Collections.emptyList()); + submission.getResult().update(Collections. emptyList()); return submission; } @@ -2118,14 +2118,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository case REPLAY: return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, event.getLineageStartDate(), event.getEventTime()); default: { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections. emptyList(), 1); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); return submission; } } } catch (final IOException ioe) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections. emptyList(), 1); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); if (ioe.getMessage() == null) { @@ -2248,7 +2248,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // get the max indexed event id final Long maxEventId = indexConfig.getMaxIdIndexed(); if (maxEventId == null) { - submission.getResult().update(Collections.emptyList(), 0); + submission.getResult().update(Collections. emptyList(), 0L, 0); return; } @@ -2263,7 +2263,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final long totalNumDocs = maxEventId - minIndexedId; final List mostRecent = getEvents(startIndex, maxResults); - submission.getResult().update(mostRecent, totalNumDocs); + // reverse the order so that the newest events come first. + Collections.reverse(mostRecent); + submission.getResult().update(mostRecent, totalNumDocs, 0); } catch (final IOException ioe) { logger.error("Failed to retrieve records from Provenance Repository: " + ioe.toString()); if (logger.isDebugEnabled()) { @@ -2284,24 +2286,28 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository private final Query query; private final AsyncQuerySubmission submission; private final File indexDir; - private final AtomicInteger retrievalCount; + private final int indexId; - public QueryRunnable(final Query query, final AsyncQuerySubmission submission, final File indexDir, final AtomicInteger retrievalCount) { + public QueryRunnable(final Query query, final AsyncQuerySubmission submission, final File indexDir, final int indexId) { this.query = query; this.submission = submission; this.indexDir = indexDir; - this.retrievalCount = retrievalCount; + this.indexId = indexId; } @Override public void run() { try { final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars); - final StandardQueryResult queryResult = search.search(query, retrievalCount, firstEventTimestamp); - submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount()); + final StandardQueryResult queryResult = search.search(query, firstEventTimestamp); + + logger.debug("Merging query results for indexId {}; before merge, num events = {}", indexId, queryResult.getTotalHitCount()); + submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount(), indexId); + logger.debug("Merging query results for indexId {}; after merge, num events = {}", indexId, queryResult.getTotalHitCount()); + if (queryResult.isFinished()) { logger.info("Successfully executed Query[{}] against Index {}; Search took {} milliseconds; Total Hits = {}", - query, indexDir, queryResult.getQueryTime(), queryResult.getTotalHitCount()); + query, indexDir, queryResult.getQueryTime(), queryResult.getTotalHitCount()); } } catch (final Throwable t) { logger.error("Failed to query Provenance Repository Index {} due to {}", indexDir, t.toString()); @@ -2344,7 +2350,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository result.update(matchingRecords); logger.info("Successfully created Lineage for FlowFiles with UUIDs {} in {} milliseconds; Lineage contains {} nodes and {} edges", - flowFileUuids, result.getComputationTime(TimeUnit.MILLISECONDS), result.getNodes().size(), result.getEdges().size()); + flowFileUuids, result.getComputationTime(TimeUnit.MILLISECONDS), result.getNodes().size(), result.getEdges().size()); } catch (final Throwable t) { logger.error("Failed to query provenance repository due to {}", t.toString()); if (logger.isDebugEnabled()) { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java index c2a7609eb8..e6e78c17c9 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java @@ -22,25 +22,23 @@ import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.SearchableFields; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; import org.apache.nifi.provenance.toc.TocReader; -import org.apache.lucene.document.Document; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexableField; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.TopDocs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,11 +49,7 @@ public class DocsReader { } public Set read(final TopDocs topDocs, final IndexReader indexReader, final Collection allProvenanceLogFiles, - final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException { - if (retrievalCount.get() >= maxResults) { - return Collections.emptySet(); - } - + final int maxResults, final int maxAttributeChars) throws IOException { final long start = System.nanoTime(); final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults); final List docs = new ArrayList<>(numDocs); @@ -68,13 +62,13 @@ public class DocsReader { final long readDocuments = System.nanoTime() - start; logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments)); - return read(docs, allProvenanceLogFiles, retrievalCount, maxResults, maxAttributeChars); + return read(docs, allProvenanceLogFiles, maxResults, maxAttributeChars); } private long getByteOffset(final Document d, final RecordReader reader) { final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX); - if ( blockField != null ) { + if (blockField != null) { final int blockIndex = blockField.numericValue().intValue(); final TocReader tocReader = reader.getTocReader(); return tocReader.getBlockOffset(blockIndex); @@ -86,21 +80,21 @@ public class DocsReader { private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException { final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX); - if ( blockField == null ) { + if (blockField == null) { reader.skipTo(getByteOffset(d, reader)); } else { reader.skipToBlock(blockField.numericValue().intValue()); } StandardProvenanceEventRecord record; - while ( (record = reader.nextRecord()) != null) { + while ((record = reader.nextRecord()) != null) { final IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName()); - if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) { + if (idField == null || idField.numericValue().longValue() == record.getEventId()) { break; } } - if ( record == null ) { + if (record == null) { throw new IOException("Failed to find Provenance Event " + d); } else { return record; @@ -109,10 +103,7 @@ public class DocsReader { public Set read(final List docs, final Collection allProvenanceLogFiles, - final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException { - if (retrievalCount.get() >= maxResults) { - return Collections.emptySet(); - } + final int maxResults, final int maxAttributeChars) throws IOException { LuceneUtil.sortDocsForRetrieval(docs); @@ -129,7 +120,7 @@ public class DocsReader { try { for (final Document d : docs) { final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue(); - if ( storageFilesToSkip.contains(storageFilename) ) { + if (storageFilesToSkip.contains(storageFilename)) { continue; } @@ -137,10 +128,6 @@ public class DocsReader { if (reader != null && storageFilename.equals(lastStorageFilename)) { matchingRecords.add(getRecord(d, reader)); eventsReadThisFile++; - - if ( retrievalCount.incrementAndGet() >= maxResults ) { - break; - } } else { logger.debug("Opening log file {}", storageFilename); @@ -152,14 +139,14 @@ public class DocsReader { final List potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles); if (potentialFiles.isEmpty()) { logger.warn("Could not find Provenance Log File with basename {} in the " - + "Provenance Repository; assuming file has expired and continuing without it", storageFilename); + + "Provenance Repository; assuming file has expired and continuing without it", storageFilename); storageFilesToSkip.add(storageFilename); continue; } if (potentialFiles.size() > 1) { throw new FileNotFoundException("Found multiple Provenance Log Files with basename " + - storageFilename + " in the Provenance Repository"); + storageFilename + " in the Provenance Repository"); } for (final File file : potentialFiles) { @@ -171,10 +158,6 @@ public class DocsReader { reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars); matchingRecords.add(getRecord(d, reader)); eventsReadThisFile = 1; - - if ( retrievalCount.incrementAndGet() >= maxResults ) { - break; - } } catch (final IOException e) { throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e); } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java index 7fcd8ab718..c0ca8a7f2b 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java @@ -19,17 +19,22 @@ package org.apache.nifi.provenance.lucene; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortField.Type; import org.apache.lucene.search.TopDocs; import org.apache.nifi.provenance.PersistentProvenanceRepository; import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.SearchableFields; import org.apache.nifi.provenance.StandardQueryResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +53,7 @@ public class IndexSearch { this.maxAttributeChars = maxAttributeChars; } - public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount, final long firstEventTimestamp) throws IOException { + public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final long firstEventTimestamp) throws IOException { if (!indexDirectory.exists() && !indexDirectory.mkdirs()) { throw new IOException("Unable to create Indexing Directory " + indexDirectory); } @@ -57,7 +62,6 @@ public class IndexSearch { } final StandardQueryResult sqr = new StandardQueryResult(provenanceQuery, 1); - final Set matchingRecords; // we need to set the start date because if we do not, the first index may still have events that have aged off from // the repository, and we don't want those events to count toward the total number of matches. @@ -77,38 +81,47 @@ public class IndexSearch { final long searchStartNanos = System.nanoTime(); final long openSearcherNanos = searchStartNanos - start; - final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults()); + final Sort sort = new Sort(new SortField(SearchableFields.Identifier.getSearchableFieldName(), Type.LONG, true)); + final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults(), sort); final long finishSearch = System.nanoTime(); final long searchNanos = finishSearch - searchStartNanos; logger.debug("Searching {} took {} millis; opening searcher took {} millis", this, - TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos)); + TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos)); if (topDocs.totalHits == 0) { - sqr.update(Collections.emptyList(), 0); + sqr.update(Collections. emptyList(), 0, 0); return sqr; } final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories()); - matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, + final Set matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), provenanceQuery.getMaxResults(), maxAttributeChars); final long readRecordsNanos = System.nanoTime() - finishSearch; logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this); - sqr.update(matchingRecords, topDocs.totalHits); + // The records returned are going to be in a sorted set. The sort order will be dependent on + // the ID of the events, which is also approximately the same as the timestamp of the event (i.e. + // it's ordered by the time when the event was inserted into the repo, not the time when the event took + // place). We want to reverse this so that we get the newest events first, so we have to first create a + // new List object to hold the events, and then reverse the list. + final List recordList = new ArrayList<>(matchingRecords); + Collections.reverse(recordList); + + sqr.update(recordList, topDocs.totalHits, 0); return sqr; } catch (final FileNotFoundException e) { // nothing has been indexed yet, or the data has already aged off logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.warn("", e); } - sqr.update(Collections.emptyList(), 0); + sqr.update(Collections. emptyList(), 0, 0); return sqr; } finally { - if ( searcher != null ) { + if (searcher != null) { indexManager.returnIndexSearcher(indexDirectory, searcher); } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java index e9e6e63462..e1996f6089 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java @@ -25,7 +25,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause.Occur; @@ -95,11 +94,11 @@ public class LineageQuery { final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories()); final Set recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(), - new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars); + Integer.MAX_VALUE, maxAttributeChars); final long readDocsEnd = System.nanoTime(); logger.debug("Finished Lineage Query against {}; Lucene search took {} millis, reading records took {} millis", - indexDirectory, TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd)); + indexDirectory, TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd)); return recs; } finally { @@ -108,7 +107,7 @@ public class LineageQuery { } catch (final FileNotFoundException fnfe) { // nothing has been indexed yet, or the data has already aged off logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, fnfe); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.warn("", fnfe); } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 687574351e..036e97fa92 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileFilter; +import java.io.FilenameFilter; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -117,16 +118,16 @@ public class TestPersistentProvenanceRepository { // Delete all of the storage files. We do this in order to clean up the tons of files that // we create but also to ensure that we have closed all of the file handles. If we leave any // streams open, for instance, this will throw an IOException, causing our unit test to fail. - for ( final File storageDir : config.getStorageDirectories() ) { + for (final File storageDir : config.getStorageDirectories()) { int i; - for (i=0; i < 3; i++) { + for (i = 0; i < 3; i++) { try { FileUtils.deleteFile(storageDir, true); break; } catch (final IOException ioe) { // if there is a virus scanner, etc. running in the background we may not be able to // delete the file. Wait a sec and try again. - if ( i == 2 ) { + if (i == 2) { throw ioe; } else { try { @@ -441,7 +442,7 @@ public class TestPersistentProvenanceRepository { repo.waitForRollover(); final Query query = new Query(UUID.randomUUID().toString()); - // query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*")); + // query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*")); query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*")); query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4")); query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*")); @@ -603,7 +604,7 @@ public class TestPersistentProvenanceRepository { repo.purgeOldEvents(); - Thread.sleep(2000L); // purge is async. Give it time to do its job. + Thread.sleep(2000L); // purge is async. Give it time to do its job. query.setMaxResults(100); final QuerySubmission noResultSubmission = repo.submitQuery(query); @@ -614,6 +615,152 @@ public class TestPersistentProvenanceRepository { assertEquals(0, noResultSubmission.getResult().getTotalHitCount()); } + + @Test + public void testEventsAreOrdered() throws IOException, InterruptedException, ParseException { + final RepositoryConfiguration config = createConfiguration(); + config.setMaxRecordLife(30, TimeUnit.SECONDS); + config.setMaxStorageCapacity(1024L * 1024L); + config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); + config.setMaxEventFileCapacity(1024L * 1024L); + config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); + + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); + repo.initialize(getEventReporter()); + + final String uuid = "00000000-0000-0000-0000-000000000000"; + final Map attributes = new HashMap<>(); + attributes.put("abc", "xyz"); + attributes.put("xyz", "abc"); + attributes.put("filename", "file-" + uuid); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + attributes.put("uuid", uuid); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + + for (int i = 0; i < 10; i++) { + builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); + builder.setEventTime(System.currentTimeMillis()); + repo.registerEvent(builder.build()); + Thread.sleep(20); + } + + // Give time for rollover to happen + repo.waitForRollover(); + + // Perform a "Most Recent Events" Query + final Query query = new Query(UUID.randomUUID().toString()); + query.setMaxResults(100); + + final QueryResult result = repo.queryEvents(query); + assertEquals(10, result.getMatchingEvents().size()); + + final List matchingEvents = result.getMatchingEvents(); + long timestamp = matchingEvents.get(0).getEventTime(); + + for (final ProvenanceEventRecord record : matchingEvents) { + assertTrue(record.getEventTime() <= timestamp); + timestamp = record.getEventTime(); + } + + // Perform a Query for a particular component, so that this doesn't just get the most recent events + // and has to actually hit Lucene. + final Query query2 = new Query(UUID.randomUUID().toString()); + query2.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "1234")); + query2.setMaxResults(100); + final QueryResult result2 = repo.queryEvents(query2); + assertEquals(10, result2.getMatchingEvents().size()); + + final List matchingEvents2 = result2.getMatchingEvents(); + timestamp = matchingEvents2.get(0).getEventTime(); + + for (final ProvenanceEventRecord record : matchingEvents2) { + assertTrue(record.getEventTime() <= timestamp); + timestamp = record.getEventTime(); + } + } + + + @Test + public void testEventsAreOrderedAcrossMultipleIndexes() throws IOException, InterruptedException, ParseException { + final RepositoryConfiguration config = createConfiguration(); + config.setMaxRecordLife(30, TimeUnit.SECONDS); + config.setMaxStorageCapacity(1024L * 1024L); + config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); + config.setMaxEventFileCapacity(1024L * 1024L); + config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); + config.setDesiredIndexSize(1L); + + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); + repo.initialize(getEventReporter()); + + final String uuid = "00000000-0000-0000-0000-000000000000"; + final Map attributes = new HashMap<>(); + attributes.put("abc", "xyz"); + attributes.put("xyz", "abc"); + attributes.put("filename", "file-" + uuid); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + attributes.put("uuid", uuid); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + + for (int i = 0; i < 10; i++) { + builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); + builder.setEventTime(System.currentTimeMillis()); + repo.registerEvent(builder.build()); + Thread.sleep(20); + } + + // Give time for rollover to happen + repo.waitForRollover(); + + for (int i = 0; i < 10; i++) { + builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); + builder.setEventTime(System.currentTimeMillis()); + repo.registerEvent(builder.build()); + Thread.sleep(20); + } + + repo.waitForRollover(); + + // Verify that multiple indexes exist + final File storageDir = config.getStorageDirectories().get(0); + final File[] subDirs = storageDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(final File dir, final String name) { + return name.startsWith("index-"); + } + }); + assertEquals(2, subDirs.length); + + // Perform a Query for a particular component, so that this doesn't just get the most recent events + // and has to actually hit Lucene. + final Query query = new Query(UUID.randomUUID().toString()); + query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "1234")); + query.setMaxResults(100); + final QueryResult result = repo.queryEvents(query); + assertEquals(20, result.getMatchingEvents().size()); + + final List matchingEvents = result.getMatchingEvents(); + long timestamp = matchingEvents.get(0).getEventTime(); + + for (final ProvenanceEventRecord record : matchingEvents) { + assertTrue(record.getEventTime() <= timestamp); + timestamp = record.getEventTime(); + } + } + + @Test public void testIndexAndCompressOnRolloverAndSubsequentEmptySearch() throws IOException, InterruptedException, ParseException { final RepositoryConfiguration config = createConfiguration(); @@ -939,7 +1086,7 @@ public class TestPersistentProvenanceRepository { config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); config.setMaxEventFileCapacity(1024L * 1024L); config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); - config.setDesiredIndexSize(10); // force new index to be created for each rollover + config.setDesiredIndexSize(10); // force new index to be created for each rollover repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); repo.initialize(getEventReporter()); @@ -961,7 +1108,7 @@ public class TestPersistentProvenanceRepository { for (int i = 0; i < 10; i++) { attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i); builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); - builder.setEventTime(10L); // make sure the events are destroyed when we call purge + builder.setEventTime(10L); // make sure the events are destroyed when we call purge repo.registerEvent(builder.build()); } @@ -1019,7 +1166,7 @@ public class TestPersistentProvenanceRepository { @Test public void testBackPressure() throws IOException, InterruptedException { final RepositoryConfiguration config = createConfiguration(); - config.setMaxEventFileCapacity(1L); // force rollover on each record. + config.setMaxEventFileCapacity(1L); // force rollover on each record. config.setJournalCount(1); final AtomicInteger journalCountRef = new AtomicInteger(0); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java index f4f9d127c0..ac13f0819c 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java @@ -373,7 +373,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { } public Lineage computeLineage(final String flowFileUUID) throws IOException { - return computeLineage(Collections.singleton(flowFileUUID), LineageComputationType.FLOWFILE_LINEAGE, null); + return computeLineage(Collections. singleton(flowFileUUID), LineageComputationType.FLOWFILE_LINEAGE, null); } private Lineage computeLineage(final Collection flowFileUuids, final LineageComputationType computationType, final Long eventId) throws IOException { @@ -411,9 +411,9 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { public ComputeLineageSubmission submitExpandParents(final long eventId) { final ProvenanceEventRecord event = getEvent(eventId); if (event == null) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections. emptyList(), 1); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().update(Collections.emptyList()); + submission.getResult().update(Collections. emptyList()); return submission; } @@ -424,7 +424,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { case CLONE: return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId); default: { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections. emptyList(), 1); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); return submission; @@ -440,9 +440,9 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { public ComputeLineageSubmission submitExpandChildren(final long eventId) { final ProvenanceEventRecord event = getEvent(eventId); if (event == null) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections. emptyList(), 1); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().update(Collections.emptyList()); + submission.getResult().update(Collections. emptyList()); return submission; } @@ -453,7 +453,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { case CLONE: return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId); default: { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections. emptyList(), 1); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); return submission; @@ -526,7 +526,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { }, IterationDirection.BACKWARD); - submission.getResult().update(matchingRecords, matchingCount.get()); + submission.getResult().update(matchingRecords, matchingCount.get(), 0); } }