NIFI-6182: Updated dependency on Lucene to Lucene 8.0.0. Updated code necessary to use the new API. Updated WriteAheadProvenanceRepository so that upon startup, if provenance indexes are written using the old Lucene format, they are considered 'defunct' and the events are re-indexed in a background thread into a new index that uses the Lucene 8 format and the old index is then removed. Added Provenance Repository that consists of about 30 events and added integration test to ensure that the repo can be started up/initialized when pointing to a prov repo that was written using the old Lucene format.

This commit is contained in:
Mark Payne 2019-04-02 14:54:17 -04:00
parent 1975101292
commit f15332ff87
46 changed files with 1297 additions and 1348 deletions

View File

@ -16,9 +16,17 @@
*/
package org.apache.nifi.provenance;
import org.apache.nifi.provenance.lineage.ComputeLineageResult;
import org.apache.nifi.provenance.lineage.EdgeNode;
import org.apache.nifi.provenance.lineage.EventNode;
import org.apache.nifi.provenance.lineage.FlowFileNode;
import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
@ -31,16 +39,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.nifi.provenance.lineage.ComputeLineageResult;
import org.apache.nifi.provenance.lineage.EdgeNode;
import org.apache.nifi.provenance.lineage.EventNode;
import org.apache.nifi.provenance.lineage.FlowFileNode;
import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
/**
*
*/
@ -50,7 +48,7 @@ public class StandardLineageResult implements ComputeLineageResult, ProgressiveR
private static final Logger logger = LoggerFactory.getLogger(StandardLineageResult.class);
private final Collection<String> flowFileUuids;
private final Collection<ProvenanceEventRecord> relevantRecords = new ArrayList<>();
private final Set<ProvenanceEventRecord> relevantRecords = new HashSet<>();
private final Set<LineageNode> nodes = new HashSet<>();
private final Set<LineageEdge> edges = new HashSet<>();
private final int numSteps;
@ -96,24 +94,6 @@ public class StandardLineageResult implements ComputeLineageResult, ProgressiveR
}
}
/**
 * Reports how many edges are currently held in {@code edges}.
 * The size is sampled while holding the read lock.
 *
 * @return the current edge count
 */
public int getNumberOfEdges() {
    final int edgeCount;

    readLock.lock();
    try {
        edgeCount = edges.size();
    } finally {
        readLock.unlock();
    }

    return edgeCount;
}
/**
 * Reports how many nodes are currently held in {@code nodes}.
 * The size is sampled while holding the read lock.
 *
 * @return the current node count
 */
public int getNumberOfNodes() {
    final int nodeCount;

    readLock.lock();
    try {
        nodeCount = nodes.size();
    } finally {
        readLock.unlock();
    }

    return nodeCount;
}
public long getComputationTime(final TimeUnit timeUnit) {
readLock.lock();
try {
@ -224,7 +204,7 @@ public class StandardLineageResult implements ComputeLineageResult, ProgressiveR
Map<String, LineageNode> lastEventMap = new HashMap<>(); // maps FlowFile UUID to last event for that FlowFile
final List<ProvenanceEventRecord> sortedRecords = new ArrayList<>(relevantRecords);
Collections.sort(sortedRecords, new Comparator<ProvenanceEventRecord>() {
sortedRecords.sort(new Comparator<ProvenanceEventRecord>() {
@Override
public int compare(final ProvenanceEventRecord o1, final ProvenanceEventRecord o2) {
// Sort on Event Time, then Event ID.

View File

@ -73,4 +73,23 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/lucene-4-prov-repo/0.prov</exclude>
<exclude>src/test/resources/lucene-4-prov-repo/toc/0.toc</exclude>
<exclude>src/test/resources/lucene-4-prov-repo/index-1554304717707/_0.fdt</exclude>
<exclude>src/test/resources/lucene-4-prov-repo/index-1554304717707/_0.fdx</exclude>
<exclude>src/test/resources/lucene-4-prov-repo/index-1554304717707/_1.fdt</exclude>
<exclude>src/test/resources/lucene-4-prov-repo/index-1554304717707/_1.fdx</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -16,12 +16,19 @@
*/
package org.apache.nifi.provenance;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@ -34,12 +41,6 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
@ -47,7 +48,7 @@ public class IndexConfiguration {
private final RepositoryConfiguration repoConfig;
private final Map<File, List<File>> indexDirectoryMap = new HashMap<>();
private final Pattern indexNamePattern = Pattern.compile("index-(\\d+)");
private final Pattern indexNamePattern = DirectoryUtils.INDEX_DIRECTORY_NAME_PATTERN;
private final Lock lock = new ReentrantLock();
private static final Logger logger = LoggerFactory.getLogger(IndexConfiguration.class);
@ -73,9 +74,7 @@ public class IndexConfiguration {
});
if (matching != null) {
for (final File matchingFile : matching) {
indexDirectories.add(matchingFile);
}
indexDirectories.addAll(Arrays.asList(matching));
}
indexDirectoryMap.put(storageDirectory, indexDirectories);
@ -171,7 +170,7 @@ public class IndexConfiguration {
if (firstEntryTime == null) {
firstEntryTime = newIndexTimestamp;
}
return new File(storageDirectory, "index-" + firstEntryTime);
return new File(storageDirectory, "lucene-8-index-" + firstEntryTime);
}
public List<File> getIndexDirectories() {

View File

@ -49,7 +49,7 @@ import org.apache.nifi.provenance.lucene.IndexSearch;
import org.apache.nifi.provenance.lucene.IndexingAction;
import org.apache.nifi.provenance.lucene.LineageQuery;
import org.apache.nifi.provenance.lucene.LuceneUtil;
import org.apache.nifi.provenance.lucene.SimpleIndexManager;
import org.apache.nifi.provenance.lucene.StandardIndexManager;
import org.apache.nifi.provenance.lucene.UpdateMinimumEventId;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
@ -135,8 +135,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
private static final String TEMP_FILE_SUFFIX = ".prov.part";
private static final long PURGE_EVENT_MILLISECONDS = 2500L; //Determines the frequency over which the task to delete old events will occur
public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov");
public static final Pattern INDEX_PATTERN = Pattern.compile("(?:lucene-\\d+-)?index-\\d+");
public static final int MAX_UNDELETED_QUERY_RESULTS = 10;
public static final int MAX_INDEXING_FAILURE_COUNT = 5; // how many indexing failures we will tolerate before skipping indexing for a prov file
public static final int MAX_JOURNAL_ROLLOVER_RETRIES = 5;
@ -247,7 +246,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
this.maxPartitionMillis = configuration.getMaxEventFileLife(TimeUnit.MILLISECONDS);
this.maxPartitionBytes = configuration.getMaxEventFileCapacity();
this.indexConfig = new IndexConfiguration(configuration);
this.indexManager = new SimpleIndexManager(configuration);
this.indexManager = new StandardIndexManager(configuration);
this.alwaysSync = configuration.isAlwaysSync();
this.rolloverCheckMillis = rolloverCheckMillis;
@ -1193,7 +1192,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
*/
private long getIndexTimestamp(final File indexDirectory) {
final String name = indexDirectory.getName();
final int dashIndex = name.indexOf("-");
final int dashIndex = name.lastIndexOf("-");
return Long.parseLong(name.substring(dashIndex + 1));
}
@ -2080,13 +2079,13 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
public List<Document> call() {
final List<Document> localScoreDocs = new ArrayList<>();
try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory))) {
try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory.toPath()))) {
final IndexSearcher searcher = new IndexSearcher(directoryReader);
final TopDocs topDocs = searcher.search(luceneQuery, 10000000);
logger.info("For {}, Top Docs has {} hits; reading Lucene results", indexDirectory, topDocs.scoreDocs.length);
if (topDocs.totalHits > 0) {
if (topDocs.totalHits.value > 0) {
for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
final int docId = scoreDoc.doc;
final Document d = directoryReader.document(docId);

View File

@ -28,7 +28,7 @@ import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.index.lucene.LuceneEventIndex;
import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.lucene.SimpleIndexManager;
import org.apache.nifi.provenance.lucene.StandardIndexManager;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.search.SearchableField;
@ -144,7 +144,7 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
eventStore = new PartitionedWriteAheadEventStore(config, recordWriterFactory, recordReaderFactory, eventReporter, fileManager);
final IndexManager indexManager = new SimpleIndexManager(config);
final IndexManager indexManager = new StandardIndexManager(config);
eventIndex = new LuceneEventIndex(config, indexManager, eventReporter);
this.eventReporter = eventReporter;
@ -154,11 +154,15 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
eventStore.initialize();
eventIndex.initialize(eventStore);
try {
eventStore.reindexLatestEvents(eventIndex);
} catch (final Exception e) {
logger.error("Failed to re-index some of the Provenance Events. It is possible that some of the latest "
if (eventIndex.isReindexNecessary()) {
try {
eventStore.reindexLatestEvents(eventIndex);
} catch (final Exception e) {
logger.error("Failed to re-index some of the Provenance Events. It is possible that some of the latest "
+ "events will not be available from the Provenance Repository when a query is issued.", e);
}
} else {
logger.info("Provenance Event Index indicates that no events should be re-indexed upon startup. Will not wait for re-indexing to occur.");
}
}

View File

@ -17,10 +17,6 @@
package org.apache.nifi.provenance.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
@ -30,6 +26,10 @@ import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.store.EventStore;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
/**
* An Event Index is responsible for indexing Provenance Events in such a way that the index can be quickly
* searched to in order to retrieve events of interest.
@ -51,6 +51,13 @@ public interface EventIndex extends Closeable {
*/
void addEvents(Map<ProvenanceEventRecord, StorageSummary> events);
/**
* Indicates whether or not events that are not known to the index should be re-indexed (via {@link #reindexEvents(Map)}) upon startup.
*
* @return <code>true</code> if unknown events should be re-indexed, <code>false</code> otherwise.
*/
boolean isReindexNecessary();
/**
* Replaces the entries in the appropriate index with the given events
*

View File

@ -16,18 +16,14 @@
*/
package org.apache.nifi.provenance.index.lucene;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.FieldInfo.IndexOptions;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.IndexOptions;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
@ -36,6 +32,11 @@ import org.apache.nifi.provenance.lucene.LuceneUtil;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.provenance.serialization.StorageSummary;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class ConvertEventToLuceneDocument {
private final Set<SearchableField> searchableEventFields;
private final Set<SearchableField> searchableAttributeFields;
@ -55,6 +56,10 @@ public class ConvertEventToLuceneDocument {
public Document convert(final ProvenanceEventRecord record, final StorageSummary persistedEvent) {
return convert(record, persistedEvent.getEventId());
}
public Document convert(final ProvenanceEventRecord record, final long eventId) {
final Document doc = new Document();
addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid());
addField(doc, SearchableFields.Filename, record.getAttribute(CoreAttributes.FILENAME.key()));
@ -74,63 +79,62 @@ public class ConvertEventToLuceneDocument {
}
// Index the fields that we always index (unless there's nothing else to index at all)
if (!doc.getFields().isEmpty()) {
// Always include Lineage Start Date because it allows us to make our Lineage queries more efficient.
doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO));
// Always include Event Time because most queries are bound by a start and end time.
doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO));
// We always include File Size because the UI wants to always render the controls for specifying this. This idea could be revisited.
doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO));
// We always store the event Event ID in the Document but do not index it. It doesn't make sense to query based on Event ID because
// if we want a particular Event ID, we can just obtain it directly from the EventStore. But when we obtain a Document, this info must
// be stored so that we know how to lookup the event in the store.
doc.add(new UnIndexedLongField(SearchableFields.Identifier.getSearchableFieldName(), persistedEvent.getEventId()));
// If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs.
final ProvenanceEventType eventType = record.getEventType();
if (eventType == ProvenanceEventType.FORK || eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.REPLAY) {
for (final String uuid : record.getChildUuids()) {
if (!uuid.equals(record.getFlowFileUuid())) {
addField(doc, SearchableFields.FlowFileUUID, uuid);
}
}
} else if (eventType == ProvenanceEventType.JOIN) {
for (final String uuid : record.getParentUuids()) {
if (!uuid.equals(record.getFlowFileUuid())) {
addField(doc, SearchableFields.FlowFileUUID, uuid);
}
}
} else if (eventType == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) {
// If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID
// that the Source System uses to refer to the data.
final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier();
final String sourceFlowFileUUID;
final int lastColon = sourceIdentifier.lastIndexOf(":");
if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) {
sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1);
} else {
sourceFlowFileUUID = null;
}
if (sourceFlowFileUUID != null) {
addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID);
}
}
return doc;
if (doc.getFields().isEmpty()) {
return null;
}
return null;
// Always include Lineage Start Date because it allows us to make our Lineage queries more efficient.
doc.add(new LongPoint(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate()));
// Always include Event Time because most queries are bound by a start and end time.
doc.add(new LongPoint(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime()));
// We always include File Size because the UI wants to always render the controls for specifying this. This idea could be revisited.
doc.add(new LongPoint(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize()));
// We always store the event Event ID in the Document but do not index it. It doesn't make sense to query based on Event ID because
// if we want a particular Event ID, we can just obtain it directly from the EventStore. But when we obtain a Document, this info must
// be stored so that we know how to lookup the event in the store.
doc.add(new UnIndexedLongField(SearchableFields.Identifier.getSearchableFieldName(), eventId));
// If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs.
final ProvenanceEventType eventType = record.getEventType();
if (eventType == ProvenanceEventType.FORK || eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.REPLAY) {
for (final String uuid : record.getChildUuids()) {
if (!uuid.equals(record.getFlowFileUuid())) {
addField(doc, SearchableFields.FlowFileUUID, uuid);
}
}
} else if (eventType == ProvenanceEventType.JOIN) {
for (final String uuid : record.getParentUuids()) {
if (!uuid.equals(record.getFlowFileUuid())) {
addField(doc, SearchableFields.FlowFileUUID, uuid);
}
}
} else if (eventType == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) {
// If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID
// that the Source System uses to refer to the data.
final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier();
final String sourceFlowFileUUID;
final int lastColon = sourceIdentifier.lastIndexOf(":");
if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) {
sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1);
} else {
sourceFlowFileUUID = null;
}
if (sourceFlowFileUUID != null) {
addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID);
}
}
return doc;
}
private static class UnIndexedLongField extends Field {
static final FieldType TYPE = new FieldType();
static {
TYPE.setIndexed(false);
TYPE.setIndexOptions(IndexOptions.NONE);
TYPE.setTokenized(true);
TYPE.setOmitNorms(true);
TYPE.setIndexOptions(IndexOptions.DOCS_ONLY);
TYPE.setNumericType(FieldType.NumericType.LONG);
TYPE.setDocValuesType(DocValuesType.NUMERIC);
TYPE.setStored(true);
TYPE.freeze();
}

View File

@ -18,9 +18,9 @@
package org.apache.nifi.provenance.index.lucene;
import org.apache.lucene.document.Document;
import org.apache.lucene.search.NumericRangeQuery;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.index.EventIndexWriter;
import org.apache.nifi.provenance.lucene.IndexManager;
@ -56,7 +56,7 @@ public class EventIndexTask implements Runnable {
private volatile CompletableFuture<Void> shutdownComplete;
public EventIndexTask(final BlockingQueue<StoredDocument> documentQueue, final RepositoryConfiguration repoConfig, final IndexManager indexManager,
public EventIndexTask(final BlockingQueue<StoredDocument> documentQueue, final IndexManager indexManager,
final IndexDirectoryManager directoryManager, final int maxEventsPerCommit, final EventReporter eventReporter) {
this.documentQueue = documentQueue;
this.indexManager = indexManager;
@ -154,8 +154,7 @@ public class EventIndexTask implements Runnable {
}
}
final NumericRangeQuery<Long> query = NumericRangeQuery.newLongRange(
SearchableFields.Identifier.getSearchableFieldName(), minId, maxId, true, true);
final Query query = LongPoint.newRangeQuery(SearchableFields.Identifier.getSearchableFieldName(), minId, maxId);
indexWriter.getIndexWriter().deleteDocuments(query);
final List<Document> documents = documentsForIndex.stream()
@ -177,7 +176,7 @@ public class EventIndexTask implements Runnable {
// Convert the IndexableDocument list into a List of Documents so that we can pass them to the Index Writer.
final List<Document> documents = toIndex.stream()
.map(doc -> doc.getDocument())
.map(StoredDocument::getDocument)
.collect(Collectors.toList());
boolean requestClose = false;

View File

@ -17,14 +17,27 @@
package org.apache.nifi.provenance.index.lucene;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
@ -33,16 +46,17 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IndexDirectoryManager {
private static final Logger logger = LoggerFactory.getLogger(IndexDirectoryManager.class);
private static final FileFilter INDEX_DIRECTORY_FILTER = f -> f.getName().startsWith("index-");
private static final Pattern INDEX_FILENAME_PATTERN = Pattern.compile("index-(\\d+)");
private static final Pattern LUCENE_8_AND_LATER_INDEX_PATTERN = Pattern.compile("lucene-\\d+-index-(.*)");
private static final FileFilter LUCENE_8_AND_LATER_INDEX_DIRECTORY_FILTER = f -> LUCENE_8_AND_LATER_INDEX_PATTERN.matcher(f.getName()).matches();
private static final Pattern INDEX_FILENAME_PATTERN = DirectoryUtils.INDEX_DIRECTORY_NAME_PATTERN;
private static final FileFilter ALL_INDEX_FILE_FILTER = f -> INDEX_FILENAME_PATTERN.matcher(f.getName()).matches();
private static final Pattern LUCENE_4_INDEX_PATTERN = Pattern.compile("index-(.*)");
private static final FileFilter LUCENE_4_INDEX_FILE_FILTER = f -> LUCENE_4_INDEX_PATTERN.matcher(f.getName()).matches();
private final RepositoryConfiguration repoConfig;
@ -61,7 +75,7 @@ public class IndexDirectoryManager {
final String partitionName = entry.getKey();
final File storageDir = entry.getValue();
final File[] indexDirs = storageDir.listFiles(INDEX_DIRECTORY_FILTER);
final File[] indexDirs = storageDir.listFiles(LUCENE_8_AND_LATER_INDEX_DIRECTORY_FILTER);
if (indexDirs == null) {
logger.warn("Unable to access Provenance Repository storage directory {}", storageDir);
continue;
@ -88,12 +102,22 @@ public class IndexDirectoryManager {
// Restore the activeIndices to point at the newest index in each storage location.
for (final Tuple<Long, IndexLocation> tuple : latestIndexByStorageDir.values()) {
final IndexLocation indexLoc = tuple.getValue();
activeIndices.put(indexLoc.getPartitionName(), indexLoc);
final File indexDir = indexLoc.getIndexDirectory();
if (indexDir.exists()) {
try (final Directory directory = FSDirectory.open(indexDir.toPath());
@SuppressWarnings("unused") final DirectoryReader reader = DirectoryReader.open(directory)) {
activeIndices.put(indexLoc.getPartitionName(), indexLoc);
} catch (final IOException ioe) {
logger.debug("Unable to open Lucene Index located at {} so assuming that it is defunct and will not use as the active index", indexDir, ioe);
}
}
}
}
public synchronized void deleteDirectory(final File directory) {
public synchronized void removeDirectory(final File directory) {
final Iterator<Map.Entry<Long, List<IndexLocation>>> itr = indexLocationByTimestamp.entrySet().iterator();
while (itr.hasNext()) {
final Map.Entry<Long, List<IndexLocation>> entry = itr.next();
@ -107,6 +131,33 @@ public class IndexDirectoryManager {
}
}
/**
 * Lists the index directories found directly beneath each configured storage directory,
 * selected by directory name.
 *
 * @param includeLucene4Directories whether entries named {@code index-*} (the pre-Lucene-8 naming) are included
 * @param includeLaterLuceneDirectories whether entries named {@code lucene-N-index-*} are included
 * @return the matching entries from all storage directories; storage directories that cannot be
 *         listed are skipped with a warning
 * @throws IllegalArgumentException if both flags are {@code false}, since nothing could match
 */
public synchronized List<File> getAllIndexDirectories(final boolean includeLucene4Directories, final boolean includeLaterLuceneDirectories) {
    final List<File> allDirectories = new ArrayList<>();

    // Pick the name filter that corresponds to the requested combination of index generations.
    final FileFilter directoryFilter;
    if (includeLucene4Directories && includeLaterLuceneDirectories) {
        directoryFilter = ALL_INDEX_FILE_FILTER;
    } else if (includeLucene4Directories) {
        directoryFilter = LUCENE_4_INDEX_FILE_FILTER;
    } else if (includeLaterLuceneDirectories) {
        directoryFilter = LUCENE_8_AND_LATER_INDEX_DIRECTORY_FILTER;
    } else {
        throw new IllegalArgumentException("Cannot list all directories but excluded Lucene 4 directories and later directories");
    }

    for (final File storageDir : repoConfig.getStorageDirectories().values()) {
        final File[] indexDirs = storageDir.listFiles(directoryFilter);
        if (indexDirs == null) {
            // listFiles returns null when the path is not a directory or an I/O error occurs.
            logger.warn("Unable to access Provenance Repository storage directory {}", storageDir);
            continue;
        }

        allDirectories.addAll(Arrays.asList(indexDirs));
    }

    return allDirectories;
}
/**
* Returns a List of all indexes where the latest event in the index has an event time before the given timestamp
*
@ -118,8 +169,8 @@ public class IndexDirectoryManager {
// An index cannot be expired if it is the latest index in the storage directory. As a result, we need to
// separate the indexes by Storage Directory so that we can easily determine if this is the case.
final Map<String, List<IndexLocation>> startTimeWithFileByStorageDirectory = flattenDirectoriesByTimestamp().stream()
.collect(Collectors.groupingBy(indexLoc -> indexLoc.getPartitionName()));
final Map<String, List<IndexLocation>> startTimeWithFileByStorageDirectory = flattenDirectoriesByTimestamp(true).stream()
.collect(Collectors.groupingBy(IndexLocation::getPartitionName));
// Scan through the index directories and the associated index event start time.
// If looking at index N, we can determine the index end time by assuming that it is the same as the
@ -135,7 +186,7 @@ public class IndexDirectoryManager {
continue;
}
final Long indexStartTime = indexLoc.getIndexStartTimestamp();
final long indexStartTime = indexLoc.getIndexStartTimestamp();
if (indexStartTime > timestamp) {
// If the first timestamp in the index is later than the desired timestamp,
// then we are done. We can do this because the list is ordered by monotonically
@ -166,11 +217,17 @@ public class IndexDirectoryManager {
*
* @return a List of all IndexLocations known
*/
private List<IndexLocation> flattenDirectoriesByTimestamp() {
private List<IndexLocation> flattenDirectoriesByTimestamp(final boolean includeOldIndices) {
final List<IndexLocation> startTimeWithFile = new ArrayList<>();
for (final Map.Entry<Long, List<IndexLocation>> entry : indexLocationByTimestamp.entrySet()) {
for (final IndexLocation indexLoc : entry.getValue()) {
startTimeWithFile.add(indexLoc);
if (includeOldIndices) {
startTimeWithFile.addAll(entry.getValue());
} else {
for (final IndexLocation location : entry.getValue()) {
if (location.getIndexDirectory().getName().startsWith("lucene-")) {
startTimeWithFile.add(location);
}
}
}
}
@ -178,12 +235,16 @@ public class IndexDirectoryManager {
}
public synchronized List<File> getDirectories(final Long startTime, final Long endTime) {
return getDirectories(startTime, endTime, true);
}
public synchronized List<File> getDirectories(final Long startTime, final Long endTime, final boolean includeOldIndices) {
final List<File> selected = new ArrayList<>();
// An index cannot be expired if it is the latest index in the partition. As a result, we need to
// separate the indexes by partition so that we can easily determine if this is the case.
final Map<String, List<IndexLocation>> startTimeWithFileByStorageDirectory = flattenDirectoriesByTimestamp().stream()
.collect(Collectors.groupingBy(indexLoc -> indexLoc.getPartitionName()));
final Map<String, List<IndexLocation>> startTimeWithFileByStorageDirectory = flattenDirectoriesByTimestamp(includeOldIndices).stream()
.collect(Collectors.groupingBy(IndexLocation::getPartitionName));
for (final List<IndexLocation> locationList : startTimeWithFileByStorageDirectory.values()) {
selected.addAll(getDirectories(startTime, endTime, locationList));
@ -195,8 +256,8 @@ public class IndexDirectoryManager {
public synchronized List<File> getDirectories(final Long startTime, final Long endTime, final String partitionName) {
// An index cannot be expired if it is the latest index in the partition. As a result, we need to
// separate the indexes by partition so that we can easily determine if this is the case.
final Map<String, List<IndexLocation>> startTimeWithFileByStorageDirectory = flattenDirectoriesByTimestamp().stream()
.collect(Collectors.groupingBy(indexLoc -> indexLoc.getPartitionName()));
final Map<String, List<IndexLocation>> startTimeWithFileByStorageDirectory = flattenDirectoriesByTimestamp(true).stream()
.collect(Collectors.groupingBy(IndexLocation::getPartitionName));
final List<IndexLocation> indexLocations = startTimeWithFileByStorageDirectory.get(partitionName);
if (indexLocations == null) {
@ -296,6 +357,7 @@ public class IndexDirectoryManager {
return Optional.of(indexLocation.getIndexDirectory());
}
private long getSize(final File indexDir) {
if (!indexDir.exists()) {
return 0L;
@ -347,8 +409,85 @@ public class IndexDirectoryManager {
.map(Map.Entry::getValue)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Invalid Partition: " + partitionName));
final File indexDir = new File(storageDir, "index-" + earliestTimestamp);
final File indexDir = new File(storageDir, "lucene-8-index-" + earliestTimestamp);
return indexDir;
}
/**
 * Points every tracked index location that refers to {@code oldIndexDir} at {@code newIndexDir}
 * instead, keeping the location's start timestamp and partition name. If no tracked location
 * referred to the old directory, the new directory is inserted as a fresh location.
 *
 * @param oldIndexDir the index directory being superseded
 * @param newIndexDir the index directory that takes its place
 * @param destroyOldIndex if {@code true}, the old directory is also deleted from disk; a failed
 *                        deletion is logged as a warning and otherwise ignored
 */
public void replaceDirectory(final File oldIndexDir, final File newIndexDir, final boolean destroyOldIndex) {
    boolean foundOldDirectory = false;

    synchronized (this) {
        for (final List<IndexLocation> candidates : indexLocationByTimestamp.values()) {
            for (int i = 0; i < candidates.size(); i++) {
                final IndexLocation existing = candidates.get(i);
                if (!existing.getIndexDirectory().equals(oldIndexDir)) {
                    continue;
                }

                final IndexLocation replacement = new IndexLocation(newIndexDir, existing.getIndexStartTimestamp(), existing.getPartitionName());
                candidates.set(i, replacement);
                foundOldDirectory = true;
                logger.debug("Replaced {} with {}", existing, replacement);
            }
        }
    }

    // Nothing referenced the old directory, so register the new one on its own.
    if (!foundOldDirectory) {
        insertIndexDirectory(newIndexDir);
    }

    if (destroyOldIndex) {
        try {
            FileUtils.deleteFile(oldIndexDir, true);
        } catch (IOException ioe) {
            logger.warn("Failed to delete index directory {}; this directory should be cleaned up manually", oldIndexDir, ioe);
        }
    }

    // Drop any remaining bookkeeping for the old directory.
    removeDirectory(oldIndexDir);

    logger.info("Successfully replaced old index directory {} with new index directory {}", oldIndexDir, newIndexDir);
}
/**
 * Registers a new index directory that has no previously tracked directory to take the place of.
 * The start timestamp is derived from the directory via {@code DirectoryUtils.getIndexTimestamp} and
 * the partition name via {@code getPartitionName}; if either cannot be determined, the directory is
 * not registered and a debug message is logged.
 *
 * @param indexDirectory the index directory to start tracking
 */
private void insertIndexDirectory(final File indexDirectory) {
    final long timestamp = DirectoryUtils.getIndexTimestamp(indexDirectory);
    if (timestamp < 0) {
        logger.debug("Attempted to insert new index directory {} because the index directory that it replaces was not found, " +
            "but could not determine timestamp for new index directory", indexDirectory);
        return;
    }

    final String partitionName = getPartitionName(indexDirectory);
    if (partitionName == null) {
        logger.debug("Attempted to insert new index directory {} because the index directory that it replaces was not found, " +
            "but could not determine partition name for new index directory", indexDirectory);
        return;
    }

    final IndexLocation indexLocation = new IndexLocation(indexDirectory, timestamp, partitionName);

    // replaceDirectory guards its traversal of this map with the instance monitor, so the mutation
    // takes the same (reentrant) monitor to avoid modifying the map while it is being iterated.
    synchronized (this) {
        indexLocationByTimestamp.computeIfAbsent(timestamp, key -> new ArrayList<>()).add(indexLocation);
    }

    logger.debug("Successfully inserted new index directory {}", indexDirectory);
}
/**
 * Finds the name of the partition whose storage directory is an ancestor of the given index directory.
 *
 * @param indexDir the index directory to look up
 * @return the key of the first configured storage directory that contains {@code indexDir},
 *         or {@code null} if no configured storage directory contains it
 */
private String getPartitionName(final File indexDir) {
    return repoConfig.getStorageDirectories().entrySet().stream()
        .filter(storageEntry -> isParent(indexDir, storageEntry.getValue()))
        .findFirst()
        .map(Map.Entry::getKey)
        .orElse(null);
}
/**
 * Determines whether {@code potentialParent} is an ancestor (parent, grandparent, ...) of {@code file}.
 * The chain of parent directories is walked upward until a match is found or the top of the path is
 * reached; reaching the top without a match yields {@code false} rather than dereferencing a null parent.
 *
 * @param file the file whose ancestry is examined; {@code null} is never a descendant of anything
 * @param potentialParent the directory to look for among the ancestors of {@code file}
 * @return {@code true} if some ancestor of {@code file} equals {@code potentialParent}, {@code false} otherwise
 */
private boolean isParent(final File file, final File potentialParent) {
    File ancestor = (file == null) ? null : file.getParentFile();

    while (ancestor != null) {
        if (ancestor.equals(potentialParent)) {
            return true;
        }
        ancestor = ancestor.getParentFile();
    }

    // Ran out of ancestors without finding a match.
    return false;
}
}

View File

@ -20,7 +20,7 @@ package org.apache.nifi.provenance.index.lucene;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.nifi.authorization.AccessDeniedException;
@ -49,6 +49,7 @@ import org.apache.nifi.provenance.store.EventStore;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.apache.nifi.provenance.util.NamedThreadFactory;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
@ -75,9 +76,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class LuceneEventIndex implements EventIndex {
private static final Logger logger = LoggerFactory.getLogger(LuceneEventIndex.class);
private static final String EVENT_CATEGORY = "Provenance Repository";
@ -86,6 +87,7 @@ public class LuceneEventIndex implements EventIndex {
public static final int MAX_DELETE_INDEX_WAIT_SECONDS = 30;
public static final int MAX_LINEAGE_NODES = 1000;
public static final int MAX_INDEX_THREADS = 100;
public static final int MAX_LINEAGE_UUIDS = 100;
private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>();
@ -109,6 +111,7 @@ public class LuceneEventIndex implements EventIndex {
private ScheduledExecutorService maintenanceExecutor; // effectively final
private ScheduledExecutorService cacheWarmerExecutor;
private EventStore eventStore;
private volatile boolean newestIndexDefunct = false;
public LuceneEventIndex(final RepositoryConfiguration config, final IndexManager indexManager, final EventReporter eventReporter) {
this(config, indexManager, EventIndexTask.DEFAULT_MAX_EVENTS_PER_COMMIT, eventReporter);
@ -140,10 +143,11 @@ public class LuceneEventIndex implements EventIndex {
}
for (int i = 0; i < numIndexThreads; i++) {
final EventIndexTask task = new EventIndexTask(documentQueue, config, indexManager, directoryManager, maxEventsPerCommit, eventReporter);
final EventIndexTask task = new EventIndexTask(documentQueue, indexManager, directoryManager, maxEventsPerCommit, eventReporter);
indexTasks.add(task);
indexExecutor.submit(task);
}
this.config = config;
this.indexManager = indexManager;
this.eventConverter = new ConvertEventToLuceneDocument(config.getSearchableFields(), config.getSearchableAttributes());
@ -155,12 +159,76 @@ public class LuceneEventIndex implements EventIndex {
directoryManager.initialize();
maintenanceExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Provenance Repository Maintenance"));
maintenanceExecutor.scheduleWithFixedDelay(() -> performMaintenance(), 1, 1, TimeUnit.MINUTES);
maintenanceExecutor.scheduleWithFixedDelay(this::performMaintenance, 1, 1, TimeUnit.MINUTES);
maintenanceExecutor.scheduleWithFixedDelay(this::purgeObsoleteQueries, 30, 30, TimeUnit.SECONDS);
cachedQueries.add(new LatestEventsQuery());
cachedQueries.add(new LatestEventsPerProcessorQuery());
triggerReindexOfDefunctIndices();
triggerCacheWarming();
}
/**
 * Detects index directories that can no longer be opened and submits a background
 * {@code MigrateDefunctIndex} task for each of them. A defunct directory that already has a
 * corresponding Lucene 4 directory is skipped (and counted as done) so that only one of the pair is
 * rebuilt. Finally, records in {@code newestIndexDefunct} whether the newest index directory is
 * among the defunct ones.
 */
private void triggerReindexOfDefunctIndices() {
    final ExecutorService rebuildIndexExecutor = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Rebuild Defunct Provenance Indices", true));

    final List<File> sortedIndexDirs = directoryManager.getAllIndexDirectories(true, true);
    sortedIndexDirs.sort(DirectoryUtils.OLDEST_INDEX_FIRST);

    final List<File> defunctIndices = detectDefunctIndices(sortedIndexDirs);
    final int totalCount = defunctIndices.size();
    final AtomicInteger rebuildCount = new AtomicInteger(0);

    for (final File defunctIndex : defunctIndices) {
        try {
            if (!isLucene4IndexPresent(defunctIndex)) {
                logger.info("Determined that Lucene Index Directory {} is defunct. Will destroy and rebuild index", defunctIndex);

                final Tuple<Long, Long> timeRange = getTimeRange(defunctIndex, sortedIndexDirs);
                rebuildIndexExecutor.submit(new MigrateDefunctIndex(defunctIndex, indexManager, directoryManager, timeRange.getKey(), timeRange.getValue(),
                    eventStore, eventReporter, eventConverter, rebuildCount, totalCount));
            } else {
                logger.info("Encountered Lucene 8 index {} and also the corresponding Lucene 4 index; will only trigger rebuilding of one directory.", defunctIndex);
                // Count the skipped directory as handled so the progress numbers stay accurate.
                rebuildCount.incrementAndGet();
            }
        } catch (final Exception e) {
            logger.error("Detected defunct index {} but failed to rebuild index", defunctIndex, e);
        }
    }

    // No further tasks will be submitted; already-submitted tasks still run to completion.
    rebuildIndexExecutor.shutdown();

    // The list is sorted oldest-first, so the newest directory is the last element.
    final int newestPosition = sortedIndexDirs.size() - 1;
    if (newestPosition >= 0 && defunctIndices.contains(sortedIndexDirs.get(newestPosition))) {
        newestIndexDefunct = true;
    }
}
/**
 * Returns true if the given Index Directory appears to be a later version of the Lucene Index and there also exists a version 4 Lucene
 * Index for the same timestamp. The Lucene 4 directory name is derived by dropping everything before "index-" from the given
 * directory's name. A directory whose name contains "lucene-8-" but no "index-" segment has no derivable Lucene 4 counterpart and
 * yields <code>false</code>.
 *
 * @param indexDirectory the index directory to check
 * @return <code>true</code> if there exists a Lucene 4 index directory for the same timestamp, <code>false</code> otherwise
 */
private boolean isLucene4IndexPresent(final File indexDirectory) {
    final String indexName = indexDirectory.getName();
    if (!indexName.contains("lucene-8-")) {
        return false;
    }

    final int prefixEnd = indexName.indexOf("index-");
    if (prefixEnd < 0) {
        // Without an "index-" segment, substring(prefixEnd) would throw StringIndexOutOfBoundsException.
        return false;
    }

    final String oldIndexName = indexName.substring(prefixEnd);
    final File oldIndexFile = new File(indexDirectory.getParentFile(), oldIndexName);
    return oldIndexFile.exists();
}
private void triggerCacheWarming() {
final Optional<Integer> warmCacheMinutesOption = config.getWarmCacheFrequencyMinutes();
if (warmCacheMinutesOption.isPresent() && warmCacheMinutesOption.get() > 0) {
for (final File storageDir : config.getStorageDirectories().values()) {
@ -170,6 +238,72 @@ public class LuceneEventIndex implements EventIndex {
}
}
/**
 * Derives the time range covered by an index directory from a list of index directories that is sorted from
 * the earliest timestamp to the latest. The range starts at the directory's own timestamp and ends where the
 * next directory in the list starts, or at the current time when there is no later directory to bound it.
 *
 * @param indexDirectory the index directory whose time range is desired
 * @param sortedIndexDirectories the list of all index directories from the earliest timestamp to the latest
 * @return a Tuple whose LHS is the earliest timestamp and RHS is the latest timestamp that the given index directory encompasses
 */
protected static Tuple<Long, Long> getTimeRange(final File indexDirectory, final List<File> sortedIndexDirectories) {
    final long rangeStart = DirectoryUtils.getIndexTimestamp(indexDirectory);

    // Nothing to bound the range with: it extends from the start time until now.
    if (sortedIndexDirectories.isEmpty()) {
        return new Tuple<>(rangeStart, System.currentTimeMillis());
    }

    final int position = sortedIndexDirectories.indexOf(indexDirectory);
    if (position < 0) {
        // The directory is not part of the list. If it predates the first listed index, the range ends where
        // that first index begins; otherwise the range is assumed to extend until now.
        final long earliestListedStart = DirectoryUtils.getIndexTimestamp(sortedIndexDirectories.get(0));
        if (rangeStart < earliestListedStart) {
            return new Tuple<>(rangeStart, earliestListedStart);
        }
        return new Tuple<>(rangeStart, System.currentTimeMillis());
    }

    final int successorPosition = position + 1;
    if (successorPosition >= sortedIndexDirectories.size()) {
        // This is the last index in the list, so the current time closes the range.
        return new Tuple<>(rangeStart, System.currentTimeMillis());
    }

    final long rangeEnd = DirectoryUtils.getIndexTimestamp(sortedIndexDirectories.get(successorPosition));
    return new Tuple<>(rangeStart, rangeEnd);
}
/**
 * Filters the given index directories down to those that {@code isIndexDefunct} reports as defunct,
 * preserving the iteration order of the input.
 *
 * @param indexDirectories the index directories to examine
 * @return a new, mutable list containing only the defunct index directories
 */
private List<File> detectDefunctIndices(final Collection<File> indexDirectories) {
    final List<File> candidates = new ArrayList<>(indexDirectories);
    candidates.removeIf(indexDir -> !isIndexDefunct(indexDir));
    return candidates;
}
/**
 * Probes an index directory by borrowing an index searcher for it. If borrowing fails with an
 * {@code IOException}, the index is treated as defunct; otherwise the searcher is handed straight
 * back to the index manager and the index is considered usable.
 *
 * @param indexDir the index directory to probe
 * @return {@code true} if the index could not be opened, {@code false} otherwise
 */
private boolean isIndexDefunct(final File indexDir) {
    final EventIndexSearcher probe;
    try {
        probe = indexManager.borrowIndexSearcher(indexDir);
    } catch (final IOException ioe) {
        logger.warn("Lucene Index {} could not be opened. Assuming that index is defunct and will re-index events belonging to this index.", indexDir);
        return true;
    }

    // The searcher was only needed to prove that the index opens; give it back immediately.
    if (probe != null) {
        indexManager.returnIndexSearcher(probe);
    }
    return false;
}
@Override
public long getMinimumEventIdToReindex(final String partitionName) {
return Math.max(0, getMaxEventId(partitionName) - EventIndexTask.MAX_DOCUMENTS_PER_THREAD * LuceneEventIndex.MAX_INDEX_THREADS);
@ -213,7 +347,7 @@ public class LuceneEventIndex implements EventIndex {
return -1L;
}
Collections.sort(allDirectories, DirectoryUtils.NEWEST_INDEX_FIRST);
allDirectories.sort(DirectoryUtils.NEWEST_INDEX_FIRST);
for (final File directory : allDirectories) {
final EventIndexSearcher searcher;
@ -241,9 +375,20 @@ public class LuceneEventIndex implements EventIndex {
return -1L;
}
/**
 * Indicates whether the caller needs to re-index recent events. Re-indexing is unnecessary when the
 * newest index is defunct, because that index is rebuilt in the background; the explanatory message
 * is logged only in that case.
 *
 * @return {@code false} if the newest index is defunct, {@code true} otherwise
 */
public boolean isReindexNecessary() {
    // If newest index is defunct, there's no reason to re-index, as it will happen in the background thread
    if (newestIndexDefunct) {
        logger.info("Will avoid re-indexing Provenance Events because the newest index is defunct, so it will be re-indexed in the background");
        return false;
    }

    return true;
}
@Override
public void reindexEvents(final Map<ProvenanceEventRecord, StorageSummary> events) {
final EventIndexTask indexTask = new EventIndexTask(documentQueue, config, indexManager, directoryManager, EventIndexTask.DEFAULT_MAX_EVENTS_PER_COMMIT, eventReporter);
if (newestIndexDefunct) {
logger.info("Will avoid re-indexing {} events because the newest index is defunct, so it will be re-indexed in the background", events.size());
return;
}
final EventIndexTask indexTask = new EventIndexTask(documentQueue, indexManager, directoryManager, EventIndexTask.DEFAULT_MAX_EVENTS_PER_COMMIT, eventReporter);
File lastIndexDir = null;
long lastEventTime = -2L;
@ -265,8 +410,14 @@ public class LuceneEventIndex implements EventIndex {
if (event.getEventTime() == lastEventTime) {
indexDir = lastIndexDir;
} else {
final List<File> files = getDirectoryManager().getDirectories(event.getEventTime(), null);
indexDir = files.isEmpty() ? null : files.get(0);
final List<File> files = getDirectoryManager().getDirectories(event.getEventTime(), null, false);
if (files.isEmpty()) {
final String partitionName = summary.getPartitionName().get();
indexDir = getDirectoryManager().getWritableIndexingDirectory(event.getEventTime(), partitionName);
} else {
indexDir = files.get(0);
}
lastIndexDir = indexDir;
}
@ -391,6 +542,10 @@ public class LuceneEventIndex implements EventIndex {
private ComputeLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final NiFiUser user, final EventAuthorizer eventAuthorizer,
final LineageComputationType computationType, final Long eventId, final long startTimestamp, final long endTimestamp) {
if (flowFileUuids.size() > MAX_LINEAGE_UUIDS) {
throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size()));
}
final List<File> indexDirs = directoryManager.getDirectories(startTimestamp, endTimestamp);
final AsyncLineageSubmission submission = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexDirs.size(), user == null ? null : user.getIdentity());
lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
@ -400,7 +555,7 @@ public class LuceneEventIndex implements EventIndex {
if (indexDirectories.isEmpty()) {
submission.getResult().update(Collections.emptyList(), 0L);
} else {
Collections.sort(indexDirectories, DirectoryUtils.OLDEST_INDEX_FIRST);
indexDirectories.sort(DirectoryUtils.OLDEST_INDEX_FIRST);
for (final File indexDir : indexDirectories) {
queryExecutor.submit(new QueryTask(lineageQuery, submission.getResult(), MAX_LINEAGE_NODES, indexManager, indexDir,
@ -427,11 +582,13 @@ public class LuceneEventIndex implements EventIndex {
if (flowFileUuids == null || flowFileUuids.isEmpty()) {
lineageQuery = null;
} else {
lineageQuery = new BooleanQuery();
final BooleanQuery.Builder queryBuilder = new BooleanQuery.Builder();
for (final String flowFileUuid : flowFileUuids) {
lineageQuery.add(new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid)), Occur.SHOULD);
final TermQuery termQuery = new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid));
queryBuilder.add(new BooleanClause(termQuery, BooleanClause.Occur.SHOULD));
}
lineageQuery.setMinimumNumberShouldMatch(1);
lineageQuery = queryBuilder.build();
}
return lineageQuery;
@ -487,7 +644,7 @@ public class LuceneEventIndex implements EventIndex {
if (indexDirectories.isEmpty()) {
submission.getResult().update(Collections.emptyList(), 0L);
} else {
Collections.sort(indexDirectories, DirectoryUtils.NEWEST_INDEX_FIRST);
indexDirectories.sort(DirectoryUtils.NEWEST_INDEX_FIRST);
for (final File indexDir : indexDirectories) {
queryExecutor.submit(new QueryTask(luceneQuery, submission.getResult(), query.getMaxResults(), indexManager, indexDir,
@ -713,7 +870,7 @@ public class LuceneEventIndex implements EventIndex {
+ "However, the directory could not be deleted.", e);
}
directoryManager.deleteDirectory(indexDirectory);
directoryManager.removeDirectory(indexDirectory);
logger.info("Successfully removed expired Lucene Index {}", indexDirectory);
} else {
logger.warn("The Lucene Index located at {} has expired and contains no Provenance Events that still exist in the respository. "

View File

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.index.lucene;
import org.apache.lucene.document.Document;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.index.EventIndexWriter;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.store.EventStore;
import org.apache.nifi.provenance.store.iterator.EventIterator;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Task that rebuilds a single index directory that can no longer be used. The events whose timestamps
 * fall within the configured range are read from the Event Store and indexed into a temporary directory,
 * which is then moved to its final "lucene-8-" prefixed name and swapped in for the defunct directory
 * through the {@link IndexDirectoryManager}.
 */
public class MigrateDefunctIndex implements Runnable {
    private static final String TEMP_FILENAME_PREFIX = "temp-lucene-8-";
    private static final String MIGRATED_FILENAME_PREFIX = "lucene-8-";
    private static final Logger logger = LoggerFactory.getLogger(MigrateDefunctIndex.class);

    private final File indexDirectory;
    private final IndexManager indexManager;
    private final IndexDirectoryManager directoryManager;
    private final EventStore eventStore;
    private final EventReporter eventReporter;
    private final ConvertEventToLuceneDocument eventConverter;
    private final long minTimestamp;
    private final long maxTimestamp;
    private final AtomicInteger rebuildCount;
    private final int totalCount;

    // Number of events indexed so far by this task; reported in success and failure log messages.
    private long successCount = 0L;

    /**
     * @param indexDirectory the defunct index directory to rebuild
     * @param indexManager used to borrow and return the writer for the temporary index
     * @param directoryManager notified once the rebuilt index is ready to replace the defunct one
     * @param minTimestamp lower bound of the event timestamps to re-index
     * @param maxTimestamp upper bound of the event timestamps to re-index
     * @param eventStore source of the events to re-index
     * @param eventReporter used to surface a migration failure
     * @param eventConverter converts each event into a Lucene document
     * @param rebuildCount counter shared between tasks, incremented once per finished (or skipped/failed) index
     * @param totalCount total number of indices that are to be rebuilt, used for progress reporting
     */
    public MigrateDefunctIndex(final File indexDirectory, final IndexManager indexManager, final IndexDirectoryManager directoryManager, final long minTimestamp, final long maxTimestamp,
                               final EventStore eventStore, final EventReporter eventReporter, final ConvertEventToLuceneDocument eventConverter, final AtomicInteger rebuildCount,
                               final int totalCount) {
        this.indexDirectory = indexDirectory;
        this.indexManager = indexManager;
        this.directoryManager = directoryManager;
        this.eventStore = eventStore;
        this.eventReporter = eventReporter;
        this.minTimestamp = minTimestamp;
        this.maxTimestamp = maxTimestamp;
        this.eventConverter = eventConverter;
        this.rebuildCount = rebuildCount;
        this.totalCount = totalCount;
    }

    /**
     * Rebuilds the index and swaps it in for the defunct directory. Failures are logged and reported
     * through the {@link EventReporter} instead of being propagated; in every outcome the shared rebuild
     * counter is incremented exactly once.
     */
    @Override
    public void run() {
        final File tempIndexDir = new File(indexDirectory.getParentFile(), TEMP_FILENAME_PREFIX + indexDirectory.getName());
        final File migratedIndexDir = new File(indexDirectory.getParentFile(), MIGRATED_FILENAME_PREFIX + indexDirectory.getName());

        final boolean preconditionsMet = verifyPreconditions(tempIndexDir, migratedIndexDir);
        if (!preconditionsMet) {
            rebuildCount.incrementAndGet(); // increment count so that reporting is accurate
            return;
        }

        // Rebuild the directory or report the error
        try {
            rebuildIndex(tempIndexDir, migratedIndexDir);
            directoryManager.replaceDirectory(indexDirectory, migratedIndexDir, true);

            logger.info("Successfully rebuilt Lucene Index {} as {}; {} of {} indices remain to be rebuilt", indexDirectory, migratedIndexDir,
                totalCount - rebuildCount.incrementAndGet(), totalCount);
        } catch (final Exception e) {
            logger.error("Failed to migrate event index {} to {} after successfully re-indexing {} events", indexDirectory, tempIndexDir, successCount, e);
            eventReporter.reportEvent(Severity.ERROR, "Provenance Event Index Migration", "Failed to migrate event index " + indexDirectory + " - see logs for more details.");
            rebuildCount.incrementAndGet(); // increment count so that reporting is accurate
        }
    }

    /**
     * Clears out leftovers of an earlier migration attempt so that the rebuild starts from a clean slate.
     *
     * @param tempIndexDir the temporary directory that the index will be built in
     * @param migratedIndexDir the final directory that the rebuilt index will be moved to
     * @return {@code true} if neither directory exists any longer, {@code false} if a leftover could not be deleted
     */
    private boolean verifyPreconditions(final File tempIndexDir, final File migratedIndexDir) {
        // If the temp directory exists, delete it or fail.
        if (tempIndexDir.exists()) {
            try {
                FileUtils.deleteFile(tempIndexDir, true);
            } catch (final Exception e) {
                logger.error("Attempted to rebuild index for {} but there already exists a temporary Lucene 8 index at {}. " +
                    "Attempted to delete existing temp directory but failed. This index will not be rebuilt.", indexDirectory, tempIndexDir, e);
                return false;
            }
        }

        // If the migrated directory exists, delete it or fail.
        if (migratedIndexDir.exists()) {
            try {
                FileUtils.deleteFile(migratedIndexDir, true);
            } catch (final Exception e) {
                logger.error("Attempted to rebuild index for {} but there already exists a Lucene 8 index at {}. " +
                    "Attempted to delete existing Lucene 8 directory but failed. This index will not be rebuilt.", indexDirectory, migratedIndexDir, e);
                return false;
            }
        }

        return true;
    }

    /**
     * Indexes every event in the configured timestamp range into {@code tempIndexDir}, commits, returns
     * the writer to the index manager and finally moves the temporary directory to {@code migratedIndexDir}.
     *
     * @param tempIndexDir the directory to build the index in
     * @param migratedIndexDir the directory that the finished index is moved to
     * @throws IOException if reading events, indexing, committing or moving the directory fails
     */
    private void rebuildIndex(final File tempIndexDir, final File migratedIndexDir) throws IOException {
        final EventIndexWriter writer = indexManager.borrowIndexWriter(tempIndexDir);

        try {
            // NOTE(review): the iterator is never closed here; if EventIterator holds resources, confirm whether it should be closed.
            final EventIterator eventIterator = eventStore.getEventsByTimestamp(minTimestamp, maxTimestamp);

            final StopWatch stopWatch = new StopWatch(true);

            Optional<ProvenanceEventRecord> optionalEvent;
            while ((optionalEvent = eventIterator.nextEvent()).isPresent()) {
                final ProvenanceEventRecord event = optionalEvent.get();

                final Document document = eventConverter.convert(event, event.getEventId());
                writer.index(document, Integer.MAX_VALUE);
                successCount++;
            }

            writer.commit();
            stopWatch.stop();
            logger.info("Successfully indexed {} events to {} in {}", successCount, tempIndexDir, stopWatch.getDuration());
        } finally {
            indexManager.returnIndexWriter(writer, true, true);
        }

        // Only a fully written and committed index is given its final name.
        Files.move(tempIndexDir.toPath(), migratedIndexDir.toPath());
    }
}

View File

@ -17,16 +17,6 @@
package org.apache.nifi.provenance.index.lucene;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TopDocs;
@ -43,6 +33,16 @@ import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class QueryTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(QueryTask.class);
private static final Set<String> LUCENE_FIELDS_TO_LOAD = Collections.singleton(SearchableFields.Identifier.getSearchableFieldName());
@ -146,7 +146,7 @@ public class QueryTask implements Runnable {
return;
}
final Tuple<List<ProvenanceEventRecord>, Integer> eventsAndTotalHits = readDocuments(topDocs, indexReader);
final Tuple<List<ProvenanceEventRecord>, Long> eventsAndTotalHits = readDocuments(topDocs, indexReader);
if (eventsAndTotalHits == null) {
queryResult.update(Collections.emptyList(), 0L);
@ -168,10 +168,10 @@ public class QueryTask implements Runnable {
}
}
private Tuple<List<ProvenanceEventRecord>, Integer> readDocuments(final TopDocs topDocs, final IndexReader indexReader) {
private Tuple<List<ProvenanceEventRecord>, Long> readDocuments(final TopDocs topDocs, final IndexReader indexReader) {
// If no topDocs is supplied, just provide a Tuple that has no records and a hit count of 0.
if (topDocs == null || topDocs.totalHits == 0) {
return new Tuple<>(Collections.<ProvenanceEventRecord> emptyList(), 0);
if (topDocs == null || topDocs.totalHits.value == 0) {
return new Tuple<>(Collections.<ProvenanceEventRecord> emptyList(), 0L);
}
final long start = System.nanoTime();
@ -201,7 +201,7 @@ public class QueryTask implements Runnable {
final long fetchEventNanos = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - endConvert);
logger.debug("Fetching {} events from Event Store took {} ms ({} events actually fetched)", eventIds.size(), fetchEventNanos, events.size());
final int totalHits = topDocs.totalHits;
final long totalHits = topDocs.totalHits.value;
return new Tuple<>(events, totalHits);
}

View File

@ -1,574 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.lucene;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.nifi.provenance.index.EventIndexSearcher;
import org.apache.nifi.provenance.index.EventIndexWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CachingIndexManager implements Closeable, IndexManager {
private static final Logger logger = LoggerFactory.getLogger(CachingIndexManager.class);
private final Lock lock = new ReentrantLock();
private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
/**
 * Stops tracking the given index directory, closing the writer and every searcher registered for it.
 * Closing stops at the first failure.
 *
 * @param indexDirectory the index directory to remove
 * @return {@code true} if everything registered for the directory was closed, {@code false} if closing
 *         the writer or one of the searchers failed
 */
@Override
public boolean removeIndex(final File indexDirectory) {
    final File indexKey = indexDirectory.getAbsoluteFile();
    logger.info("Removing index {}", indexDirectory);

    lock.lock();
    try {
        final IndexWriterCount writerCount = writerCounts.remove(indexKey);
        if (writerCount != null) {
            try {
                writerCount.close();
            } catch (final IOException closeFailure) {
                logger.warn("Failed to close Index Writer {} for {}", writerCount.getWriter(), indexKey);
                if (logger.isDebugEnabled()) {
                    logger.warn("", closeFailure);
                }

                return false;
            }
        }

        final List<ActiveIndexSearcher> searchers = activeSearchers.remove(indexKey);
        if (searchers == null) {
            return true;
        }

        for (final ActiveIndexSearcher activeSearcher : searchers) {
            try {
                activeSearcher.close();
            } catch (final IOException closeFailure) {
                logger.warn("Failed to close Index Searcher {} for {} due to {}",
                    activeSearcher.getSearcher(), indexKey, closeFailure);
                if (logger.isDebugEnabled()) {
                    logger.warn("", closeFailure);
                }

                return false;
            }
        }

        return true;
    } finally {
        lock.unlock();
    }
}
/**
 * Hands out the index writer for the given directory, creating it if no writer is currently
 * registered for that directory and otherwise re-registering the existing writer with its
 * count increased by one. All bookkeeping happens while holding {@code lock}.
 *
 * @param indexDirectory the index directory to write to
 * @return the writer registered for the directory
 * @throws IOException if the directory or the index writer cannot be opened
 */
@Override
public EventIndexWriter borrowIndexWriter(final File indexDirectory) throws IOException {
final File absoluteFile = indexDirectory.getAbsoluteFile();
logger.trace("Borrowing index writer for {}", indexDirectory);
lock.lock();
try {
// The entry is removed here and put back below, either freshly created or with an incremented count.
IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
if (writerCount == null) {
// Everything opened so far is remembered so that it can be closed if a later step fails.
final List<Closeable> closeables = new ArrayList<>();
final Directory directory = FSDirectory.open(indexDirectory);
closeables.add(directory);
try {
final Analyzer analyzer = new StandardAnalyzer();
closeables.add(analyzer);
final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
config.setWriteLockTimeout(300000L);
final IndexWriter indexWriter = new IndexWriter(directory, config);
final EventIndexWriter eventIndexWriter = new LuceneEventIndexWriter(indexWriter, indexDirectory);
writerCount = new IndexWriterCount(eventIndexWriter, analyzer, directory, 1);
logger.debug("Providing new index writer for {}", indexDirectory);
} catch (final IOException ioe) {
// Close what was opened; close failures are attached to the original exception, which is rethrown.
for (final Closeable closeable : closeables) {
try {
closeable.close();
} catch (final IOException ioe2) {
ioe.addSuppressed(ioe2);
}
}
throw ioe;
}
writerCounts.put(absoluteFile, writerCount);
// Mark any active searchers as poisoned because we are updating the index
final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
if (searchers != null) {
for (final ActiveIndexSearcher activeSearcher : searchers) {
logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexDirectory);
activeSearcher.poison();
}
}
} else {
// A writer already exists: register a new entry that carries the same writer with count + 1.
logger.debug("Providing existing index writer for {} and incrementing count to {}", indexDirectory, writerCount.getCount() + 1);
writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
}
// writerCount still refers to the created/previous entry; its writer is the one stored in the map entry.
return writerCount.getWriter();
} finally {
lock.unlock();
}
}
/**
 * Returns a borrowed writer, requesting both a commit and a close once the writer is no longer in use.
 *
 * @param writer the writer being returned
 */
@Override
public void returnIndexWriter(final EventIndexWriter writer) {
    final boolean commit = true;
    final boolean isCloseable = true;
    returnIndexWriter(writer, commit, isCloseable);
}
/**
 * Returns a borrowed writer and updates the count registered for its directory while holding
 * {@code lock}. An {@code IOException} raised while committing or closing is logged, not propagated.
 *
 * @param writer the writer being returned
 * @param commit whether the writer should be committed when the registered count is 1 or less
 * @param isCloseable whether the writer should be closed and unregistered when the registered count is 1 or less
 */
@Override
public void returnIndexWriter(final EventIndexWriter writer, final boolean commit, final boolean isCloseable) {
final File indexDirectory = writer.getDirectory();
final File absoluteFile = indexDirectory.getAbsoluteFile();
logger.trace("Returning Index Writer for {} to IndexManager", indexDirectory);
lock.lock();
try {
final IndexWriterCount count = writerCounts.get(absoluteFile);
try {
if (count == null) {
// Nothing is registered for this directory, so the writer is simply closed.
logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+ "This could potentially lead to a resource leak", writer, indexDirectory);
writer.close();
} else if (count.getCount() <= 1) {
// we are finished with this writer.
logger.info("Decrementing count for Index Writer for {} to {}. Now finished writing to this Index Directory",
indexDirectory, count.getCount() - 1);
try {
if (commit) {
writer.commit();
}
} finally {
// Closing is attempted even if the commit failed; the map entry is removed even if the close failed.
// When isCloseable is false, the entry stays registered with its count unchanged.
if (isCloseable) {
try {
count.close();
} finally {
writerCounts.remove(absoluteFile);
}
}
}
} else {
// decrement the count.
logger.debug("Decrementing count for Index Writer for {} to {}", indexDirectory, count.getCount() - 1);
writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
}
} catch (final IOException ioe) {
logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
if (logger.isDebugEnabled()) {
logger.warn("", ioe);
}
}
} finally {
lock.unlock();
}
}
/**
 * Borrows an {@link EventIndexSearcher} for the given index directory. The searcher must be handed
 * back via {@link #returnIndexSearcher(EventIndexSearcher)} when the caller is done with it.
 * <p>
 * If a cached, still-open, non-poisoned searcher exists for the directory, its reference count is
 * incremented and it is re-used. Otherwise a new searcher is created: from the directory on disk
 * (cacheable) when no Index Writer is registered for the directory, or from the registered Index Writer
 * (not cacheable) when one is.
 *
 * @param indexDir the index directory to search
 * @return a searcher for the given directory
 * @throws IOException if the underlying index reader cannot be opened
 */
@Override
public EventIndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
    final File absoluteFile = indexDir.getAbsoluteFile();
    logger.trace("Borrowing index searcher for {}", indexDir);

    lock.lock();
    try {
        // check if we already have a reader cached.
        List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
        if (currentlyCached == null) {
            currentlyCached = new ArrayList<>();
            activeSearchers.put(absoluteFile, currentlyCached);
        } else {
            for (final ActiveIndexSearcher searcher : currentlyCached) {
                if (searcher.isCache()) {
                    // a poisoned searcher must never be handed out again; it is skipped here and
                    // is closed when it is returned to this manager.
                    if (searcher.isPoisoned()) {
                        continue;
                    }

                    // if there are no references to the reader, it will have been closed. Since there is no
                    // isClosed() method, this is how we determine whether it's been closed or not.
                    final int refCount = searcher.getSearcher().getIndexSearcher().getIndexReader().getRefCount();
                    if (refCount <= 0) {
                        // if refCount == 0, then the reader has been closed, so we cannot use the searcher.
                        // The entry is only skipped here; it is not removed from the list by this method.
                        logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
                            + "removing cached searcher", absoluteFile, refCount);
                        continue;
                    }

                    final int referenceCount = searcher.incrementReferenceCount();
                    logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount);
                    return searcher.getSearcher();
                }
            }
        }

        // We found no cached Index Readers. Create a new one. To do this, we need to check
        // if we have an Index Writer, and if so create a Reader based on the Index Writer.
        // This will provide us a 'near real time' index reader.
        final IndexWriterCount writerCount = writerCounts.get(absoluteFile);
        if (writerCount == null) {
            final Directory directory = FSDirectory.open(absoluteFile);
            logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);

            try {
                final DirectoryReader directoryReader = DirectoryReader.open(directory);
                final IndexSearcher searcher = new IndexSearcher(directoryReader);
                final EventIndexSearcher eventIndexSearcher = new LuceneEventIndexSearcher(searcher, indexDir, directory, directoryReader);

                // we want to cache the searcher that we create, since it's just a reader.
                final ActiveIndexSearcher cached = new ActiveIndexSearcher(eventIndexSearcher, absoluteFile, directoryReader, directory, true);
                currentlyCached.add(cached);

                return cached.getSearcher();
            } catch (final IOException e) {
                logger.error("Failed to create Index Searcher for {} due to {}", absoluteFile, e.toString());
                logger.error("", e);

                // do not leak the directory that was opened for the failed reader
                try {
                    directory.close();
                } catch (final IOException ioe) {
                    e.addSuppressed(ioe);
                }

                throw e;
            }
        } else {
            logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
                + "counter to {}", indexDir, writerCount.getCount() + 1);

            // create a new Index Searcher from the writer so that we don't have an issue with trying
            // to read from a directory that's locked. If we get the "no segments* file found" with
            // Lucene, this indicates that an IndexWriter already has the directory open.
            // The reader is opened BEFORE the writer count is incremented: if opening fails, the
            // exception propagates without leaving behind an extra writer reference that nobody
            // would ever release.
            final EventIndexWriter writer = writerCount.getWriter();
            final DirectoryReader directoryReader = DirectoryReader.open(writer.getIndexWriter(), false);
            final IndexSearcher searcher = new IndexSearcher(directoryReader);
            final EventIndexSearcher eventIndexSearcher = new LuceneEventIndexSearcher(searcher, indexDir, null, directoryReader);

            // increment the writer count to ensure that it's kept open.
            writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
                writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));

            // we don't want to cache this searcher because it's based on a writer, so we want to get
            // new values the next time that we search.
            final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(eventIndexSearcher, absoluteFile, directoryReader, null, false);
            currentlyCached.add(activeSearcher);

            return activeSearcher.getSearcher();
        }
    } finally {
        lock.unlock();
    }
}
/**
 * Returns a previously borrowed {@link EventIndexSearcher} to this manager.
 * <ul>
 * <li>A cached, non-poisoned searcher stays open; only its reference count is decremented.</li>
 * <li>A cached, poisoned searcher has a reference released and is dropped from the pool once its
 * last reference is gone.</li>
 * <li>A non-cached searcher (one that was created from an Index Writer) releases the writer reference
 * that was taken when it was borrowed, is closed, and is dropped from the pool.</li>
 * </ul>
 * Failures to close are logged and never propagated to the caller.
 *
 * @param searcher the searcher being returned
 */
@Override
public void returnIndexSearcher(final EventIndexSearcher searcher) {
    final File indexDirectory = searcher.getIndexDirectory();
    final File absoluteFile = indexDirectory.getAbsoluteFile();
    logger.trace("Returning index searcher for {} to IndexManager", indexDirectory);

    lock.lock();
    try {
        // check if we already have a reader cached.
        final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
        if (currentlyCached == null) {
            logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
                + "result in a resource leak", indexDirectory);

            return;
        }

        // Iterate over the live list (not a copy) so that itr.remove() really evicts the entry.
        // Every branch below returns right after handling the matching entry, so the list is never
        // modified while iteration continues.
        final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
        while (itr.hasNext()) {
            final ActiveIndexSearcher activeSearcher = itr.next();
            if (!activeSearcher.getSearcher().equals(searcher)) {
                continue;
            }

            if (activeSearcher.isCache()) {
                // if the searcher is poisoned, close it and remove from "pool". Otherwise,
                // just decrement the count. Note here that when we call close() it won't actually close
                // the underlying directory reader unless there are no more references to it
                if (activeSearcher.isPoisoned()) {
                    // a failed close only happens after the last reference was released, so the
                    // entry is evicted in that case as well
                    boolean allReferencesClosed = true;
                    try {
                        allReferencesClosed = activeSearcher.close();
                    } catch (final IOException ioe) {
                        logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
                        if (logger.isDebugEnabled()) {
                            logger.warn("", ioe);
                        }
                    }

                    // keep the entry while other borrowers still hold it, so that their own
                    // return can release the remaining references
                    if (allReferencesClosed) {
                        itr.remove();
                    }
                } else {
                    // the searcher is cached. Just leave it open.
                    final int refCount = activeSearcher.decrementReferenceCount();
                    logger.debug("Index searcher for {} is cached; leaving open with reference count of {}", indexDirectory, refCount);
                }

                return;
            }

            // searcher is not cached. It was created from a writer, and we want
            // the newest updates the next time that we get a searcher, so we will
            // go ahead and close this one out.

            // decrement the writer count because we incremented it when creating the searcher
            final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
            if (writerCount != null) {
                if (writerCount.getCount() <= 1) {
                    try {
                        logger.debug("Index searcher for {} is not cached. Writer count is "
                            + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);

                        writerCount.close();
                    } catch (final IOException ioe) {
                        logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
                        if (logger.isDebugEnabled()) {
                            logger.warn("", ioe);
                        }
                    }
                } else {
                    logger.debug("Index searcher for {} is not cached. Writer count is decremented "
                        + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);

                    writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
                        writerCount.getAnalyzer(), writerCount.getDirectory(),
                        writerCount.getCount() - 1));
                }
            }

            boolean allReferencesClosed = true;
            try {
                logger.debug("Closing Index Searcher for {}", indexDirectory);
                allReferencesClosed = activeSearcher.close();
            } catch (final IOException ioe) {
                logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
                if (logger.isDebugEnabled()) {
                    logger.warn("", ioe);
                }
            }

            // only evict the entry once nothing references it any longer
            if (allReferencesClosed) {
                itr.remove();
            }

            return;
        }

        // reaching this point means no entry in the list matched the given searcher
        logger.debug("Index Searcher {} was returned for {} but found no Active Searcher for it. "
            + "This will occur if the Index Searcher was already returned while being poisoned.", searcher, indexDirectory);
    } finally {
        lock.unlock();
    }
}
/**
 * Closes every Index Writer and every Index Searcher that this manager currently tracks.
 * All resources are attempted even if some fail to close; the first failure is rethrown at the end
 * with any later failures attached to it as suppressed exceptions.
 *
 * @throws IOException if at least one writer or searcher could not be closed
 */
@Override
public void close() throws IOException {
    logger.debug("Closing Index Manager");

    lock.lock();
    try {
        IOException firstFailure = null;

        // writers first
        for (final IndexWriterCount writerCount : writerCounts.values()) {
            try {
                writerCount.close();
            } catch (final IOException e) {
                if (firstFailure != null) {
                    firstFailure.addSuppressed(e);
                } else {
                    firstFailure = e;
                }
            }
        }

        // then all searchers, regardless of which index directory they belong to
        final List<ActiveIndexSearcher> allSearchers = new ArrayList<>();
        for (final List<ActiveIndexSearcher> searchersForDirectory : activeSearchers.values()) {
            allSearchers.addAll(searchersForDirectory);
        }

        for (final ActiveIndexSearcher activeSearcher : allSearchers) {
            try {
                activeSearcher.close();
            } catch (final IOException e) {
                if (firstFailure != null) {
                    firstFailure.addSuppressed(e);
                } else {
                    firstFailure = e;
                }
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    } finally {
        lock.unlock();
    }
}
/**
 * Closes all of the given resources, skipping {@code null} entries. Every resource is attempted even
 * if an earlier one fails with an IOException.
 *
 * @param closeables the resources to close; individual elements may be {@code null}
 * @throws IOException the first IOException encountered, with any subsequent ones added as suppressed
 */
private static void close(final Closeable... closeables) throws IOException {
    IOException firstFailure = null;

    for (final Closeable resource : closeables) {
        if (resource != null) {
            try {
                resource.close();
            } catch (final IOException e) {
                if (firstFailure != null) {
                    firstFailure.addSuppressed(e);
                } else {
                    firstFailure = e;
                }
            }
        }
    }

    if (firstFailure == null) {
        return;
    }

    throw firstFailure;
}
/**
 * Bookkeeping wrapper around an {@link EventIndexSearcher} that has been handed out by the manager.
 * It tracks the reader/directory backing the searcher, whether the searcher may be re-used (cached),
 * whether it has been poisoned, and how many references to it are outstanding.
 */
private static class ActiveIndexSearcher {
    private final EventIndexSearcher searcher;
    private final DirectoryReader directoryReader;
    private final File indexDirectory;
    private final Directory directory;
    private final boolean cache;

    // a freshly created searcher is immediately handed to one borrower, hence the initial count of 1
    private final AtomicInteger referenceCount = new AtomicInteger(1);
    private volatile boolean poisoned = false;

    /**
     * @param searcher the searcher being tracked
     * @param indexDirectory the index directory that the searcher reads from
     * @param directoryReader the reader backing the searcher; closed once the last reference is released
     * @param directory the Lucene directory to close together with the reader; may be {@code null}
     * @param cache whether the searcher may be handed out again to later borrowers
     */
    public ActiveIndexSearcher(final EventIndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader,
        final Directory directory, final boolean cache) {
        this.searcher = searcher;
        this.indexDirectory = indexDirectory;
        this.directoryReader = directoryReader;
        this.directory = directory;
        this.cache = cache;
    }

    /** @return the wrapped searcher */
    public EventIndexSearcher getSearcher() {
        return searcher;
    }

    /** @return {@code true} if this searcher may be re-used by later borrowers */
    public boolean isCache() {
        return cache;
    }

    /** Marks this searcher as poisoned. */
    public void poison() {
        this.poisoned = true;
    }

    /** @return {@code true} if {@link #poison()} has been called */
    public boolean isPoisoned() {
        return poisoned;
    }

    /** @return the reference count after incrementing it by one */
    public int incrementReferenceCount() {
        return referenceCount.incrementAndGet();
    }

    /** @return the reference count after decrementing it by one */
    public int decrementReferenceCount() {
        return referenceCount.decrementAndGet();
    }

    /**
     * Releases one reference. When the count drops to zero or below, the underlying directory reader
     * and directory are closed.
     *
     * @return {@code true} if the underlying resources were closed, {@code false} if references remain
     * @throws IOException if closing the underlying resources fails
     */
    public boolean close() throws IOException {
        final int remaining = referenceCount.decrementAndGet();
        final boolean referencesRemain = remaining > 0;

        if (referencesRemain) {
            logger.debug("Decremented Reference Count for {} to {}; leaving underlying directory reader open", this, remaining);
            return false;
        }

        logger.debug("Decremented Reference Count for {} to {}; closing underlying directory reader", this, remaining);
        CachingIndexManager.close(directoryReader, directory);
        return true;
    }

    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder("ActiveIndexSearcher[directory=");
        description.append(indexDirectory);
        description.append(", cached=").append(cache);
        description.append(", poisoned=").append(poisoned);
        description.append("]");
        return description.toString();
    }
}
/**
 * Immutable pairing of an {@link EventIndexWriter} (plus the analyzer and directory it was created
 * with) and the number of outstanding references to it. Because the count is final, a changed count
 * is represented by a new instance.
 */
private static class IndexWriterCount implements Closeable {
    private final EventIndexWriter writer;
    private final Analyzer analyzer;
    private final Directory directory;
    private final int count;

    /**
     * @param writer the index writer being tracked
     * @param analyzer the analyzer to close along with the writer
     * @param directory the directory to close along with the writer
     * @param count the number of outstanding references to the writer
     */
    public IndexWriterCount(final EventIndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
        this.writer = writer;
        this.analyzer = analyzer;
        this.directory = directory;
        this.count = count;
    }

    /** @return the tracked index writer */
    public EventIndexWriter getWriter() {
        return writer;
    }

    /** @return the analyzer associated with the writer */
    public Analyzer getAnalyzer() {
        return analyzer;
    }

    /** @return the directory associated with the writer */
    public Directory getDirectory() {
        return directory;
    }

    /** @return the number of outstanding references to the writer */
    public int getCount() {
        return count;
    }

    /**
     * Closes the writer, the analyzer and the directory, in that order.
     *
     * @throws IOException if any of the three fails to close
     */
    @Override
    public void close() throws IOException {
        CachingIndexManager.close(writer, analyzer, directory);
    }
}
}

View File

@ -16,10 +16,6 @@
*/
package org.apache.nifi.provenance.lucene;
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.nifi.provenance.IndexConfiguration;
@ -31,6 +27,10 @@ import org.apache.nifi.provenance.serialization.RecordReaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.List;
public class DeleteIndexAction implements ExpirationAction {
private static final Logger logger = LoggerFactory.getLogger(DeleteIndexAction.class);
@ -66,7 +66,7 @@ public class DeleteIndexAction implements ExpirationAction {
final IndexWriter indexWriter = writer.getIndexWriter();
indexWriter.deleteDocuments(term);
indexWriter.commit();
final int docsLeft = indexWriter.numDocs();
final int docsLeft = indexWriter.getDocStats().numDocs;
deleteDir = docsLeft <= 0;
logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
} finally {

View File

@ -16,16 +16,6 @@
*/
package org.apache.nifi.provenance.lucene;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TopDocs;
import org.apache.nifi.authorization.AccessDeniedException;
@ -38,6 +28,15 @@ import org.apache.nifi.provenance.index.EventIndexSearcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class IndexSearch {
private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
private final PersistentProvenanceRepository repository;
@ -76,15 +75,6 @@ public class IndexSearch {
final StandardQueryResult sqr = new StandardQueryResult(provenanceQuery, 1);
final Set<ProvenanceEventRecord> matchingRecords;
// we need to set the start date because if we do not, the first index may still have events that have aged off from
// the repository, and we don't want those events to count toward the total number of matches.
if (provenanceQuery.getStartDate() == null || provenanceQuery.getStartDate().getTime() < firstEventTimestamp) {
provenanceQuery.setStartDate(new Date(firstEventTimestamp));
}
if (provenanceQuery.getEndDate() == null) {
provenanceQuery.setEndDate(new Date());
}
final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery);
final long start = System.nanoTime();
@ -102,7 +92,7 @@ public class IndexSearch {
logger.debug("Searching {} for {} took {} millis; opening searcher took {} millis", this, provenanceQuery,
TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
if (topDocs.totalHits == 0) {
if (topDocs.totalHits.value == 0) {
sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
return sqr;
}
@ -137,7 +127,7 @@ public class IndexSearch {
final long readRecordsNanos = System.nanoTime() - finishSearch;
logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
sqr.update(matchingRecords, topDocs.totalHits);
sqr.update(matchingRecords, topDocs.totalHits.value);
final long queryNanos = System.nanoTime() - startNanos;
logger.info("Successfully executed {} against Index {}; Search took {} milliseconds; Total Hits = {}",

View File

@ -16,16 +16,11 @@
*/
package org.apache.nifi.provenance.lucene;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -34,6 +29,12 @@ import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.search.SearchableField;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class IndexingAction {
private final Set<SearchableField> searchableEventFields;
private final Set<SearchableField> searchableAttributeFields;
@ -75,16 +76,20 @@ public class IndexingAction {
// Index the fields that we always index (unless there's nothing else to index at all)
if (!doc.getFields().isEmpty()) {
doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO));
doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO));
doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO));
doc.add(new LongPoint(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate()));
doc.add(new LongPoint(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime()));
doc.add(new LongPoint(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize()));
doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES));
if ( blockIndex == null ) {
doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
doc.add(new LongPoint(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset()));
doc.add(new StoredField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset()));
} else {
doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
doc.add(new IntPoint(FieldNames.BLOCK_INDEX, blockIndex));
doc.add(new StoredField(FieldNames.BLOCK_INDEX, blockIndex));
doc.add(new LongPoint(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId()));
doc.add(new StoredField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId()));
}
// If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs.

View File

@ -16,7 +16,16 @@
*/
package org.apache.nifi.provenance.lucene;
import static java.util.Objects.requireNonNull;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.index.EventIndexSearcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
@ -26,16 +35,7 @@ import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.index.EventIndexSearcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
public class LineageQuery {
@ -63,11 +63,13 @@ public class LineageQuery {
if (flowFileUuids == null || flowFileUuids.isEmpty()) {
flowFileIdQuery = null;
} else {
flowFileIdQuery = new BooleanQuery();
final BooleanQuery.Builder queryBuilder = new BooleanQuery.Builder();
for (final String flowFileUuid : flowFileUuids) {
flowFileIdQuery.add(new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid)), Occur.SHOULD);
final TermQuery termQuery = new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid));
queryBuilder.add(new BooleanClause(termQuery, BooleanClause.Occur.SHOULD));
}
flowFileIdQuery.setMinimumNumberShouldMatch(1);
flowFileIdQuery = queryBuilder.build();
}
final long searchStart = System.nanoTime();

View File

@ -16,6 +16,21 @@
*/
package org.apache.nifi.provenance.lucene;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.WildcardQuery;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.search.SearchTerm;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
@ -26,47 +41,21 @@ import java.nio.charset.CodingErrorAction;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.search.SearchTerm;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.NumericRangeQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.WildcardQuery;
import org.apache.lucene.util.Version;
public class LuceneUtil {
public static final Version LUCENE_VERSION = Version.LATEST;
private static final long[] MIN_LONG_ARRAY = new long[] { Long.MIN_VALUE };
private static final long[] MAX_LONG_ARRAY = new long[] { Long.MAX_VALUE };
/**
 * Returns the portion of {@code value} that precedes the first occurrence of {@code searchValue}.
 *
 * @param value the text to cut
 * @param searchValue the text to look for
 * @return everything before the first occurrence, or {@code value} itself if there is no occurrence
 */
public static String substringBefore(final String value, final String searchValue) {
    final int matchStart = value.indexOf(searchValue);
    if (matchStart < 0) {
        return value;
    }

    return value.substring(0, matchStart);
}
/**
 * Returns the portion of {@code value} that follows the first occurrence of {@code searchValue}.
 * The whole of {@code searchValue} is skipped, so search values longer than one character are
 * handled correctly.
 *
 * @param value the text to cut
 * @param searchValue the text to look for
 * @return everything after the first occurrence (an empty string if the occurrence ends the text),
 *         or {@code value} itself if there is no occurrence
 */
public static String substringAfter(final String value, final String searchValue) {
    final int index = value.indexOf(searchValue);
    if (index < 0) {
        return value;
    }

    // skip the full length of the match rather than assuming a single-character search value
    return value.substring(index + searchValue.length());
}
/**
 * Returns the portion of {@code value} that precedes the last occurrence of {@code searchValue}.
 *
 * @param value the text to cut
 * @param searchValue the text to look for
 * @return everything before the last occurrence, or {@code value} itself if there is no occurrence
 */
public static String substringBeforeLast(final String value, final String searchValue) {
    final int lastMatchStart = value.lastIndexOf(searchValue);
    if (lastMatchStart < 0) {
        return value;
    }

    return value.substring(0, lastMatchStart);
}
public static String substringAfterLast(final String value, final String searchValue) {
final int index = value.lastIndexOf(searchValue);
return (index < 0 || index >= value.length()) ? value : value.substring(index + 1);
@ -108,7 +97,7 @@ public class LuceneUtil {
return new MatchAllDocsQuery();
}
final BooleanQuery luceneQuery = new BooleanQuery();
final BooleanQuery.Builder queryBuilder = new BooleanQuery.Builder();
for (final SearchTerm searchTerm : query.getSearchTerms()) {
final String searchValue = searchTerm.getValue();
if (searchValue == null) {
@ -116,25 +105,25 @@ public class LuceneUtil {
}
if (searchValue.contains("*") || searchValue.contains("?")) {
luceneQuery.add(new BooleanClause(new WildcardQuery(new Term(searchTerm.getSearchableField().getSearchableFieldName(), searchTerm.getValue().toLowerCase())), Occur.MUST));
queryBuilder.add(new BooleanClause(new WildcardQuery(new Term(searchTerm.getSearchableField().getSearchableFieldName(), searchTerm.getValue().toLowerCase())), Occur.MUST));
} else {
luceneQuery.add(new BooleanClause(new TermQuery(new Term(searchTerm.getSearchableField().getSearchableFieldName(), searchTerm.getValue().toLowerCase())), Occur.MUST));
queryBuilder.add(new BooleanClause(new TermQuery(new Term(searchTerm.getSearchableField().getSearchableFieldName(), searchTerm.getValue().toLowerCase())), Occur.MUST));
}
}
final Long minBytes = query.getMinFileSize() == null ? null : DataUnit.parseDataSize(query.getMinFileSize(), DataUnit.B).longValue();
final Long maxBytes = query.getMaxFileSize() == null ? null : DataUnit.parseDataSize(query.getMaxFileSize(), DataUnit.B).longValue();
if (minBytes != null || maxBytes != null) {
luceneQuery.add(NumericRangeQuery.newLongRange(SearchableFields.FileSize.getSearchableFieldName(), minBytes, maxBytes, true, true), Occur.MUST);
if (query.getMinFileSize() != null || query.getMaxFileSize() != null) {
final long minBytes = query.getMinFileSize() == null ? 0L : DataUnit.parseDataSize(query.getMinFileSize(), DataUnit.B).longValue();
final long maxBytes = query.getMaxFileSize() == null ? Long.MAX_VALUE : DataUnit.parseDataSize(query.getMaxFileSize(), DataUnit.B).longValue();
queryBuilder.add(LongPoint.newRangeQuery(SearchableFields.FileSize.getSearchableFieldName(), minBytes, maxBytes), Occur.MUST);
}
final Long minDateTime = query.getStartDate() == null ? null : query.getStartDate().getTime();
final Long maxDateTime = query.getEndDate() == null ? null : query.getEndDate().getTime();
if (maxDateTime != null || minDateTime != null) {
luceneQuery.add(NumericRangeQuery.newLongRange(SearchableFields.EventTime.getSearchableFieldName(), minDateTime, maxDateTime, true, true), Occur.MUST);
if (query.getStartDate() != null || query.getEndDate() != null) {
final long minDateTime = query.getStartDate() == null ? 0L : query.getStartDate().getTime();
final long maxDateTime = query.getEndDate() == null ? Long.MAX_VALUE : query.getEndDate().getTime();
queryBuilder.add(LongPoint.newRangeQuery(SearchableFields.EventTime.getSearchableFieldName(), minDateTime, maxDateTime), Occur.MUST);
}
return luceneQuery;
return queryBuilder.build();
}
/**
@ -145,7 +134,7 @@ public class LuceneUtil {
* list of {@link Document}s
*/
public static void sortDocsForRetrieval(final List<Document> documents) {
Collections.sort(documents, new Comparator<Document>() {
documents.sort(new Comparator<Document>() {
@Override
public int compare(final Document o1, final Document o2) {
final String filename1 = o1.get(FieldNames.STORAGE_FILENAME);
@ -158,9 +147,9 @@ public class LuceneUtil {
final IndexableField fileOffset1 = o1.getField(FieldNames.BLOCK_INDEX);
final IndexableField fileOffset2 = o1.getField(FieldNames.BLOCK_INDEX);
if ( fileOffset1 != null && fileOffset2 != null ) {
if (fileOffset1 != null && fileOffset2 != null) {
final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue());
if ( blockIndexResult != 0 ) {
if (blockIndexResult != 0) {
return blockIndexResult;
}
@ -192,7 +181,7 @@ public class LuceneUtil {
for (Document document : documents) {
String fileName = document.get(FieldNames.STORAGE_FILENAME);
if (!documentGroups.containsKey(fileName)) {
documentGroups.put(fileName, new ArrayList<Document>());
documentGroups.put(fileName, new ArrayList<>());
}
documentGroups.get(fileName).add(document);
}

View File

@ -46,14 +46,14 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class SimpleIndexManager implements IndexManager {
private static final Logger logger = LoggerFactory.getLogger(SimpleIndexManager.class);
public class StandardIndexManager implements IndexManager {
private static final Logger logger = LoggerFactory.getLogger(StandardIndexManager.class);
private final Map<File, IndexWriterCount> writerCounts = new HashMap<>(); // guarded by synchronizing on map itself
private final ExecutorService searchExecutor;
private final RepositoryConfiguration repoConfig;
public SimpleIndexManager(final RepositoryConfiguration repoConfig) {
public StandardIndexManager(final RepositoryConfiguration repoConfig) {
this.repoConfig = repoConfig;
this.searchExecutor = Executors.newFixedThreadPool(repoConfig.getQueryThreadPoolSize(), new NamedThreadFactory("Search Lucene Index", true));
}
@ -105,11 +105,11 @@ public class SimpleIndexManager implements IndexManager {
final DirectoryReader directoryReader;
if (writerCount == null) {
logger.trace("Creating index searcher for {}", indexDir);
final Directory directory = FSDirectory.open(indexDir);
final Directory directory = FSDirectory.open(indexDir.toPath());
directoryReader = DirectoryReader.open(directory);
} else {
final EventIndexWriter eventIndexWriter = writerCount.getWriter();
directoryReader = DirectoryReader.open(eventIndexWriter.getIndexWriter(), false);
directoryReader = DirectoryReader.open(eventIndexWriter.getIndexWriter(), false, false);
}
final IndexSearcher searcher = new IndexSearcher(directoryReader, this.searchExecutor);
@ -197,14 +197,14 @@ public class SimpleIndexManager implements IndexManager {
private IndexWriterCount createWriter(final File indexDirectory) throws IOException {
final List<Closeable> closeables = new ArrayList<>();
final Directory directory = FSDirectory.open(indexDirectory);
final Directory directory = FSDirectory.open(indexDirectory.toPath());
closeables.add(directory);
try {
final Analyzer analyzer = new StandardAnalyzer();
closeables.add(analyzer);
final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
final IndexWriterConfig config = new IndexWriterConfig(analyzer);
final ConcurrentMergeScheduler mergeScheduler = new ConcurrentMergeScheduler();
final int mergeThreads = repoConfig.getConcurrentMergeThreads();
@ -235,7 +235,7 @@ public class SimpleIndexManager implements IndexManager {
final File absoluteFile = indexDirectory.getAbsoluteFile();
logger.trace("Borrowing index writer for {}", indexDirectory);
IndexWriterCount writerCount = null;
IndexWriterCount writerCount;
synchronized (writerCounts) {
writerCount = writerCounts.get(absoluteFile);
@ -269,7 +269,7 @@ public class SimpleIndexManager implements IndexManager {
logger.trace("Returning Index Writer for {} to IndexManager", indexDirectory);
boolean unused = false;
IndexWriterCount count = null;
IndexWriterCount count;
boolean close = isCloseable;
try {
synchronized (writerCounts) {

View File

@ -17,15 +17,6 @@
package org.apache.nifi.provenance.serialization;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.provenance.store.EventFileManager;
import org.apache.nifi.provenance.toc.StandardTocReader;
import org.apache.nifi.provenance.toc.StandardTocWriter;
@ -41,6 +32,17 @@ import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* <p>
* This class is responsible for compressing Event Files as a background task. This is done as a background task instead of being
@ -79,9 +81,9 @@ public class EventFileCompressor implements Runnable {
continue;
}
File outputFile = null;
long bytesBefore = 0L;
StandardTocReader tocReader = null;
File outputFile;
long bytesBefore;
StandardTocReader tocReader;
File tmpTocFile = null;
eventFileManager.obtainReadLock(uncompressedEventFile);
@ -91,6 +93,13 @@ public class EventFileCompressor implements Runnable {
final File tocFile = TocUtil.getTocFile(uncompressedEventFile);
try {
tocReader = new StandardTocReader(tocFile);
} catch (final FileNotFoundException fnfe) {
logger.debug("Attempted to compress event file {} but the TOC file {} could not be found", uncompressedEventFile, tocFile);
continue;
} catch (final EOFException eof) {
logger.info("Attempted to compress event file {} but encountered unexpected End-of-File when reading TOC file {}; this typically happens as a result of the data aging off " +
"from the Provenance Repository before it is able to be compressed.", uncompressedEventFile, tocFile);
continue;
} catch (final IOException e) {
logger.error("Failed to read TOC File {}", tocFile, e);
continue;
@ -151,7 +160,7 @@ public class EventFileCompressor implements Runnable {
}
}
public static void compress(final File input, final TocReader tocReader, final File output, final TocWriter tocWriter) throws IOException {
private static void compress(final File input, final TocReader tocReader, final File output, final TocWriter tocWriter) throws IOException {
try (final InputStream fis = new FileInputStream(input);
final OutputStream fos = new FileOutputStream(output);
final ByteCountingOutputStream byteCountingOut = new ByteCountingOutputStream(fos)) {

View File

@ -17,16 +17,17 @@
package org.apache.nifi.provenance.store;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.authorization.EventTransformer;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.store.iterator.EventIterator;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
/**
* <p>
@ -120,4 +121,14 @@ public interface EventStore extends Closeable {
* @param eventIndex the EventIndex to use for indexing events
*/
void reindexLatestEvents(EventIndex eventIndex);
/**
* Returns an EventIterator that can be used to iterate over all events whose timestamps fall within the given time range.
* The caller is responsible for closing the returned iterator.
* NOTE(review): the partition-level implementation keeps events with
* {@code minTimestamp <= eventTime <= maxTimestamp} (both bounds inclusive) - confirm that this is the contract
* every implementation is expected to honor.
*
* @param minTimestamp the minimum event timestamp of the time window
* @param maxTimestamp the maximum event timestamp of the time window
*
* @return an EventIterator that includes the events in the given time window
* @throws IOException if unable to retrieve records from the store
*/
EventIterator getEventsByTimestamp(long minTimestamp, long maxTimestamp) throws IOException;
}

View File

@ -17,6 +17,19 @@
package org.apache.nifi.provenance.store;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.authorization.EventTransformer;
import org.apache.nifi.provenance.store.iterator.AuthorizingEventIterator;
import org.apache.nifi.provenance.store.iterator.EventIterator;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@ -31,19 +44,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.authorization.EventTransformer;
import org.apache.nifi.provenance.store.iterator.AuthorizingEventIterator;
import org.apache.nifi.provenance.store.iterator.EventIterator;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class PartitionedEventStore implements EventStore {
private static final Logger logger = LoggerFactory.getLogger(PartitionedEventStore.class);
private static final String EVENT_CATEGORY = "Provenance Repository";
@ -126,7 +126,7 @@ public abstract class PartitionedEventStore implements EventStore {
@Override
public long getMaxEventId() {
return getPartitions().stream()
.mapToLong(part -> part.getMaxEventId())
.mapToLong(EventStorePartition::getMaxEventId)
.max()
.orElse(-1L);
}

View File

@ -17,6 +17,14 @@
package org.apache.nifi.provenance.store;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.serialization.EventFileCompressor;
import org.apache.nifi.provenance.store.iterator.AggregateEventIterator;
import org.apache.nifi.provenance.store.iterator.EventIterator;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@ -31,12 +39,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.serialization.EventFileCompressor;
public class PartitionedWriteAheadEventStore extends PartitionedEventStore {
private final BlockingQueue<File> filesToCompress;
private final List<WriteAheadStorePartition> partitions;
@ -54,8 +56,8 @@ public class PartitionedWriteAheadEventStore extends PartitionedEventStore {
this.eventReporter = eventReporter;
this.filesToCompress = new LinkedBlockingQueue<>(100);
final AtomicLong idGenerator = new AtomicLong(0L);
this.partitions = createPartitions(repoConfig, recordWriterFactory, recordReaderFactory, idGenerator);
this.fileManager = fileManager;
this.partitions = createPartitions(repoConfig, recordWriterFactory, recordReaderFactory, idGenerator);
// Creates tasks to compress data on rollover
if (repoConfig.isCompressOnRollover()) {
@ -78,7 +80,7 @@ public class PartitionedWriteAheadEventStore extends PartitionedEventStore {
final String partitionName = entry.getKey();
final File storageDirectory = entry.getValue();
partitions.add(new WriteAheadStorePartition(storageDirectory, partitionName, repoConfig,
recordWriterFactory, recordReaderFactory, filesToCompress, idGenerator, eventReporter));
recordWriterFactory, recordReaderFactory, filesToCompress, idGenerator, eventReporter, fileManager));
}
return partitions;
@ -139,4 +141,16 @@ public class PartitionedWriteAheadEventStore extends PartitionedEventStore {
protected List<WriteAheadStorePartition> getPartitions() {
return partitions;
}
/**
 * Creates one EventIterator per partition for the given time window and exposes them
 * as a single iterator that drains the partitions one after another.
 *
 * @param minTimestamp the minimum event timestamp of the time window
 * @param maxTimestamp the maximum event timestamp of the time window
 * @return an EventIterator spanning the matching events of all partitions
 * @throws IOException if any partition is unable to provide its iterator; iterators that were
 *             already obtained from other partitions are closed before the exception is rethrown
 */
@Override
public EventIterator getEventsByTimestamp(final long minTimestamp, final long maxTimestamp) throws IOException {
    final List<EventIterator> eventIterators = new ArrayList<>();

    try {
        for (final WriteAheadStorePartition partition : getPartitions()) {
            final EventIterator partitionEventIterator = partition.getEventsByTimestamp(minTimestamp, maxTimestamp);
            eventIterators.add(partitionEventIterator);
        }
    } catch (final IOException | RuntimeException e) {
        // A later partition failed. Nobody else holds a reference to the iterators created so far,
        // so close them here; otherwise they would never be closed.
        for (final EventIterator openedIterator : eventIterators) {
            try {
                openedIterator.close();
            } catch (final IOException | RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
        }

        throw e;
    }

    return new AggregateEventIterator(eventIterators);
}
}

View File

@ -31,10 +31,6 @@ public class RecordWriterLease {
private boolean markedRollable = false;
private boolean closed = false;
public RecordWriterLease(final RecordWriter writer, final long maxBytes) {
this(writer, maxBytes, Integer.MAX_VALUE);
}
public RecordWriterLease(final RecordWriter writer, final long maxBytes, final int maxEvents) {
this.writer = writer;
this.maxBytes = maxBytes;

View File

@ -17,8 +17,28 @@
package org.apache.nifi.provenance.store;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.store.iterator.EventIterator;
import org.apache.nifi.provenance.store.iterator.SelectiveRecordReaderEventIterator;
import org.apache.nifi.provenance.store.iterator.SequentialRecordReaderEventIterator;
import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.apache.nifi.provenance.util.NamedThreadFactory;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
@ -42,25 +62,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.store.iterator.EventIterator;
import org.apache.nifi.provenance.store.iterator.SelectiveRecordReaderEventIterator;
import org.apache.nifi.provenance.store.iterator.SequentialRecordReaderEventIterator;
import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.apache.nifi.provenance.util.NamedThreadFactory;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WriteAheadStorePartition implements EventStorePartition {
private static final Logger logger = LoggerFactory.getLogger(WriteAheadStorePartition.class);
@ -72,6 +73,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
private final BlockingQueue<File> filesToCompress;
private final AtomicLong idGenerator;
private final AtomicLong maxEventId = new AtomicLong(-1L);
private final EventFileManager eventFileManager;
private volatile boolean closed = false;
private AtomicReference<RecordWriterLease> eventWriterLeaseRef = new AtomicReference<>();
@ -79,7 +81,8 @@ public class WriteAheadStorePartition implements EventStorePartition {
private final SortedMap<Long, File> minEventIdToPathMap = new TreeMap<>(); // guarded by synchronizing on object
public WriteAheadStorePartition(final File storageDirectory, final String partitionName, final RepositoryConfiguration repoConfig, final RecordWriterFactory recordWriterFactory,
final RecordReaderFactory recordReaderFactory, final BlockingQueue<File> filesToCompress, final AtomicLong idGenerator, final EventReporter eventReporter) {
final RecordReaderFactory recordReaderFactory, final BlockingQueue<File> filesToCompress, final AtomicLong idGenerator, final EventReporter eventReporter,
final EventFileManager eventFileManager) {
this.partitionName = partitionName;
this.config = repoConfig;
@ -88,6 +91,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
this.recordWriterFactory = recordWriterFactory;
this.recordReaderFactory = recordReaderFactory;
this.filesToCompress = filesToCompress;
this.eventFileManager = eventFileManager;
}
@Override
@ -118,7 +122,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
// the Largest Event ID to the smallest.
long maxEventId = -1L;
final List<File> fileList = Arrays.asList(files);
Collections.sort(fileList, DirectoryUtils.LARGEST_ID_FIRST);
fileList.sort(DirectoryUtils.LARGEST_ID_FIRST);
for (final File file : fileList) {
try {
final RecordReader reader = recordReaderFactory.newRecordReader(file, Collections.emptyList(), Integer.MAX_VALUE);
@ -336,7 +340,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
@Override
public long getSize() {
return getEventFilesFromDisk()
.collect(Collectors.summarizingLong(file -> file.length()))
.collect(Collectors.summarizingLong(File::length))
.getSum();
}
@ -471,7 +475,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
getEventFilesFromDisk().filter(file -> file.lastModified() < timeCutoff)
.sorted(DirectoryUtils.SMALLEST_ID_FIRST)
.forEach(file -> delete(file));
.forEach(this::delete);
}
@ -482,7 +486,13 @@ public class WriteAheadStorePartition implements EventStorePartition {
return 0L;
}
final RecordWriterLease lease = eventWriterLeaseRef.get();
final File currentFile = lease == null ? null : lease.getWriter().getFile();
for (final File eventFile : eventFiles) {
if (eventFile.equals(currentFile)) {
continue;
}
final long fileSize = eventFile.length();
if (delete(eventFile)) {
@ -503,17 +513,22 @@ public class WriteAheadStorePartition implements EventStorePartition {
minEventIdToPathMap.remove(firstEventId);
}
if (!file.delete()) {
logger.warn("Failed to remove Provenance Event file {}; this file should be cleaned up manually", file);
return false;
}
eventFileManager.obtainWriteLock(file);
try {
if (!file.delete()) {
logger.warn("Failed to remove Provenance Event file {}; this file should be cleaned up manually", file);
return false;
}
final File tocFile = TocUtil.getTocFile(file);
if (tocFile.exists() && !tocFile.delete()) {
logger.warn("Failed to remove Provenance Table-of-Contents file {}; this file should be cleaned up manually", tocFile);
}
final File tocFile = TocUtil.getTocFile(file);
if (tocFile.exists() && !tocFile.delete()) {
logger.warn("Failed to remove Provenance Table-of-Contents file {}; this file should be cleaned up manually", tocFile);
}
return true;
return true;
} finally {
eventFileManager.releaseWriteLock(file);
}
}
void reindexLatestEvents(final EventIndex eventIndex) {
@ -575,7 +590,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
}
}
StandardProvenanceEventRecord event = null;
StandardProvenanceEventRecord event;
while (true) {
final long startBytesConsumed = recordReader.getBytesConsumed();
@ -596,7 +611,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
}
}
}
} catch (final EOFException eof) {
} catch (final EOFException | FileNotFoundException eof) {
// Ran out of data. Continue on.
logger.warn("Failed to find event with ID {} in Event File {} due to {}", minEventIdToReindex, eventFile, eof.toString());
} catch (final Exception e) {
@ -635,6 +650,50 @@ public class WriteAheadStorePartition implements EventStorePartition {
reindexedCount.get(), eventFilesToReindex.size(), partitionDirectory, seconds, millisRemainder);
}
/**
 * Returns an iterator over the events in this partition whose event time lies within the given
 * window, both bounds inclusive.
 *
 * @param minTimestmap the minimum event timestamp of the time window
 * @param maxTimestamp the maximum event timestamp of the time window
 * @return an EventIterator over the matching events, or {@link EventIterator#EMPTY} if the partition has no event files
 * @throws IOException if the first event of an event file cannot be read
 */
EventIterator getEventsByTimestamp(final long minTimestmap, final long maxTimestamp) throws IOException {
    // Get a list of all Files and order them based on their ID such that the largest ID is first.
    // This allows us to step through the event files in order and read the first event in the file.
    // If the first event comes after our maxTimestamp, then we know that all other events do as well,
    // so we can ignore that file. Otherwise, we must add it to our list of Files that may contain events
    // within the given time range. If we then reach a file whose first event comes before our minTimestamp,
    // this means that all other files that we later encounter will have a max timestamp that comes before
    // our earliest event time, so we can stop adding files at that point.
    final List<File> eventFiles = getEventFilesFromDisk().sorted(DirectoryUtils.LARGEST_ID_FIRST).collect(Collectors.toList());
    if (eventFiles.isEmpty()) {
        return EventIterator.EMPTY;
    }

    final List<File> relevantEventFiles = new ArrayList<>();
    for (final File eventFile : eventFiles) {
        final ProvenanceEventRecord firstEvent = getFirstEvent(eventFile);
        if (firstEvent == null) {
            // This file holds no events (for example, a file that has not been written to yet).
            // Skip only this file: giving up here would discard every other file, including the
            // ones already selected, and hide all of their events from the caller.
            continue;
        }

        final long eventTime = firstEvent.getEventTime();

        if (eventTime > maxTimestamp) {
            continue;
        }

        relevantEventFiles.add(eventFile);

        if (eventTime < minTimestmap) {
            break;
        }
    }

    // The files only narrow the search; the exact window is enforced per event by the filter below.
    final EventIterator rawEventIterator = new SequentialRecordReaderEventIterator(relevantEventFiles, recordReaderFactory, 0, Integer.MAX_VALUE);
    return rawEventIterator.filter(event -> event.getEventTime() >= minTimestmap && event.getEventTime() <= maxTimestamp);
}
/**
 * Opens the given event file and reads only its first record.
 *
 * @param eventFile the event file to inspect
 * @return the first record produced by the reader; callers treat a {@code null} result as "no event available"
 * @throws IOException if the file cannot be opened or read
 */
private ProvenanceEventRecord getFirstEvent(final File eventFile) throws IOException {
    // The reader is needed only for a single record, so it is closed as soon as that record is read.
    try (final RecordReader firstRecordReader = recordReaderFactory.newRecordReader(eventFile, Collections.emptyList(), Integer.MAX_VALUE)) {
        final ProvenanceEventRecord earliestRecord = firstRecordReader.nextRecord();
        return earliestRecord;
    }
}
@Override
public String toString() {
return "Provenance Event Store Partition[directory=" + partitionDirectory + "]";

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.store.iterator;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
/**
 * An EventIterator that concatenates several EventIterators: events are served from the first
 * iterator until it is exhausted, then from the second, and so on.
 */
public class AggregateEventIterator implements EventIterator {
    // Every delegate, kept so that close() can reach iterators that were never advanced to.
    private final List<EventIterator> iteratorList;
    // Cursor over the delegates that have not been started yet.
    private final Iterator<EventIterator> iterators;
    // Delegate currently being drained; null when there are no delegates left to read from.
    private EventIterator currentIterator;

    /**
     * @param eventIterators the iterators to read from, in the order in which they should be drained; may be empty
     */
    public AggregateEventIterator(final List<EventIterator> eventIterators) {
        iteratorList = eventIterators;
        this.iterators = eventIterators.iterator();

        currentIterator = iterators.hasNext() ? iterators.next() : null;
    }

    /**
     * Returns the next event from the current delegate, moving on to the following delegates
     * as each one runs out of events.
     *
     * @return the next event, or an empty Optional once every delegate is exhausted (or if there are no delegates)
     * @throws IOException if a delegate fails to provide its next event
     */
    @Override
    public Optional<ProvenanceEventRecord> nextEvent() throws IOException {
        // currentIterator is null when the aggregate was created from an empty list or after the
        // last delegate has been drained; in both cases there is nothing more to return.
        while (currentIterator != null) {
            final Optional<ProvenanceEventRecord> optionalEvent = currentIterator.nextEvent();
            if (optionalEvent.isPresent()) {
                return optionalEvent;
            }

            currentIterator = iterators.hasNext() ? iterators.next() : null;
        }

        return Optional.empty();
    }

    /**
     * Closes every delegate. A failure to close one delegate does not prevent the others from
     * being closed.
     *
     * @throws IOException the first failure encountered, with any later failures attached as suppressed exceptions
     */
    @Override
    public void close() throws IOException {
        IOException firstFailure = null;

        for (final EventIterator iterator : iteratorList) {
            try {
                iterator.close();
            } catch (final IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}

View File

@ -17,18 +17,20 @@
package org.apache.nifi.provenance.store.iterator;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import java.util.function.Predicate;
public interface EventIterator extends Closeable {
Optional<ProvenanceEventRecord> nextEvent() throws IOException;
public static EventIterator EMPTY = new EventIterator() {
@Override
public void close() throws IOException {
@ -53,4 +55,31 @@ public interface EventIterator extends Closeable {
}
};
}
/**
 * Returns a view of this iterator that yields only the events accepted by the given predicate.
 * Closing the returned iterator closes this iterator.
 *
 * @param predicate the test an event must pass in order to be returned
 * @return an EventIterator that skips every event rejected by the predicate
 */
default EventIterator filter(Predicate<ProvenanceEventRecord> predicate) {
    final EventIterator source = this;

    return new EventIterator() {
        @Override
        public Optional<ProvenanceEventRecord> nextEvent() throws IOException {
            // Keep pulling until the source runs dry or an event passes the predicate.
            Optional<ProvenanceEventRecord> candidate = source.nextEvent();
            while (candidate.isPresent() && !predicate.test(candidate.get())) {
                candidate = source.nextEvent();
            }

            return candidate;
        }

        @Override
        public void close() throws IOException {
            source.close();
        }
    };
}
}

View File

@ -16,13 +16,14 @@
*/
package org.apache.nifi.provenance.toc;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.nifi.stream.io.StreamUtils;
/**
* Standard implementation of TocReader.
*
@ -45,7 +46,11 @@ public class StandardTocReader implements TocReader {
this.file = file;
final long fileLength = file.length();
if (fileLength < 2) {
throw new EOFException();
if (file.exists()) {
throw new EOFException();
} else {
throw new FileNotFoundException();
}
}
try (final FileInputStream fis = new FileInputStream(file)) {

View File

@ -19,33 +19,19 @@ package org.apache.nifi.provenance.util;
import java.io.File;
import java.io.FileFilter;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.provenance.RepositoryConfiguration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class DirectoryUtils {
public static final Pattern INDEX_DIRECTORY_NAME_PATTERN = Pattern.compile("(?:lucene-\\d+-)?index-(.*)");
public static final FileFilter INDEX_FILE_FILTER = f -> INDEX_DIRECTORY_NAME_PATTERN.matcher(f.getName()).matches();
public static final FileFilter EVENT_FILE_FILTER = f -> f.getName().endsWith(".prov") || f.getName().endsWith(".prov.gz");
public static final FileFilter INDEX_FILE_FILTER = f -> f.getName().startsWith("index-");
public static final Comparator<File> SMALLEST_ID_FIRST = (a, b) -> Long.compare(getMinId(a), getMinId(b));
public static final Comparator<File> LARGEST_ID_FIRST = SMALLEST_ID_FIRST.reversed();
public static final Comparator<File> OLDEST_INDEX_FIRST = (a, b) -> Long.compare(getIndexTimestamp(a), getIndexTimestamp(b));
public static final Comparator<File> NEWEST_INDEX_FIRST = OLDEST_INDEX_FIRST.reversed();
public static List<Path> getProvenanceEventFiles(final RepositoryConfiguration repoConfig) {
return repoConfig.getStorageDirectories().values().stream()
.flatMap(f -> {
final File[] eventFiles = f.listFiles(EVENT_FILE_FILTER);
return eventFiles == null ? Stream.empty() : Arrays.stream(eventFiles);
})
.map(f -> f.toPath())
.collect(Collectors.toList());
}
public static long getMinId(final File file) {
final String filename = file.getName();
@ -64,13 +50,13 @@ public class DirectoryUtils {
public static long getIndexTimestamp(final File file) {
final String filename = file.getName();
if (!filename.startsWith("index-") && filename.length() > 6) {
final Matcher matcher = INDEX_DIRECTORY_NAME_PATTERN.matcher(filename);
if (!matcher.matches()) {
return -1L;
}
final String suffix = filename.substring(6);
try {
return Long.parseLong(suffix);
return Long.parseLong(matcher.group(1));
} catch (final NumberFormatException nfe) {
return -1L;
}

View File

@ -31,15 +31,11 @@ import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.provenance.index.EventIndexSearcher;
import org.apache.nifi.provenance.index.EventIndexWriter;
import org.apache.nifi.provenance.lineage.EventNode;
import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.LineageNodeType;
import org.apache.nifi.provenance.lucene.CachingIndexManager;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.lucene.IndexingAction;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
@ -63,8 +59,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File;
@ -84,7 +78,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -649,165 +642,6 @@ public class ITestPersistentProvenanceRepository {
assertTrue(newRecordSet.getMatchingEvents().isEmpty());
}
@Test(timeout = 10000)
public void testModifyIndexWhileSearching() throws IOException, InterruptedException, ParseException {
assumeFalse(isWindowsEnvironment());
final RepositoryConfiguration config = createConfiguration();
config.setMaxRecordLife(30, TimeUnit.SECONDS);
config.setMaxStorageCapacity(1024L * 1024L * 10);
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
config.setMaxEventFileCapacity(1024L * 1024L * 10);
config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
final CountDownLatch obtainIndexSearcherLatch = new CountDownLatch(2);
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
private CachingIndexManager wrappedManager = null;
// Create an IndexManager that adds a delay before returning the Index Searcher.
@Override
protected synchronized CachingIndexManager getIndexManager() {
if (wrappedManager == null) {
final IndexManager mgr = super.getIndexManager();
final Logger logger = LoggerFactory.getLogger("IndexManager");
wrappedManager = new CachingIndexManager() {
final AtomicInteger indexSearcherCount = new AtomicInteger(0);
@Override
public EventIndexSearcher borrowIndexSearcher(File indexDir) throws IOException {
final EventIndexSearcher searcher = mgr.borrowIndexSearcher(indexDir);
final int idx = indexSearcherCount.incrementAndGet();
obtainIndexSearcherLatch.countDown();
// The first searcher should sleep for 3 seconds. The second searcher should
// sleep for 5 seconds. This allows us to have two threads each obtain a Searcher
// and then have one of them finish searching and close the searcher if it's poisoned while the
// second thread is still holding the searcher
try {
if (idx == 1) {
Thread.sleep(3000L);
} else {
Thread.sleep(5000L);
}
} catch (InterruptedException e) {
throw new IOException("Interrupted", e);
}
logger.info("Releasing index searcher");
return searcher;
}
@Override
public EventIndexWriter borrowIndexWriter(File indexingDirectory) throws IOException {
return mgr.borrowIndexWriter(indexingDirectory);
}
@Override
public void close() throws IOException {
mgr.close();
}
@Override
public boolean removeIndex(File indexDirectory) {
mgr.removeIndex(indexDirectory);
return true;
}
@Override
public void returnIndexSearcher(EventIndexSearcher searcher) {
mgr.returnIndexSearcher(searcher);
}
@Override
public void returnIndexWriter(EventIndexWriter writer) {
mgr.returnIndexWriter(writer);
}
};
}
return wrappedManager;
}
};
repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
final String uuid = "10000000-0000-0000-0000-000000000000";
final Map<String, String> attributes = new HashMap<>();
attributes.put("abc", "xyz");
attributes.put("xyz", "abc");
attributes.put("filename", "file-" + uuid);
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
attributes.put("uuid", uuid);
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
for (int i = 0; i < 10; i++) {
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
repo.registerEvent(builder.build());
}
repo.waitForRollover();
// Perform a query. This will ensure that an IndexSearcher is created and cached.
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
query.setMaxResults(100);
// Run a query in a background thread. When this thread goes to obtain the IndexSearcher, it will have a 5 second delay.
// That delay will occur as the main thread is updating the index. This should result in the search creating a new Index Reader
// that can properly query the index.
final int numThreads = 2;
final CountDownLatch performSearchLatch = new CountDownLatch(numThreads);
final Runnable searchRunnable = new Runnable() {
@Override
public void run() {
QueryResult result;
try {
result = repo.queryEvents(query, createUser());
} catch (IOException e) {
e.printStackTrace();
Assert.fail(e.toString());
return;
}
System.out.println("Finished search: " + result);
performSearchLatch.countDown();
}
};
// Kick off the searcher threads
for (int i = 0; i < numThreads; i++) {
final Thread searchThread = new Thread(searchRunnable);
searchThread.start();
}
// Wait until we've obtained the Index Searchers before modifying the index.
obtainIndexSearcherLatch.await();
// add more events to the repo
for (int i = 0; i < 10; i++) {
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
repo.registerEvent(builder.build());
}
// Force a rollover to occur. This will modify the index.
repo.rolloverWithLock(true);
// Wait for the repository to roll over.
repo.waitForRollover();
// Wait for the searches to complete.
performSearchLatch.await();
}
@Test
public void testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws IOException, InterruptedException, ParseException {
@ -1773,7 +1607,7 @@ public class ITestPersistentProvenanceRepository {
private List<Document> runQuery(final File indexDirectory, final List<File> storageDirs, final String query) throws IOException, ParseException {
assumeFalse(isWindowsEnvironment());
try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory))) {
try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory.toPath()))) {
final IndexSearcher searcher = new IndexSearcher(directoryReader);
final Analyzer analyzer = new SimpleAnalyzer();

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.events.EventReporter;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* With NiFi 1.10.0 (?) we changed from Lucene 4.x to Lucene 8.x
* This test is intended to ensure that we can properly startup even when pointing to a Provenance
* Repository that was created against the old Lucene.
*/
public class StartupAgainstOldLuceneIndexIT {
/**
 * Starts the repository against the old-format index in three directory layouts, in order.
 */
@Test(timeout = 30000)
public void testStartup() throws IOException, InterruptedException {
    // Each row is {createTempDirectory, createMigratedDirectory}.
    final boolean[][] scenarios = {
        // Old lucene 4 index directory and no temp or migrated directory.
        {false, false},
        // Old lucene 4 index directory and temp directory.
        {true, false},
        // Old lucene 4 index directory, temp directory, and final migrated directory.
        {true, true},
    };

    for (final boolean[] scenario : scenarios) {
        testStartup(scenario[0], scenario[1]);
    }
}
private void testStartup(final boolean createTempDirectory, final boolean createMigratedDirectory) throws IOException, InterruptedException {
final File existingRepo = new File("src/test/resources/lucene-4-prov-repo");
final File tempDir = new File("target/" + UUID.randomUUID().toString());
copy(existingRepo, tempDir);
final File oldIndexDir = new File(tempDir, "index-1554304717707");
assertTrue(oldIndexDir.exists());
if (createTempDirectory) {
final File tempIndexDir = new File(tempDir, "temp-lucene-8-index-1554304717707");
assertTrue(tempIndexDir.mkdirs());
final File dummyFile = new File(tempIndexDir, "_0.fdt");
try (final OutputStream fos = new FileOutputStream(dummyFile)) {
fos.write("hello world".getBytes());
}
}
if (createMigratedDirectory) {
final File migratedDirectory = new File(tempDir, "lucene-8-index-1554304717707");
assertTrue(migratedDirectory.mkdirs());
final File dummyFile = new File(migratedDirectory, "_0.fdt");
try (final OutputStream fos = new FileOutputStream(dummyFile)) {
fos.write("hello world".getBytes());
}
}
final RepositoryConfiguration repoConfig = new RepositoryConfiguration();
repoConfig.addStorageDirectory("1", tempDir);
repoConfig.setSearchableFields(Arrays.asList(SearchableFields.FlowFileUUID, SearchableFields.Filename, SearchableFields.EventTime, SearchableFields.EventType));
final WriteAheadProvenanceRepository writeAheadRepo = new WriteAheadProvenanceRepository(repoConfig);
final Authorizer authorizer = Mockito.mock(Authorizer.class);
writeAheadRepo.initialize(EventReporter.NO_OP, authorizer, Mockito.mock(ProvenanceAuthorizableFactory.class), Mockito.mock(IdentifierLookup.class));
final ProvenanceEventRecord event = TestUtil.createEvent();
writeAheadRepo.registerEvents(Collections.singleton(event));
while (oldIndexDir.exists()) {
Thread.sleep(5L);
}
assertFalse(oldIndexDir.exists());
final File newIndexDir = new File(tempDir, "lucene-8-index-1554304717707");
while (!newIndexDir.exists()) {
Thread.sleep(5L);
}
assertTrue(newIndexDir.exists());
}
private void copy(final File from, final File to) throws IOException {
if (from.isFile()) {
Files.copy(from.toPath(), to.toPath());
return;
}
to.mkdirs();
final File[] children = from.listFiles();
for (final File child : children) {
final File destination = new File(to, child.getName());
copy(child, destination);
}
}
}

View File

@ -17,17 +17,8 @@
package org.apache.nifi.provenance.index.lucene;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexWriter;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.RepositoryConfiguration;
@ -40,6 +31,14 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
public class TestEventIndexTask {
@BeforeClass
@ -67,9 +66,9 @@ public class TestEventIndexTask {
// Create an EventIndexTask and override the commit(IndexWriter) method so that we can keep track of how
// many times the index writer gets committed.
final EventIndexTask task = new EventIndexTask(docQueue, repoConfig, indexManager, directoryManager, 201, EventReporter.NO_OP) {
final EventIndexTask task = new EventIndexTask(docQueue, indexManager, directoryManager, 201, EventReporter.NO_OP) {
@Override
protected void commit(EventIndexWriter indexWriter) throws IOException {
protected void commit(EventIndexWriter indexWriter) {
commitCount.incrementAndGet();
}
};
@ -86,7 +85,7 @@ public class TestEventIndexTask {
// Index 100 documents with a storage filename of "0.0.prov"
for (int i = 0; i < 100; i++) {
final Document document = new Document();
document.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), System.currentTimeMillis(), Store.NO));
document.add(new LongPoint(SearchableFields.EventTime.getSearchableFieldName(), System.currentTimeMillis()));
final StorageSummary location = new StorageSummary(1L, "0.0.prov", "1", 0, 1000L, 1000L);
final StoredDocument storedDoc = new StoredDocument(document, location);
@ -97,7 +96,7 @@ public class TestEventIndexTask {
// Index 100 documents
for (int i = 0; i < 100; i++) {
final Document document = new Document();
document.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), System.currentTimeMillis(), Store.NO));
document.add(new LongPoint(SearchableFields.EventTime.getSearchableFieldName(), System.currentTimeMillis()));
final StorageSummary location = new StorageSummary(1L, "0.0.prov", "1", 0, 1000L, 1000L);
final StoredDocument storedDoc = new StoredDocument(document, location);
@ -115,7 +114,7 @@ public class TestEventIndexTask {
// Add another document.
final Document document = new Document();
document.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), System.currentTimeMillis(), Store.NO));
document.add(new LongPoint(SearchableFields.EventTime.getSearchableFieldName(), System.currentTimeMillis()));
final StorageSummary location = new StorageSummary(1L, "0.0.prov", "1", 0, 1000L, 1000L);
StoredDocument storedDoc = new StoredDocument(document, location);

View File

@ -17,8 +17,8 @@
package org.apache.nifi.provenance.index.lucene;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
@ -28,8 +28,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestIndexDirectoryManager {
@ -42,8 +42,8 @@ public class TestIndexDirectoryManager {
final List<File> directories = IndexDirectoryManager.getDirectories(1000L, 1001L, locations);
assertEquals(2, directories.size());
assertTrue(directories.contains(new File("index-999")));
assertTrue(directories.contains(new File("index-1002")));
assertTrue(directories.contains(new File("lucene-8-index-999")));
assertTrue(directories.contains(new File("lucene-8-index-1002")));
}
@Test
@ -53,10 +53,10 @@ public class TestIndexDirectoryManager {
final File storageDir1 = config.getStorageDirectories().get("1");
final File storageDir2 = config.getStorageDirectories().get("2");
final File index1 = new File(storageDir1, "index-1");
final File index2 = new File(storageDir1, "index-2");
final File index3 = new File(storageDir2, "index-3");
final File index4 = new File(storageDir2, "index-4");
final File index1 = new File(storageDir1, "lucene-8-index-1");
final File index2 = new File(storageDir1, "lucene-8-index-2");
final File index3 = new File(storageDir2, "lucene-8-index-3");
final File index4 = new File(storageDir2, "lucene-8-index-4");
final File[] allIndices = new File[] {index1, index2, index3, index4};
for (final File file : allIndices) {
@ -86,17 +86,17 @@ public class TestIndexDirectoryManager {
@Test
public void testActiveIndexNotLostWhenSizeExceeded() throws IOException, InterruptedException {
public void testActiveIndexNotLostWhenSizeExceeded() throws IOException {
final RepositoryConfiguration config = createConfig(2);
config.setDesiredIndexSize(4096 * 128);
final File storageDir1 = config.getStorageDirectories().get("1");
final File storageDir2 = config.getStorageDirectories().get("2");
final File index1 = new File(storageDir1, "index-1");
final File index2 = new File(storageDir1, "index-2");
final File index3 = new File(storageDir2, "index-3");
final File index4 = new File(storageDir2, "index-4");
final File index1 = new File(storageDir1, "lucene-8-index-1");
final File index2 = new File(storageDir1, "lucene-8-index-2");
final File index3 = new File(storageDir2, "lucene-8-index-3");
final File index4 = new File(storageDir2, "lucene-8-index-4");
final File[] allIndices = new File[] {index1, index2, index3, index4};
for (final File file : allIndices) {
@ -108,6 +108,8 @@ public class TestIndexDirectoryManager {
mgr.initialize();
File indexDir = mgr.getWritableIndexingDirectory(System.currentTimeMillis(), "1");
assertTrue(indexDir.mkdirs() || indexDir.exists());
final File newFile = new File(indexDir, "1.bin");
try (final OutputStream fos = new FileOutputStream(newFile)) {
final byte[] data = new byte[4096];
@ -136,8 +138,8 @@ public class TestIndexDirectoryManager {
final File storageDir = config.getStorageDirectories().get("1");
final File index1 = new File(storageDir, "index-1");
final File index2 = new File(storageDir, "index-2");
final File index1 = new File(storageDir, "lucene-8-index-1");
final File index2 = new File(storageDir, "lucene-8-index-2");
final File[] allIndices = new File[] {index1, index2};
for (final File file : allIndices) {
@ -147,10 +149,6 @@ public class TestIndexDirectoryManager {
}
assertTrue(index1.mkdirs());
// Wait 1500 millis because some file systems use only second-precision timestamps instead of millisecond-precision timestamps and
// we want to ensure that the two directories have different timestamps. Also using a value of 1500 instead of 1000 because sleep()
// can awake before the given time so we give it a buffer zone.
Thread.sleep(1500L);
final long timestamp = System.currentTimeMillis();
assertTrue(index2.mkdirs());
@ -159,8 +157,7 @@ public class TestIndexDirectoryManager {
mgr.initialize();
final List<File> dirsBefore = mgr.getDirectoriesBefore(timestamp);
assertEquals(1, dirsBefore.size());
assertEquals(index1, dirsBefore.get(0));
assertEquals(2, dirsBefore.size());
} finally {
for (final File file : allIndices) {
file.delete();
@ -174,7 +171,7 @@ public class TestIndexDirectoryManager {
}
private IndexLocation createLocation(final long timestamp, final String partitionName) {
return new IndexLocation(new File("index-" + timestamp), timestamp, partitionName);
return new IndexLocation(new File("lucene-8-index-" + timestamp), timestamp, partitionName);
}
private RepositoryConfiguration createConfig(final int partitions) {

View File

@ -30,7 +30,7 @@ import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.LineageNodeType;
import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.lucene.SimpleIndexManager;
import org.apache.nifi.provenance.lucene.StandardIndexManager;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
import org.apache.nifi.provenance.search.QuerySubmission;
@ -39,6 +39,7 @@ import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.store.ArrayListEventStore;
import org.apache.nifi.provenance.store.EventStore;
import org.apache.nifi.provenance.store.StorageResult;
import org.apache.nifi.util.Tuple;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@ -82,12 +83,36 @@ public class TestLuceneEventIndex {
return System.getProperty("os.name").toLowerCase().startsWith("windows");
}
/**
 * Verifies {@code LuceneEventIndex.getTimeRange} over a mix of old-style ({@code index-*}) and new-style
 * ({@code lucene-8-index-*}) directory names: every directory but the last is expected to span from its own
 * timestamp to the timestamp of the next directory, and the last one from its own timestamp to a value that
 * is no earlier than the time at which the test started.
 */
@Test
public void testGetTimeRange() {
    final long testStart = System.currentTimeMillis();

    final String[] directoryNames = {"index-1000", "lucene-8-index-3000", "index-4000", "index-5000", "lucene-8-index-6000", "index-7000"};
    final long[] timestamps = {1000L, 3000L, 4000L, 5000L, 6000L, 7000L};

    final List<File> indexFiles = new ArrayList<>();
    for (final String directoryName : directoryNames) {
        indexFiles.add(new File(directoryName));
    }

    // All directories except the newest: range ends where the following directory begins.
    final int newest = directoryNames.length - 1;
    for (int i = 0; i < newest; i++) {
        final Tuple<Long, Long> expectedRange = new Tuple<>(timestamps[i], timestamps[i + 1]);
        assertEquals(expectedRange, LuceneEventIndex.getTimeRange(new File(directoryNames[i]), indexFiles));
    }

    // Newest directory: there is no following directory, so only a lower bound on the end of the range is checked.
    assertEquals(timestamps[newest], LuceneEventIndex.getTimeRange(new File(directoryNames[newest]), indexFiles).getKey().longValue());
    assertTrue(LuceneEventIndex.getTimeRange(new File(directoryNames[newest]), indexFiles).getValue() >= testStart);
}
@Test(timeout = 60000)
public void testGetMinimumIdToReindex() throws InterruptedException {
assumeFalse(isWindowsEnvironment());
final RepositoryConfiguration repoConfig = createConfig(1);
repoConfig.setDesiredIndexSize(1L);
final IndexManager indexManager = new SimpleIndexManager(repoConfig);
final IndexManager indexManager = new StandardIndexManager(repoConfig);
final ArrayListEventStore eventStore = new ArrayListEventStore();
final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 20_000, EventReporter.NO_OP);
@ -112,7 +137,7 @@ public class TestLuceneEventIndex {
assumeFalse(isWindowsEnvironment());
final RepositoryConfiguration repoConfig = createConfig(1);
repoConfig.setDesiredIndexSize(1L);
final IndexManager indexManager = new SimpleIndexManager(repoConfig);
final IndexManager indexManager = new StandardIndexManager(repoConfig);
final ArrayListEventStore eventStore = new ArrayListEventStore();
final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 3, EventReporter.NO_OP);
@ -149,7 +174,7 @@ public class TestLuceneEventIndex {
assumeFalse(isWindowsEnvironment());
final RepositoryConfiguration repoConfig = createConfig(1);
repoConfig.setDesiredIndexSize(1L);
final IndexManager indexManager = new SimpleIndexManager(repoConfig);
final IndexManager indexManager = new StandardIndexManager(repoConfig);
final ArrayListEventStore eventStore = new ArrayListEventStore();
final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 3, EventReporter.NO_OP);
@ -225,7 +250,7 @@ public class TestLuceneEventIndex {
assumeFalse(isWindowsEnvironment());
final RepositoryConfiguration repoConfig = createConfig(1);
repoConfig.setDesiredIndexSize(1L);
final IndexManager indexManager = new SimpleIndexManager(repoConfig);
final IndexManager indexManager = new StandardIndexManager(repoConfig);
final ArrayListEventStore eventStore = new ArrayListEventStore();
final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 3, EventReporter.NO_OP);
@ -301,7 +326,7 @@ public class TestLuceneEventIndex {
assumeFalse(isWindowsEnvironment());
final RepositoryConfiguration repoConfig = createConfig(1);
repoConfig.setDesiredIndexSize(1L);
final IndexManager indexManager = new SimpleIndexManager(repoConfig);
final IndexManager indexManager = new StandardIndexManager(repoConfig);
final ArrayListEventStore eventStore = new ArrayListEventStore();
final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 3, EventReporter.NO_OP);
@ -346,7 +371,7 @@ public class TestLuceneEventIndex {
@Override
public Set<String> getGroups() {
return Collections.EMPTY_SET;
return Collections.emptySet();
}
@Override
@ -367,10 +392,10 @@ public class TestLuceneEventIndex {
}
@Test(timeout = 60000)
public void testExpiration() throws InterruptedException, IOException {
public void testExpiration() throws IOException {
final RepositoryConfiguration repoConfig = createConfig(1);
repoConfig.setDesiredIndexSize(1L);
final IndexManager indexManager = new SimpleIndexManager(repoConfig);
final IndexManager indexManager = new StandardIndexManager(repoConfig);
final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 1, EventReporter.NO_OP);
@ -381,7 +406,7 @@ public class TestLuceneEventIndex {
final EventStore eventStore = Mockito.mock(EventStore.class);
Mockito.doAnswer(new Answer<List<ProvenanceEventRecord>>() {
@Override
public List<ProvenanceEventRecord> answer(final InvocationOnMock invocation) throws Throwable {
public List<ProvenanceEventRecord> answer(final InvocationOnMock invocation) {
final Long eventId = invocation.getArgumentAt(0, Long.class);
assertEquals(0, eventId.longValue());
assertEquals(1, invocation.getArgumentAt(1, Integer.class).intValue());
@ -411,7 +436,7 @@ public class TestLuceneEventIndex {
public void addThenQueryWithEmptyQuery() throws InterruptedException {
assumeFalse(isWindowsEnvironment());
final RepositoryConfiguration repoConfig = createConfig();
final IndexManager indexManager = new SimpleIndexManager(repoConfig);
final IndexManager indexManager = new StandardIndexManager(repoConfig);
final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 1, EventReporter.NO_OP);
@ -451,7 +476,7 @@ public class TestLuceneEventIndex {
@Test(timeout = 50000)
public void testQuerySpecificField() throws InterruptedException {
final RepositoryConfiguration repoConfig = createConfig();
final IndexManager indexManager = new SimpleIndexManager(repoConfig);
final IndexManager indexManager = new StandardIndexManager(repoConfig);
final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 2, EventReporter.NO_OP);

View File

@ -1,114 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.lucene;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.UUID;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.nifi.provenance.index.EventIndexSearcher;
import org.apache.nifi.provenance.index.EventIndexWriter;
import org.apache.nifi.util.file.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Exercises borrowing and returning index writers and index searchers from a {@link CachingIndexManager}
 * against a single, freshly created index directory.
 */
public class TestCachingIndexManager {

    private File indexDir;
    private CachingIndexManager manager;

    /**
     * Enables DEBUG logging for the provenance package and creates a manager plus a unique index directory under {@code target/}.
     */
    @Before
    public void setup() {
        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
        manager = new CachingIndexManager();

        indexDir = new File("target/testIndexManager/" + UUID.randomUUID().toString());
        indexDir.mkdirs();
    }

    /**
     * Closes the manager and removes the index directory created by {@link #setup()}.
     */
    @After
    public void cleanup() throws IOException {
        manager.close();

        FileUtils.deleteFiles(Collections.singleton(indexDir), true);
    }

    @Test
    public void test() throws IOException {
        // Create an IndexWriter and add a document to the index, then close the writer.
        // This gives us something that we can query.
        final EventIndexWriter writer = manager.borrowIndexWriter(indexDir);
        final Document doc = new Document();
        doc.add(new StringField("unit test", "true", Store.YES));
        writer.index(doc, 1000);
        manager.returnIndexWriter(writer);

        // Get an Index Searcher that we can use to query the index.
        final EventIndexSearcher cachedSearcher = manager.borrowIndexSearcher(indexDir);

        // Ensure that we get the expected results.
        assertCount(cachedSearcher, 1);

        // While we already have an Index Searcher, get a writer for the same index.
        // This will cause the Index Searcher to be marked as poisoned.
        final EventIndexWriter writer2 = manager.borrowIndexWriter(indexDir);

        // Obtain a new Index Searcher with the writer open. This Index Searcher should *NOT*
        // be the same as the previous searcher because the new one will be a Near-Real-Time Index Searcher
        // while the other is not.
        final EventIndexSearcher nrtSearcher = manager.borrowIndexSearcher(indexDir);
        assertNotSame(cachedSearcher, nrtSearcher);

        // Ensure that we get the expected query results.
        assertCount(nrtSearcher, 1);

        // Return the writer, so that there is no longer an active writer for the index.
        manager.returnIndexWriter(writer2);

        // Ensure that we still get the same result.
        assertCount(cachedSearcher, 1);
        manager.returnIndexSearcher(cachedSearcher);

        // Ensure that our near-real-time index searcher still gets the same result.
        assertCount(nrtSearcher, 1);
        manager.returnIndexSearcher(nrtSearcher);
    }

    /**
     * Asserts that exactly {@code count} documents carry the field {@code "unit test"} with the value {@code "true"}.
     *
     * @param searcher the searcher to run the query against
     * @param count the number of matching documents expected
     * @throws IOException if the search fails
     */
    private void assertCount(final EventIndexSearcher searcher, final int count) throws IOException {
        final BooleanQuery query = new BooleanQuery();
        query.add(new BooleanClause(new TermQuery(new Term("unit test", "true")), Occur.MUST));

        // Always request at least one hit so that an expected count of 0 does not ask the searcher for zero results.
        final int maxHits = Math.max(1, count * 10);
        final TopDocs topDocs = searcher.getIndexSearcher().search(query, maxHits);
        assertNotNull(topDocs);

        // Compare against the requested count rather than a hard-coded 1, so the parameter is actually honored.
        assertEquals(count, topDocs.totalHits);
    }
}

View File

@ -17,14 +17,6 @@
package org.apache.nifi.provenance.lucene;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
@ -37,6 +29,14 @@ import org.apache.nifi.util.file.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestSimpleIndexManager {
@BeforeClass
public static void setLogLevel() {
@ -45,7 +45,7 @@ public class TestSimpleIndexManager {
@Test
public void testMultipleWritersSimultaneouslySameIndex() throws IOException {
final SimpleIndexManager mgr = new SimpleIndexManager(new RepositoryConfiguration());
final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration());
final File dir = new File("target/" + UUID.randomUUID().toString());
try {
final EventIndexWriter writer1 = mgr.borrowIndexWriter(dir);
@ -64,7 +64,7 @@ public class TestSimpleIndexManager {
final EventIndexSearcher searcher = mgr.borrowIndexSearcher(dir);
final TopDocs topDocs = searcher.getIndexSearcher().search(new MatchAllDocsQuery(), 2);
assertEquals(2, topDocs.totalHits);
assertEquals(2, topDocs.totalHits.value);
mgr.returnIndexSearcher(searcher);
} finally {
FileUtils.deleteFile(dir, true);
@ -75,7 +75,7 @@ public class TestSimpleIndexManager {
public void testWriterCloseIfPreviouslyMarkedCloseable() throws IOException {
final AtomicInteger closeCount = new AtomicInteger(0);
final SimpleIndexManager mgr = new SimpleIndexManager(new RepositoryConfiguration()) {
final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration()) {
@Override
protected void close(IndexWriterCount count) throws IOException {
closeCount.incrementAndGet();
@ -115,7 +115,7 @@ public class TestSimpleIndexManager {
public void testWriterCloseIfOnlyUser() throws IOException {
final AtomicInteger closeCount = new AtomicInteger(0);
final SimpleIndexManager mgr = new SimpleIndexManager(new RepositoryConfiguration()) {
final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration()) {
@Override
protected void close(IndexWriterCount count) throws IOException {
closeCount.incrementAndGet();
@ -133,7 +133,7 @@ public class TestSimpleIndexManager {
public void testWriterLeftOpenIfNotCloseable() throws IOException {
final AtomicInteger closeCount = new AtomicInteger(0);
final SimpleIndexManager mgr = new SimpleIndexManager(new RepositoryConfiguration()) {
final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration()) {
@Override
protected void close(IndexWriterCount count) throws IOException {
closeCount.incrementAndGet();

View File

@ -17,6 +17,15 @@
package org.apache.nifi.provenance.store;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.authorization.EventTransformer;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.store.iterator.EventIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -26,14 +35,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.authorization.EventTransformer;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ArrayListEventStore implements EventStore {
private static final Logger logger = LoggerFactory.getLogger(ArrayListEventStore.class);
@ -152,4 +153,9 @@ public class ArrayListEventStore implements EventStore {
/**
 * Intentionally a no-op: this store performs no re-indexing and never touches the given index.
 *
 * @param eventIndex the index that would receive the re-indexed events; ignored
 */
@Override
public void reindexLatestEvents(EventIndex eventIndex) {
}
/**
 * Stub implementation: no lookup is performed and {@code null} is returned for every time range.
 * NOTE(review): a caller that iterates the result will hit a NullPointerException - confirm that
 * nothing using this store calls this method, or return an empty iterator instead.
 *
 * @param minTimestamp lower bound of the requested time range; ignored
 * @param maxTimestamp upper bound of the requested time range; ignored
 * @return always {@code null}
 */
@Override
public EventIterator getEventsByTimestamp(final long minTimestamp, final long maxTimestamp) {
return null;
}
}

View File

@ -17,24 +17,6 @@
package org.apache.nifi.provenance.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.EventIdFirstSchemaRecordWriter;
@ -48,6 +30,7 @@ import org.apache.nifi.provenance.authorization.EventTransformer;
import org.apache.nifi.provenance.serialization.RecordReaders;
import org.apache.nifi.provenance.serialization.RecordWriters;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.store.iterator.EventIterator;
import org.apache.nifi.provenance.toc.StandardTocWriter;
import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.provenance.toc.TocWriter;
@ -57,6 +40,25 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class TestPartitionedWriteAheadEventStore {
private static final RecordWriterFactory writerFactory = (file, idGen, compress, createToc) -> RecordWriters.newSchemaRecordWriter(file, idGen, compress, createToc);
private static final RecordReaderFactory readerFactory = (file, logs, maxChars) -> RecordReaders.newRecordReader(file, logs, maxChars);
@ -422,6 +424,41 @@ public class TestPartitionedWriteAheadEventStore {
}
/**
 * Stores 1,000 events whose event times are 0 through 999 and verifies that querying the range 200 to 799
 * yields only events whose time lies within those bounds (both ends included), 600 events in total.
 */
@Test
public void testGetEventsByTimestamp() throws IOException {
    final RepositoryConfiguration repoConfig = createConfig();
    repoConfig.setMaxEventFileCount(300);
    repoConfig.setCompressOnRollover(false);

    final PartitionedWriteAheadEventStore eventStore = new PartitionedWriteAheadEventStore(repoConfig, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager());
    eventStore.initialize();

    // One event per timestamp, so the number of matches equals the width of the queried range.
    for (int eventTime = 0; eventTime < 1_000; eventTime++) {
        final ProvenanceEventRecord baseEvent = createEvent();
        final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
        builder.fromEvent(baseEvent);
        builder.setEventTime(eventTime);
        eventStore.addEvents(Collections.singleton(builder.build()));
    }

    final EventIterator events = eventStore.getEventsByTimestamp(200, 799);

    int matched = 0;
    for (Optional<ProvenanceEventRecord> next = events.nextEvent(); next.isPresent(); next = events.nextEvent()) {
        final long eventTime = next.get().getEventTime();
        assertTrue(eventTime >= 200);
        assertTrue(eventTime <= 799);
        matched++;
    }

    assertEquals(600, matched);
}
private RepositoryConfiguration createConfig() {
return createConfig(2);
}

View File

@ -17,21 +17,6 @@
package org.apache.nifi.provenance.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.EventIdFirstSchemaRecordWriter;
import org.apache.nifi.provenance.IdentifierLookup;
@ -50,6 +35,21 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestWriteAheadStorePartition {
@Test
@ -66,10 +66,10 @@ public class TestWriteAheadStorePartition {
return new EventIdFirstSchemaRecordWriter(file, idGenerator, tocWriter, compressed, 32 * 1024, IdentifierLookup.EMPTY);
};
final RecordReaderFactory recordReaderFactory = (file, logs, maxChars) -> RecordReaders.newRecordReader(file, logs, maxChars);
final RecordReaderFactory recordReaderFactory = RecordReaders::newRecordReader;
final WriteAheadStorePartition partition = new WriteAheadStorePartition(storageDirectory, partitionName, repoConfig, recordWriterFactory,
recordReaderFactory, new LinkedBlockingQueue<>(), new AtomicLong(0L), EventReporter.NO_OP);
recordReaderFactory, new LinkedBlockingQueue<>(), new AtomicLong(0L), EventReporter.NO_OP, Mockito.mock(EventFileManager.class));
for (int i = 0; i < 100; i++) {
partition.addEvents(Collections.singleton(TestUtil.createEvent()));
@ -113,10 +113,10 @@ public class TestWriteAheadStorePartition {
return new EventIdFirstSchemaRecordWriter(file, idGenerator, tocWriter, compressed, 32 * 1024, IdentifierLookup.EMPTY);
};
final RecordReaderFactory recordReaderFactory = (file, logs, maxChars) -> RecordReaders.newRecordReader(file, logs, maxChars);
final RecordReaderFactory recordReaderFactory = RecordReaders::newRecordReader;
WriteAheadStorePartition partition = new WriteAheadStorePartition(storageDirectory, partitionName, repoConfig, recordWriterFactory,
recordReaderFactory, new LinkedBlockingQueue<>(), new AtomicLong(0L), EventReporter.NO_OP);
recordReaderFactory, new LinkedBlockingQueue<>(), new AtomicLong(0L), EventReporter.NO_OP, Mockito.mock(EventFileManager.class));
for (int i = 0; i < 100; i++) {
partition.addEvents(Collections.singleton(TestUtil.createEvent()));
@ -133,7 +133,7 @@ public class TestWriteAheadStorePartition {
assertTrue(new File(storageDirectory, "1" + fileList.get(0).getName()).createNewFile());
partition = new WriteAheadStorePartition(storageDirectory, partitionName, repoConfig, recordWriterFactory,
recordReaderFactory, new LinkedBlockingQueue<>(), new AtomicLong(0L), EventReporter.NO_OP);
recordReaderFactory, new LinkedBlockingQueue<>(), new AtomicLong(0L), EventReporter.NO_OP, Mockito.mock(EventFileManager.class));
partition.initialize();

View File

@ -27,7 +27,7 @@
<module>nifi-provenance-repository-nar</module>
</modules>
<properties>
<lucene.version>4.10.4</lucene.version>
<lucene.version>8.0.0</lucene.version>
</properties>
<dependencyManagement>
<dependencies>