diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 4657686baa..66f96423d2 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -20,6 +20,7 @@ import java.io.EOFException; import java.io.File; import java.io.FileFilter; import java.io.FileNotFoundException; +import java.io.FilenameFilter; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -123,6 +124,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository private final AtomicBoolean recoveryFinished = new AtomicBoolean(false); private volatile boolean closed = false; + private volatile long firstEventTimestamp = 0L; // the following are all protected by the lock private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true); @@ -265,6 +267,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } }, 1L, 1L, TimeUnit.MINUTES); } + + firstEventTimestamp = determineFirstEventTimestamp(); } finally { writeLock.unlock(); } @@ -282,8 +286,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final String rolloverSize = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB"); final String shardSize = properties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB"); final int queryThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 
2);
-        final int indexThreads = properties.getIntegerProperty(
-            NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 1);
+        final int indexThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 1);
         final int journalCount = properties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
 
         final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS);
@@ -944,6 +947,185 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 updated = idToPathMap.compareAndSet(existingPathMap, newPathMap);
                 logger.debug("After expiration, path map: {}", newPathMap);
             }
+
+        purgeExpiredIndexes();
+    }
+
+    /**
+     * Deletes the oldest index directory once every event it could reference has been expired, and
+     * refreshes the cached timestamp of the earliest event still held by the repository.
+     *
+     * @throws IOException if the oldest Provenance Event Log File cannot be read
+     */
+    private void purgeExpiredIndexes() throws IOException {
+        // Now that we have potentially removed expired Provenance Event Log Files, we can look at
+        // whether or not we can delete any of the indexes. An index can be deleted if all of the
+        // data that is associated with that index has already been deleted. In order to test this,
+        // we will get the timestamp of the earliest event and then compare that to the latest timestamp
+        // that would be indexed by the earliest index. If the event occurred after the timestamp of
+        // the latest index, then we can just delete the entire index altogether.
+
+        // find all of the index directories
+        final List<File> indexDirs = getAllIndexDirectories();
+        if (indexDirs.size() < 2) {
+            this.firstEventTimestamp = determineFirstEventTimestamp();
+            return;
+        }
+
+        // Indexes are named "index-XXX" where the XXX is the timestamp of the earliest event that
+        // could be in the index. Once we have finished with one index, we move on to another index,
+        // but we don't move on until we are finished with the previous index.
+        // Therefore, an efficient way to determine the latest timestamp of one index is to look at the
+        // timestamp of the next index (these could potentially overlap for one millisecond). This is
+        // efficient because we can determine the earliest timestamp of an index simply by looking at
+        // the name of the Index's directory.
+        final long latestTimestampOfFirstIndex = getIndexTimestamp(indexDirs.get(1));
+
+        // Get the timestamp of the first event in the first Provenance Event Log File and the ID of the last event
+        // in the event file.
+        final List<File> logFiles = getSortedLogFiles();
+        if (logFiles.isEmpty()) {
+            this.firstEventTimestamp = System.currentTimeMillis();
+            return;
+        }
+
+        final File firstLogFile = logFiles.get(0);
+        long earliestEventTime = System.currentTimeMillis();
+        long maxEventId = -1L;
+        try (final RecordReader reader = RecordReaders.newRecordReader(firstLogFile, null, Integer.MAX_VALUE)) {
+            final StandardProvenanceEventRecord event = reader.nextRecord();
+            if (event == null) {
+                // Guard against a log file with no records (nextRecord() is expected to return null then).
+                // Without an event we cannot prove the oldest index is expired, so keep every index.
+                this.firstEventTimestamp = determineFirstEventTimestamp();
+                return;
+            }
+            earliestEventTime = event.getEventTime();
+
+            try {
+                maxEventId = reader.getMaxEventId();
+            } catch (final IOException ioe) {
+                logger.warn("Unable to determine the maximum ID for Provenance Event Log File {}; values reported for the number of "
+                    + "events in the Provenance Repository may be inaccurate.", firstLogFile);
+            }
+        }
+
+        // check if we can delete the index safely.
+        if (latestTimestampOfFirstIndex <= earliestEventTime) {
+            // we can safely delete the first index because the latest event in the index is an event
+            // that has already been expired from the repository.
+            final File indexingDirectory = indexDirs.get(0);
+            indexManager.removeIndex(indexingDirectory);
+            indexConfig.removeIndexDirectory(indexingDirectory);
+            deleteDirectory(indexingDirectory);
+
+            if (maxEventId > -1L) {
+                indexConfig.setMinIdIndexed(maxEventId + 1L);
+            }
+        }
+
+        this.firstEventTimestamp = earliestEventTime;
+    }
+
+    /**
+     * Finds the timestamp of the first event in the oldest Provenance Event Log File that can be read.
+     *
+     * @return the event time of the first record found when scanning the log files in ascending order of
+     *         their first Event ID, or 0 if there are no log files or none of them yields an event
+     */
+    private long determineFirstEventTimestamp() {
+        final List<File> logFiles = getSortedLogFiles();
+        if (logFiles.isEmpty()) {
+            return 0L;
+        }
+
+        for (final File logFile : logFiles) {
+            try (final RecordReader reader = RecordReaders.newRecordReader(logFile, null, Integer.MAX_VALUE)) {
+                final StandardProvenanceEventRecord event = reader.nextRecord();
+                if (event != null) {
+                    return event.getEventTime();
+                }
+            } catch (final IOException ioe) {
+                logger.warn("Failed to obtain timestamp of first event from Provenance Event Log File {}", logFile);
+            }
+        }
+
+        return 0L;
+    }
+
+    /**
+     * Recursively deletes the given directory. If unable to delete the directory, will emit a WARN level
+     * log event and move on.
+     *
+     * @param dir the directory to delete
+     */
+    private void deleteDirectory(final File dir) {
+        if (dir == null || !dir.exists()) {
+            return;
+        }
+
+        final File[] children = dir.listFiles();
+        if (children == null) {
+            return;
+        }
+
+        for (final File child : children) {
+            if (child.isDirectory()) {
+                deleteDirectory(child);
+            } else if (!child.delete()) {
+                logger.warn("Unable to remove index file {}; this file should be cleaned up manually", child.getAbsolutePath());
+            }
+        }
+
+        if (!dir.delete()) {
+            logger.warn("Unable to remove index directory {}; this directory should be cleaned up manually", dir);
+        }
+    }
+
+    /**
+     * @return a List of all Index directories, sorted by timestamp of the earliest event that could
+     * be present in the index
+     */
+    private List<File> getAllIndexDirectories() {
+        final List<File> allIndexDirs = new ArrayList<>();
+        for (final File storageDir : configuration.getStorageDirectories()) {
+            final File[] indexDirs = storageDir.listFiles(new FilenameFilter() {
+                @Override
+                public boolean accept(final File dir, final String name) {
+                    return INDEX_PATTERN.matcher(name).matches();
+                }
+            });
+
+            if (indexDirs != null) {
+                for (final File indexDir : indexDirs) {
+                    allIndexDirs.add(indexDir);
+                }
+            }
+        }
+
+        Collections.sort(allIndexDirs, new Comparator<File>() {
+            @Override
+            public int compare(final File o1, final File o2) {
+                final long time1 = getIndexTimestamp(o1);
+                final long time2 = getIndexTimestamp(o2);
+                return Long.compare(time1, time2);
+            }
+        });
+
+        return allIndexDirs;
+    }
+
+    /**
+     * Takes a File that has a filename "index-" followed by a Long and returns the
+     * value of that Long
+     *
+     * @param indexDirectory the index directory to obtain the timestamp for
+     * @return the timestamp associated with the given index
+     */
+    private long getIndexTimestamp(final File indexDirectory) {
+        final String name = indexDirectory.getName();
+        final int dashIndex = name.indexOf("-");
+        return Long.parseLong(name.substring(dashIndex + 1));
     }
 
     /**
@@ -2004,6 +2186,37 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         return false;
     }
 
+    /**
+     * @return a List of all Provenance Event Log Files, sorted in ascending order by the first Event ID in each file
+     */
+    private List<File> getSortedLogFiles() {
+        final List<Path> paths = new ArrayList<>(getAllLogFiles());
+        Collections.sort(paths, new Comparator<Path>() {
+            @Override
+            public int compare(final Path o1, final Path o2) {
+                return Long.compare(getFirstEventId(o1.toFile()), getFirstEventId(o2.toFile()));
+            }
+        });
+
+        final List<File> files = new ArrayList<>(paths.size());
+        for (final Path path : paths) {
+            files.add(path.toFile());
+        }
+        return files;
+    }
+
+    /**
+     * Returns the Event ID of the first event in the given Provenance Event Log File.
+     *
+     * @param logFile the log file from which to obtain the first Event ID
+     * @return the ID of the first event in the given log file
+     */
+    private long getFirstEventId(final File logFile) {
+        final String name = logFile.getName();
+        final int dotIndex = name.indexOf(".");
+        return Long.parseLong(name.substring(0, dotIndex));
+    }
+
     public Collection getAllLogFiles() {
         final SortedMap map = idToPathMap.get();
         return map == null ?
new ArrayList() : map.values(); @@ -2095,7 +2290,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository public void run() { try { final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars); - final StandardQueryResult queryResult = search.search(query, retrievalCount); + final StandardQueryResult queryResult = search.search(query, retrievalCount, firstEventTimestamp); submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount()); if (queryResult.isFinished()) { logger.info("Successfully executed Query[{}] against Index {}; Search took {} milliseconds; Total Hits = {}", diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java index eef46281a2..c2a7609eb8 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java @@ -124,6 +124,7 @@ public class DocsReader { int logFileCount = 0; final Set storageFilesToSkip = new HashSet<>(); + int eventsReadThisFile = 0; try { for (final Document d : docs) { @@ -135,6 +136,7 @@ public class DocsReader { try { if (reader != null && storageFilename.equals(lastStorageFilename)) { matchingRecords.add(getRecord(d, reader)); + eventsReadThisFile++; if ( retrievalCount.incrementAndGet() >= maxResults ) { break; @@ -162,8 +164,13 @@ public class DocsReader { for (final File file : potentialFiles) { try { + if (reader != null) { + logger.debug("Read {} records from previous file", 
eventsReadThisFile); + } + reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars); matchingRecords.add(getRecord(d, reader)); + eventsReadThisFile = 1; if ( retrievalCount.incrementAndGet() >= maxResults ) { break; @@ -183,6 +190,7 @@ public class DocsReader { } } + logger.debug("Read {} records from previous file", eventsReadThisFile); final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(), logFileCount); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java index c9bb238bed..7fcd8ab718 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java @@ -48,7 +48,7 @@ public class IndexSearch { this.maxAttributeChars = maxAttributeChars; } - public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount) throws IOException { + public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount, final long firstEventTimestamp) throws IOException { if (!indexDirectory.exists() && !indexDirectory.mkdirs()) { throw new IOException("Unable to create Indexing Directory " + indexDirectory); } @@ -59,6 +59,12 @@ public class IndexSearch { final StandardQueryResult sqr = new StandardQueryResult(provenanceQuery, 1); final Set matchingRecords; + // we need to set the start 
date because if we do not, the first index may still have events that have aged off from + // the repository, and we don't want those events to count toward the total number of matches. + if (provenanceQuery.getStartDate() == null || provenanceQuery.getStartDate().getTime() < firstEventTimestamp) { + provenanceQuery.setStartDate(new Date(firstEventTimestamp)); + } + if (provenanceQuery.getEndDate() == null) { provenanceQuery.setEndDate(new Date()); } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 713180f206..687574351e 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -934,11 +934,12 @@ public class TestPersistentProvenanceRepository { @Test public void testIndexDirectoryRemoved() throws InterruptedException, IOException, ParseException { final RepositoryConfiguration config = createConfiguration(); - config.setMaxRecordLife(3, TimeUnit.SECONDS); + config.setMaxRecordLife(5, TimeUnit.MINUTES); config.setMaxStorageCapacity(1024L * 1024L); config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); config.setMaxEventFileCapacity(1024L * 1024L); config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); + config.setDesiredIndexSize(10); // force new index to be created for each rollover repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); repo.initialize(getEventReporter()); @@ -960,11 +961,27 @@ public class 
TestPersistentProvenanceRepository { for (int i = 0; i < 10; i++) { attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i); builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); + builder.setEventTime(10L); // make sure the events are destroyed when we call purge repo.registerEvent(builder.build()); } repo.waitForRollover(); + Thread.sleep(2000L); + + // add more records so that we will create a new index + final long secondBatchStartTime = System.currentTimeMillis(); + for (int i = 0; i < 10; i++) { + attributes.put("uuid", "00000000-0000-0000-0000-00000000001" + i); + builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); + builder.setEventTime(System.currentTimeMillis()); + repo.registerEvent(builder.build()); + } + + // wait for indexing to happen + repo.waitForRollover(); + + // verify we get the results expected final Query query = new Query(UUID.randomUUID().toString()); query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*")); query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4")); @@ -972,31 +989,30 @@ public class TestPersistentProvenanceRepository { query.setMaxResults(100); final QueryResult result = repo.queryEvents(query); - assertEquals(10, result.getMatchingEvents().size()); + assertEquals(20, result.getMatchingEvents().size()); - Thread.sleep(2000L); - - // Ensure index directory exists + // Ensure index directories exists final FileFilter indexFileFilter = new FileFilter() { @Override public boolean accept(File pathname) { return pathname.getName().startsWith("index"); } }; - File[] storageDirFiles = config.getStorageDirectories().get(0).listFiles(indexFileFilter); - assertEquals(1, storageDirFiles.length); + File[] indexDirs = config.getStorageDirectories().get(0).listFiles(indexFileFilter); + assertEquals(2, indexDirs.length); - config.setMaxStorageCapacity(100L); - config.setMaxRecordLife(500, TimeUnit.MILLISECONDS); + // expire old events and indexes + final long 
timeSinceSecondBatch = System.currentTimeMillis() - secondBatchStartTime; + config.setMaxRecordLife(timeSinceSecondBatch + 1000L, TimeUnit.MILLISECONDS); repo.purgeOldEvents(); Thread.sleep(2000L); final QueryResult newRecordSet = repo.queryEvents(query); - assertTrue(newRecordSet.getMatchingEvents().isEmpty()); + assertEquals(10, newRecordSet.getMatchingEvents().size()); - // Ensure index directory is gone - storageDirFiles = config.getStorageDirectories().get(0).listFiles(indexFileFilter); - assertEquals(0, storageDirFiles.length); + // Ensure that one index directory is gone + indexDirs = config.getStorageDirectories().get(0).listFiles(indexFileFilter); + assertEquals(1, indexDirs.length); } @@ -1124,8 +1140,7 @@ public class TestPersistentProvenanceRepository { final TopDocs topDocs = searcher.search(luceneQuery, 1000); final List docs = new ArrayList<>(); - for (int i = 0; i < topDocs.scoreDocs.length; i++) { - final ScoreDoc scoreDoc = topDocs.scoreDocs[i]; + for (final ScoreDoc scoreDoc : topDocs.scoreDocs) { final int docId = scoreDoc.doc; final Document d = directoryReader.document(docId); docs.add(d);