mirror of https://github.com/apache/nifi.git
Revert "NIFI-1082: Ensure that events returned from the provenance repository are ordered such that newest events are provided first"
This reverts commit cf8ca3dc2c
.
This commit is contained in:
parent
0d6e81b54f
commit
59a49aea12
|
@ -71,7 +71,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
|
|||
private final Map<String, String> previousAttributes;
|
||||
private final Map<String, String> updatedAttributes;
|
||||
|
||||
private volatile long eventId = -1L;
|
||||
private volatile long eventId;
|
||||
|
||||
private StandardProvenanceEventRecord(final Builder builder) {
|
||||
this.eventTime = builder.eventTime;
|
||||
|
@ -316,7 +316,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
|
|||
final StandardProvenanceEventRecord other = (StandardProvenanceEventRecord) obj;
|
||||
// If event ID's are populated and not equal, return false. If they have not yet been populated, do not
|
||||
// use them in the comparison.
|
||||
if (eventId >= 0L && other.getEventId() >= 0L && eventId != other.getEventId()) {
|
||||
if (eventId > 0L && other.getEventId() > 0L && eventId != other.getEventId()) {
|
||||
return false;
|
||||
}
|
||||
if (eventType != other.eventType) {
|
||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.nifi.provenance;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -41,7 +40,7 @@ public class StandardQueryResult implements QueryResult {
|
|||
|
||||
private final Lock writeLock = rwLock.writeLock();
|
||||
// guarded by writeLock
|
||||
private final List<List<ProvenanceEventRecord>> matchingRecords;
|
||||
private final List<ProvenanceEventRecord> matchingRecords = new ArrayList<>();
|
||||
private long totalHitCount;
|
||||
private int numCompletedSteps = 0;
|
||||
private Date expirationDate;
|
||||
|
@ -54,11 +53,6 @@ public class StandardQueryResult implements QueryResult {
|
|||
this.query = query;
|
||||
this.numSteps = numSteps;
|
||||
this.creationNanos = System.nanoTime();
|
||||
this.matchingRecords = new ArrayList<>(numSteps);
|
||||
|
||||
for (int i = 0; i < Math.max(1, numSteps); i++) {
|
||||
matchingRecords.add(Collections.<ProvenanceEventRecord> emptyList());
|
||||
}
|
||||
|
||||
updateExpiration();
|
||||
}
|
||||
|
@ -67,14 +61,13 @@ public class StandardQueryResult implements QueryResult {
|
|||
public List<ProvenanceEventRecord> getMatchingEvents() {
|
||||
readLock.lock();
|
||||
try {
|
||||
final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults());
|
||||
for (final List<ProvenanceEventRecord> recordList : matchingRecords) {
|
||||
if (copy.size() + recordList.size() > query.getMaxResults()) {
|
||||
copy.addAll(recordList.subList(0, query.getMaxResults() - copy.size()));
|
||||
return copy;
|
||||
} else {
|
||||
copy.addAll(recordList);
|
||||
if (matchingRecords.size() <= query.getMaxResults()) {
|
||||
return new ArrayList<>(matchingRecords);
|
||||
}
|
||||
|
||||
final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults());
|
||||
for (int i = 0; i < query.getMaxResults(); i++) {
|
||||
copy.add(matchingRecords.get(i));
|
||||
}
|
||||
|
||||
return copy;
|
||||
|
@ -148,10 +141,10 @@ public class StandardQueryResult implements QueryResult {
|
|||
}
|
||||
}
|
||||
|
||||
public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits, final int indexId) {
|
||||
public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits) {
|
||||
writeLock.lock();
|
||||
try {
|
||||
this.matchingRecords.set(indexId, new ArrayList<>(matchingRecords));
|
||||
this.matchingRecords.addAll(matchingRecords);
|
||||
this.totalHitCount += totalHits;
|
||||
|
||||
numCompletedSteps++;
|
||||
|
|
|
@ -36,6 +36,7 @@ import java.util.regex.Pattern;
|
|||
|
||||
import org.apache.nifi.provenance.serialization.RecordReader;
|
||||
import org.apache.nifi.provenance.serialization.RecordReaders;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -172,9 +173,6 @@ public class IndexConfiguration {
|
|||
for (final List<File> list : indexDirectoryMap.values()) {
|
||||
files.addAll(list);
|
||||
}
|
||||
|
||||
Collections.sort(files, new IndexDirectoryComparator());
|
||||
|
||||
return files;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
|
@ -215,7 +213,14 @@ public class IndexConfiguration {
|
|||
lock.lock();
|
||||
try {
|
||||
final List<File> sortedIndexDirectories = getIndexDirectories();
|
||||
Collections.sort(sortedIndexDirectories, new IndexDirectoryComparator());
|
||||
Collections.sort(sortedIndexDirectories, new Comparator<File>() {
|
||||
@Override
|
||||
public int compare(final File o1, final File o2) {
|
||||
final long epochTimestamp1 = getIndexStartTime(o1);
|
||||
final long epochTimestamp2 = getIndexStartTime(o2);
|
||||
return Long.compare(epochTimestamp1, epochTimestamp2);
|
||||
}
|
||||
});
|
||||
|
||||
for (final File indexDir : sortedIndexDirectories) {
|
||||
// If the index was last modified before the start time, we know that it doesn't
|
||||
|
@ -261,7 +266,14 @@ public class IndexConfiguration {
|
|||
}
|
||||
|
||||
final List<File> sortedIndexDirectories = new ArrayList<>(indices);
|
||||
Collections.sort(sortedIndexDirectories, new IndexDirectoryComparator());
|
||||
Collections.sort(sortedIndexDirectories, new Comparator<File>() {
|
||||
@Override
|
||||
public int compare(final File o1, final File o2) {
|
||||
final long epochTimestamp1 = getIndexStartTime(o1);
|
||||
final long epochTimestamp2 = getIndexStartTime(o2);
|
||||
return Long.compare(epochTimestamp1, epochTimestamp2);
|
||||
}
|
||||
});
|
||||
|
||||
final Long firstEntryTime = getFirstEntryTime(provenanceLogFile);
|
||||
if (firstEntryTime == null) {
|
||||
|
@ -383,13 +395,4 @@ public class IndexConfiguration {
|
|||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private class IndexDirectoryComparator implements Comparator<File> {
|
||||
@Override
|
||||
public int compare(final File o1, final File o2) {
|
||||
final long epochTimestamp1 = getIndexStartTime(o1);
|
||||
final long epochTimestamp2 = getIndexStartTime(o2);
|
||||
return -Long.compare(epochTimestamp1, epochTimestamp2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1820,7 +1820,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
|
||||
Long maxEventId = getMaxEventId();
|
||||
if (maxEventId == null) {
|
||||
result.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L, 0);
|
||||
result.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0L);
|
||||
maxEventId = 0L;
|
||||
}
|
||||
Long minIndexedId = indexConfig.getMinIdIndexed();
|
||||
|
@ -1830,7 +1830,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
|
||||
final long totalNumDocs = maxEventId - minIndexedId;
|
||||
|
||||
result.getResult().update(trimmed, totalNumDocs, 0);
|
||||
result.getResult().update(trimmed, totalNumDocs);
|
||||
} else {
|
||||
queryExecService.submit(new GetMostRecentRunnable(query, result));
|
||||
}
|
||||
|
@ -1839,6 +1839,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
return result;
|
||||
}
|
||||
|
||||
final AtomicInteger retrievalCount = new AtomicInteger(0);
|
||||
final List<File> indexDirectories = indexConfig.getIndexDirectories(
|
||||
query.getStartDate() == null ? null : query.getStartDate().getTime(),
|
||||
query.getEndDate() == null ? null : query.getEndDate().getTime());
|
||||
|
@ -1846,11 +1847,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
querySubmissionMap.put(query.getIdentifier(), result);
|
||||
|
||||
if (indexDirectories.isEmpty()) {
|
||||
result.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L, 0);
|
||||
result.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0L);
|
||||
} else {
|
||||
int indexId = 0;
|
||||
for (final File indexDir : indexDirectories) {
|
||||
queryExecService.submit(new QueryRunnable(query, result, indexDir, indexId++));
|
||||
queryExecService.submit(new QueryRunnable(query, result, indexDir, retrievalCount));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2248,7 +2248,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
// get the max indexed event id
|
||||
final Long maxEventId = indexConfig.getMaxIdIndexed();
|
||||
if (maxEventId == null) {
|
||||
submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L, 0);
|
||||
submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -2263,9 +2263,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
final long totalNumDocs = maxEventId - minIndexedId;
|
||||
|
||||
final List<ProvenanceEventRecord> mostRecent = getEvents(startIndex, maxResults);
|
||||
// reverse the order so that the newest events come first.
|
||||
Collections.reverse(mostRecent);
|
||||
submission.getResult().update(mostRecent, totalNumDocs, 0);
|
||||
submission.getResult().update(mostRecent, totalNumDocs);
|
||||
} catch (final IOException ioe) {
|
||||
logger.error("Failed to retrieve records from Provenance Repository: " + ioe.toString());
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
@ -2286,25 +2284,21 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
private final Query query;
|
||||
private final AsyncQuerySubmission submission;
|
||||
private final File indexDir;
|
||||
private final int indexId;
|
||||
private final AtomicInteger retrievalCount;
|
||||
|
||||
public QueryRunnable(final Query query, final AsyncQuerySubmission submission, final File indexDir, final int indexId) {
|
||||
public QueryRunnable(final Query query, final AsyncQuerySubmission submission, final File indexDir, final AtomicInteger retrievalCount) {
|
||||
this.query = query;
|
||||
this.submission = submission;
|
||||
this.indexDir = indexDir;
|
||||
this.indexId = indexId;
|
||||
this.retrievalCount = retrievalCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars);
|
||||
final StandardQueryResult queryResult = search.search(query, firstEventTimestamp);
|
||||
|
||||
logger.debug("Merging query results for indexId {}; before merge, num events = {}", indexId, queryResult.getTotalHitCount());
|
||||
submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount(), indexId);
|
||||
logger.debug("Merging query results for indexId {}; after merge, num events = {}", indexId, queryResult.getTotalHitCount());
|
||||
|
||||
final StandardQueryResult queryResult = search.search(query, retrievalCount, firstEventTimestamp);
|
||||
submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount());
|
||||
if (queryResult.isFinished()) {
|
||||
logger.info("Successfully executed Query[{}] against Index {}; Search took {} milliseconds; Total Hits = {}",
|
||||
query, indexDir, queryResult.getQueryTime(), queryResult.getTotalHitCount());
|
||||
|
|
|
@ -22,23 +22,25 @@ import java.io.IOException;
|
|||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||
import org.apache.nifi.provenance.SearchableFields;
|
||||
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
|
||||
import org.apache.nifi.provenance.serialization.RecordReader;
|
||||
import org.apache.nifi.provenance.serialization.RecordReaders;
|
||||
import org.apache.nifi.provenance.toc.TocReader;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -49,7 +51,11 @@ public class DocsReader {
|
|||
}
|
||||
|
||||
public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
|
||||
final int maxResults, final int maxAttributeChars) throws IOException {
|
||||
final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
|
||||
if (retrievalCount.get() >= maxResults) {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
final long start = System.nanoTime();
|
||||
final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
|
||||
final List<Document> docs = new ArrayList<>(numDocs);
|
||||
|
@ -62,7 +68,7 @@ public class DocsReader {
|
|||
|
||||
final long readDocuments = System.nanoTime() - start;
|
||||
logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
|
||||
return read(docs, allProvenanceLogFiles, maxResults, maxAttributeChars);
|
||||
return read(docs, allProvenanceLogFiles, retrievalCount, maxResults, maxAttributeChars);
|
||||
}
|
||||
|
||||
|
||||
|
@ -103,7 +109,10 @@ public class DocsReader {
|
|||
|
||||
|
||||
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
|
||||
final int maxResults, final int maxAttributeChars) throws IOException {
|
||||
final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
|
||||
if (retrievalCount.get() >= maxResults) {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
LuceneUtil.sortDocsForRetrieval(docs);
|
||||
|
||||
|
@ -128,6 +137,10 @@ public class DocsReader {
|
|||
if (reader != null && storageFilename.equals(lastStorageFilename)) {
|
||||
matchingRecords.add(getRecord(d, reader));
|
||||
eventsReadThisFile++;
|
||||
|
||||
if ( retrievalCount.incrementAndGet() >= maxResults ) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
logger.debug("Opening log file {}", storageFilename);
|
||||
|
||||
|
@ -158,6 +171,10 @@ public class DocsReader {
|
|||
reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars);
|
||||
matchingRecords.add(getRecord(d, reader));
|
||||
eventsReadThisFile = 1;
|
||||
|
||||
if ( retrievalCount.incrementAndGet() >= maxResults ) {
|
||||
break;
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
|
||||
}
|
||||
|
|
|
@ -19,22 +19,17 @@ package org.apache.nifi.provenance.lucene;
|
|||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.SortField;
|
||||
import org.apache.lucene.search.SortField.Type;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.nifi.provenance.PersistentProvenanceRepository;
|
||||
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||
import org.apache.nifi.provenance.SearchableFields;
|
||||
import org.apache.nifi.provenance.StandardQueryResult;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -53,7 +48,7 @@ public class IndexSearch {
|
|||
this.maxAttributeChars = maxAttributeChars;
|
||||
}
|
||||
|
||||
public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final long firstEventTimestamp) throws IOException {
|
||||
public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount, final long firstEventTimestamp) throws IOException {
|
||||
if (!indexDirectory.exists() && !indexDirectory.mkdirs()) {
|
||||
throw new IOException("Unable to create Indexing Directory " + indexDirectory);
|
||||
}
|
||||
|
@ -62,6 +57,7 @@ public class IndexSearch {
|
|||
}
|
||||
|
||||
final StandardQueryResult sqr = new StandardQueryResult(provenanceQuery, 1);
|
||||
final Set<ProvenanceEventRecord> matchingRecords;
|
||||
|
||||
// we need to set the start date because if we do not, the first index may still have events that have aged off from
|
||||
// the repository, and we don't want those events to count toward the total number of matches.
|
||||
|
@ -81,8 +77,7 @@ public class IndexSearch {
|
|||
final long searchStartNanos = System.nanoTime();
|
||||
final long openSearcherNanos = searchStartNanos - start;
|
||||
|
||||
final Sort sort = new Sort(new SortField(SearchableFields.Identifier.getSearchableFieldName(), Type.LONG, true));
|
||||
final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults(), sort);
|
||||
final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
|
||||
final long finishSearch = System.nanoTime();
|
||||
final long searchNanos = finishSearch - searchStartNanos;
|
||||
|
||||
|
@ -90,26 +85,18 @@ public class IndexSearch {
|
|||
TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
|
||||
|
||||
if (topDocs.totalHits == 0) {
|
||||
sqr.update(Collections.<ProvenanceEventRecord> emptyList(), 0, 0);
|
||||
sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
|
||||
return sqr;
|
||||
}
|
||||
|
||||
final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
|
||||
final Set<ProvenanceEventRecord> matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(),
|
||||
matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount,
|
||||
provenanceQuery.getMaxResults(), maxAttributeChars);
|
||||
|
||||
final long readRecordsNanos = System.nanoTime() - finishSearch;
|
||||
logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
|
||||
|
||||
// The records returned are going to be in a sorted set. The sort order will be dependent on
|
||||
// the ID of the events, which is also approximately the same as the timestamp of the event (i.e.
|
||||
// it's ordered by the time when the event was inserted into the repo, not the time when the event took
|
||||
// place). We want to reverse this so that we get the newest events first, so we have to first create a
|
||||
// new List object to hold the events, and then reverse the list.
|
||||
final List<ProvenanceEventRecord> recordList = new ArrayList<>(matchingRecords);
|
||||
Collections.reverse(recordList);
|
||||
|
||||
sqr.update(recordList, topDocs.totalHits, 0);
|
||||
sqr.update(matchingRecords, topDocs.totalHits);
|
||||
return sqr;
|
||||
} catch (final FileNotFoundException e) {
|
||||
// nothing has been indexed yet, or the data has already aged off
|
||||
|
@ -118,7 +105,7 @@ public class IndexSearch {
|
|||
logger.warn("", e);
|
||||
}
|
||||
|
||||
sqr.update(Collections.<ProvenanceEventRecord> emptyList(), 0, 0);
|
||||
sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
|
||||
return sqr;
|
||||
} finally {
|
||||
if ( searcher != null ) {
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.BooleanClause.Occur;
|
||||
|
@ -94,7 +95,7 @@ public class LineageQuery {
|
|||
|
||||
final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
|
||||
final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(),
|
||||
Integer.MAX_VALUE, maxAttributeChars);
|
||||
new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars);
|
||||
|
||||
final long readDocsEnd = System.nanoTime();
|
||||
logger.debug("Finished Lineage Query against {}; Lucene search took {} millis, reading records took {} millis",
|
||||
|
|
|
@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.FileFilter;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
@ -615,152 +614,6 @@ public class TestPersistentProvenanceRepository {
|
|||
assertEquals(0, noResultSubmission.getResult().getTotalHitCount());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testEventsAreOrdered() throws IOException, InterruptedException, ParseException {
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(30, TimeUnit.SECONDS);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
|
||||
config.setMaxEventFileCapacity(1024L * 1024L);
|
||||
config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
|
||||
|
||||
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
|
||||
repo.initialize(getEventReporter());
|
||||
|
||||
final String uuid = "00000000-0000-0000-0000-000000000000";
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("abc", "xyz");
|
||||
attributes.put("xyz", "abc");
|
||||
attributes.put("filename", "file-" + uuid);
|
||||
|
||||
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
|
||||
builder.setEventTime(System.currentTimeMillis());
|
||||
builder.setEventType(ProvenanceEventType.RECEIVE);
|
||||
builder.setTransitUri("nifi://unit-test");
|
||||
attributes.put("uuid", uuid);
|
||||
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
|
||||
builder.setComponentId("1234");
|
||||
builder.setComponentType("dummy processor");
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
|
||||
builder.setEventTime(System.currentTimeMillis());
|
||||
repo.registerEvent(builder.build());
|
||||
Thread.sleep(20);
|
||||
}
|
||||
|
||||
// Give time for rollover to happen
|
||||
repo.waitForRollover();
|
||||
|
||||
// Perform a "Most Recent Events" Query
|
||||
final Query query = new Query(UUID.randomUUID().toString());
|
||||
query.setMaxResults(100);
|
||||
|
||||
final QueryResult result = repo.queryEvents(query);
|
||||
assertEquals(10, result.getMatchingEvents().size());
|
||||
|
||||
final List<ProvenanceEventRecord> matchingEvents = result.getMatchingEvents();
|
||||
long timestamp = matchingEvents.get(0).getEventTime();
|
||||
|
||||
for (final ProvenanceEventRecord record : matchingEvents) {
|
||||
assertTrue(record.getEventTime() <= timestamp);
|
||||
timestamp = record.getEventTime();
|
||||
}
|
||||
|
||||
// Perform a Query for a particular component, so that this doesn't just get the most recent events
|
||||
// and has to actually hit Lucene.
|
||||
final Query query2 = new Query(UUID.randomUUID().toString());
|
||||
query2.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "1234"));
|
||||
query2.setMaxResults(100);
|
||||
final QueryResult result2 = repo.queryEvents(query2);
|
||||
assertEquals(10, result2.getMatchingEvents().size());
|
||||
|
||||
final List<ProvenanceEventRecord> matchingEvents2 = result2.getMatchingEvents();
|
||||
timestamp = matchingEvents2.get(0).getEventTime();
|
||||
|
||||
for (final ProvenanceEventRecord record : matchingEvents2) {
|
||||
assertTrue(record.getEventTime() <= timestamp);
|
||||
timestamp = record.getEventTime();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testEventsAreOrderedAcrossMultipleIndexes() throws IOException, InterruptedException, ParseException {
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(30, TimeUnit.SECONDS);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
|
||||
config.setMaxEventFileCapacity(1024L * 1024L);
|
||||
config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
|
||||
config.setDesiredIndexSize(1L);
|
||||
|
||||
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
|
||||
repo.initialize(getEventReporter());
|
||||
|
||||
final String uuid = "00000000-0000-0000-0000-000000000000";
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("abc", "xyz");
|
||||
attributes.put("xyz", "abc");
|
||||
attributes.put("filename", "file-" + uuid);
|
||||
|
||||
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
|
||||
builder.setEventTime(System.currentTimeMillis());
|
||||
builder.setEventType(ProvenanceEventType.RECEIVE);
|
||||
builder.setTransitUri("nifi://unit-test");
|
||||
attributes.put("uuid", uuid);
|
||||
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
|
||||
builder.setComponentId("1234");
|
||||
builder.setComponentType("dummy processor");
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
|
||||
builder.setEventTime(System.currentTimeMillis());
|
||||
repo.registerEvent(builder.build());
|
||||
Thread.sleep(20);
|
||||
}
|
||||
|
||||
// Give time for rollover to happen
|
||||
repo.waitForRollover();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
|
||||
builder.setEventTime(System.currentTimeMillis());
|
||||
repo.registerEvent(builder.build());
|
||||
Thread.sleep(20);
|
||||
}
|
||||
|
||||
repo.waitForRollover();
|
||||
|
||||
// Verify that multiple indexes exist
|
||||
final File storageDir = config.getStorageDirectories().get(0);
|
||||
final File[] subDirs = storageDir.listFiles(new FilenameFilter() {
|
||||
@Override
|
||||
public boolean accept(final File dir, final String name) {
|
||||
return name.startsWith("index-");
|
||||
}
|
||||
});
|
||||
assertEquals(2, subDirs.length);
|
||||
|
||||
// Perform a Query for a particular component, so that this doesn't just get the most recent events
|
||||
// and has to actually hit Lucene.
|
||||
final Query query = new Query(UUID.randomUUID().toString());
|
||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "1234"));
|
||||
query.setMaxResults(100);
|
||||
final QueryResult result = repo.queryEvents(query);
|
||||
assertEquals(20, result.getMatchingEvents().size());
|
||||
|
||||
final List<ProvenanceEventRecord> matchingEvents = result.getMatchingEvents();
|
||||
long timestamp = matchingEvents.get(0).getEventTime();
|
||||
|
||||
for (final ProvenanceEventRecord record : matchingEvents) {
|
||||
assertTrue(record.getEventTime() <= timestamp);
|
||||
timestamp = record.getEventTime();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testIndexAndCompressOnRolloverAndSubsequentEmptySearch() throws IOException, InterruptedException, ParseException {
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
|
|
|
@ -526,7 +526,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
|
|||
|
||||
}, IterationDirection.BACKWARD);
|
||||
|
||||
submission.getResult().update(matchingRecords, matchingCount.get(), 0);
|
||||
submission.getResult().update(matchingRecords, matchingCount.get());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue