NIFI-756: Do not remove documents from a Lucene Index; instead, wait until the entire index is 'expired' and delete the index's directory

This commit is contained in:
Mark Payne 2015-08-19 12:02:18 -04:00
parent 8ebf1f03c2
commit 16dc5d5fd9
4 changed files with 243 additions and 19 deletions

View File

@ -20,6 +20,7 @@ import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@ -123,6 +124,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
private final AtomicBoolean recoveryFinished = new AtomicBoolean(false);
private volatile boolean closed = false;
private volatile long firstEventTimestamp = 0L;
// the following are all protected by the lock
private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
@ -265,6 +267,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
}, 1L, 1L, TimeUnit.MINUTES);
}
firstEventTimestamp = determineFirstEventTimestamp();
} finally {
writeLock.unlock();
}
@ -282,8 +286,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final String rolloverSize = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
final String shardSize = properties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
final int queryThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2);
final int indexThreads = properties.getIntegerProperty(
NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 1);
final int indexThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 1);
final int journalCount = properties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS);
@ -944,6 +947,167 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
updated = idToPathMap.compareAndSet(existingPathMap, newPathMap);
logger.debug("After expiration, path map: {}", newPathMap);
}
purgeExpiredIndexes();
}
private void purgeExpiredIndexes() throws IOException {
// Now that we have potentially removed expired Provenance Event Log Files, we can look at
// whether or not we can delete any of the indexes. An index can be deleted if all of the
// data that is associated with that index has already been deleted. In order to test this,
// we will get the timestamp of the earliest event and then compare that to the latest timestamp
// that would be indexed by the earliest index. If the event occurred after the timestamp of
// the latest index, then we can just delete the entire index all together.
// find all of the index directories
final List<File> indexDirs = getAllIndexDirectories();
if (indexDirs.size() < 2) {
this.firstEventTimestamp = determineFirstEventTimestamp();
return;
}
// Indexes are named "index-XXX" where the XXX is the timestamp of the earliest event that
// could be in the index. Once we have finished with one index, we move on to another index,
// but we don't move on until we are finished with the previous index.
// Therefore, an efficient way to determine the latest timestamp of one index is to look at the
// timestamp of the next index (these could potentially overlap for one millisecond). This is
// efficient because we can determine the earliest timestamp of an index simply by looking at
// the name of the Index's directory.
final long latestTimestampOfFirstIndex = getIndexTimestamp(indexDirs.get(1));
// Get the timestamp of the first event in the first Provenance Event Log File and the ID of the last event
// in the event file.
final List<File> logFiles = getSortedLogFiles();
if (logFiles.isEmpty()) {
this.firstEventTimestamp = System.currentTimeMillis();
return;
}
final File firstLogFile = logFiles.get(0);
long earliestEventTime = System.currentTimeMillis();
long maxEventId = -1L;
try (final RecordReader reader = RecordReaders.newRecordReader(firstLogFile, null, Integer.MAX_VALUE)) {
final StandardProvenanceEventRecord event = reader.nextRecord();
earliestEventTime = event.getEventTime();
try {
maxEventId = reader.getMaxEventId();
} catch (final IOException ioe) {
logger.warn("Unable to determine the maximum ID for Provenance Event Log File {}; values reported for the number of "
+ "events in the Provenance Repository may be inaccurate.", firstLogFile);
}
}
// check if we can delete the index safely.
if (latestTimestampOfFirstIndex <= earliestEventTime) {
// we can safely delete the first index because the latest event in the index is an event
// that has already been expired from the repository.
final File indexingDirectory = indexDirs.get(0);
indexManager.removeIndex(indexingDirectory);
indexConfig.removeIndexDirectory(indexingDirectory);
deleteDirectory(indexingDirectory);
if (maxEventId > -1L) {
indexConfig.setMinIdIndexed(maxEventId + 1L);
}
}
this.firstEventTimestamp = earliestEventTime;
}
private long determineFirstEventTimestamp() {
// Get the timestamp of the first event in the first Provenance Event Log File and the ID of the last event
// in the event file.
final List<File> logFiles = getSortedLogFiles();
if (logFiles.isEmpty()) {
return 0L;
}
for (final File logFile : logFiles) {
try (final RecordReader reader = RecordReaders.newRecordReader(logFile, null, Integer.MAX_VALUE)) {
final StandardProvenanceEventRecord event = reader.nextRecord();
return event.getEventTime();
} catch (final IOException ioe) {
logger.warn("Failed to obtain timestamp of first event from Provenance Event Log File {}", logFile);
}
}
return 0L;
}
/**
* Recursively deletes the given directory. If unable to delete the directory, will emit a WARN level
* log event and move on.
*
* @param dir the directory to delete
*/
private void deleteDirectory(final File dir) {
if (dir == null || !dir.exists()) {
return;
}
final File[] children = dir.listFiles();
if (children == null) {
return;
}
for (final File child : children) {
if (child.isDirectory()) {
deleteDirectory(child);
} else if (!child.delete()) {
logger.warn("Unable to remove index directory {}; this directory should be cleaned up manually", child.getAbsolutePath());
}
}
if (!dir.delete()) {
logger.warn("Unable to remove index directory {}; this directory should be cleaned up manually", dir);
}
}
/**
* @return a List of all Index directories, sorted by timestamp of the earliest event that could
* be present in the index
*/
private List<File> getAllIndexDirectories() {
final List<File> allIndexDirs = new ArrayList<>();
for (final File storageDir : configuration.getStorageDirectories()) {
final File[] indexDirs = storageDir.listFiles(new FilenameFilter() {
@Override
public boolean accept(final File dir, final String name) {
return INDEX_PATTERN.matcher(name).matches();
}
});
if (indexDirs != null) {
for (final File indexDir : indexDirs) {
allIndexDirs.add(indexDir);
}
}
}
Collections.sort(allIndexDirs, new Comparator<File>() {
@Override
public int compare(final File o1, final File o2) {
final long time1 = getIndexTimestamp(o1);
final long time2 = getIndexTimestamp(o2);
return Long.compare(time1, time2);
}
});
return allIndexDirs;
}
/**
* Takes a File that has a filename "index-" followed by a Long and returns the
* value of that Long
*
* @param indexDirectory the index directory to obtain the timestamp for
* @return the timestamp associated with the given index
*/
private long getIndexTimestamp(final File indexDirectory) {
final String name = indexDirectory.getName();
final int dashIndex = name.indexOf("-");
return Long.parseLong(name.substring(dashIndex + 1));
}
/**
@ -2004,6 +2168,37 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return false;
}
/**
* @return a List of all Provenance Event Log Files, sorted in ascending order by the first Event ID in each file
*/
private List<File> getSortedLogFiles() {
final List<Path> paths = new ArrayList<>(getAllLogFiles());
Collections.sort(paths, new Comparator<Path>() {
@Override
public int compare(final Path o1, final Path o2) {
return Long.compare(getFirstEventId(o1.toFile()), getFirstEventId(o2.toFile()));
}
});
final List<File> files = new ArrayList<>(paths.size());
for (final Path path : paths) {
files.add(path.toFile());
}
return files;
}
/**
* Returns the Event ID of the first event in the given Provenance Event Log File.
*
* @param logFile the log file from which to obtain the first Event ID
* @return the ID of the first event in the given log file
*/
private long getFirstEventId(final File logFile) {
final String name = logFile.getName();
final int dotIndex = name.indexOf(".");
return Long.parseLong(name.substring(0, dotIndex));
}
public Collection<Path> getAllLogFiles() {
final SortedMap<Long, Path> map = idToPathMap.get();
return map == null ? new ArrayList<Path>() : map.values();
@ -2095,7 +2290,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
public void run() {
try {
final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars);
final StandardQueryResult queryResult = search.search(query, retrievalCount);
final StandardQueryResult queryResult = search.search(query, retrievalCount, firstEventTimestamp);
submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount());
if (queryResult.isFinished()) {
logger.info("Successfully executed Query[{}] against Index {}; Search took {} milliseconds; Total Hits = {}",

View File

@ -124,6 +124,7 @@ public class DocsReader {
int logFileCount = 0;
final Set<String> storageFilesToSkip = new HashSet<>();
int eventsReadThisFile = 0;
try {
for (final Document d : docs) {
@ -135,6 +136,7 @@ public class DocsReader {
try {
if (reader != null && storageFilename.equals(lastStorageFilename)) {
matchingRecords.add(getRecord(d, reader));
eventsReadThisFile++;
if ( retrievalCount.incrementAndGet() >= maxResults ) {
break;
@ -162,8 +164,13 @@ public class DocsReader {
for (final File file : potentialFiles) {
try {
if (reader != null) {
logger.debug("Read {} records from previous file", eventsReadThisFile);
}
reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars);
matchingRecords.add(getRecord(d, reader));
eventsReadThisFile = 1;
if ( retrievalCount.incrementAndGet() >= maxResults ) {
break;
@ -183,6 +190,7 @@ public class DocsReader {
}
}
logger.debug("Read {} records from previous file", eventsReadThisFile);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(), logFileCount);

View File

@ -48,7 +48,7 @@ public class IndexSearch {
this.maxAttributeChars = maxAttributeChars;
}
public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount) throws IOException {
public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount, final long firstEventTimestamp) throws IOException {
if (!indexDirectory.exists() && !indexDirectory.mkdirs()) {
throw new IOException("Unable to create Indexing Directory " + indexDirectory);
}
@ -59,6 +59,12 @@ public class IndexSearch {
final StandardQueryResult sqr = new StandardQueryResult(provenanceQuery, 1);
final Set<ProvenanceEventRecord> matchingRecords;
// we need to set the start date because if we do not, the first index may still have events that have aged off from
// the repository, and we don't want those events to count toward the total number of matches.
if (provenanceQuery.getStartDate() == null || provenanceQuery.getStartDate().getTime() < firstEventTimestamp) {
provenanceQuery.setStartDate(new Date(firstEventTimestamp));
}
if (provenanceQuery.getEndDate() == null) {
provenanceQuery.setEndDate(new Date());
}

View File

@ -934,11 +934,12 @@ public class TestPersistentProvenanceRepository {
@Test
public void testIndexDirectoryRemoved() throws InterruptedException, IOException, ParseException {
final RepositoryConfiguration config = createConfiguration();
config.setMaxRecordLife(3, TimeUnit.SECONDS);
config.setMaxRecordLife(5, TimeUnit.MINUTES);
config.setMaxStorageCapacity(1024L * 1024L);
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
config.setMaxEventFileCapacity(1024L * 1024L);
config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
config.setDesiredIndexSize(10); // force new index to be created for each rollover
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
repo.initialize(getEventReporter());
@ -960,11 +961,27 @@ public class TestPersistentProvenanceRepository {
for (int i = 0; i < 10; i++) {
attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
builder.setEventTime(10L); // make sure the events are destroyed when we call purge
repo.registerEvent(builder.build());
}
repo.waitForRollover();
Thread.sleep(2000L);
// add more records so that we will create a new index
final long secondBatchStartTime = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
attributes.put("uuid", "00000000-0000-0000-0000-00000000001" + i);
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
builder.setEventTime(System.currentTimeMillis());
repo.registerEvent(builder.build());
}
// wait for indexing to happen
repo.waitForRollover();
// verify we get the results expected
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
@ -972,31 +989,30 @@ public class TestPersistentProvenanceRepository {
query.setMaxResults(100);
final QueryResult result = repo.queryEvents(query);
assertEquals(10, result.getMatchingEvents().size());
assertEquals(20, result.getMatchingEvents().size());
Thread.sleep(2000L);
// Ensure index directory exists
// Ensure index directories exists
final FileFilter indexFileFilter = new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().startsWith("index");
}
};
File[] storageDirFiles = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
assertEquals(1, storageDirFiles.length);
File[] indexDirs = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
assertEquals(2, indexDirs.length);
config.setMaxStorageCapacity(100L);
config.setMaxRecordLife(500, TimeUnit.MILLISECONDS);
// expire old events and indexes
final long timeSinceSecondBatch = System.currentTimeMillis() - secondBatchStartTime;
config.setMaxRecordLife(timeSinceSecondBatch + 1000L, TimeUnit.MILLISECONDS);
repo.purgeOldEvents();
Thread.sleep(2000L);
final QueryResult newRecordSet = repo.queryEvents(query);
assertTrue(newRecordSet.getMatchingEvents().isEmpty());
assertEquals(10, newRecordSet.getMatchingEvents().size());
// Ensure index directory is gone
storageDirFiles = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
assertEquals(0, storageDirFiles.length);
// Ensure that one index directory is gone
indexDirs = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
assertEquals(1, indexDirs.length);
}
@ -1124,8 +1140,7 @@ public class TestPersistentProvenanceRepository {
final TopDocs topDocs = searcher.search(luceneQuery, 1000);
final List<Document> docs = new ArrayList<>();
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
final ScoreDoc scoreDoc = topDocs.scoreDocs[i];
for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
final int docId = scoreDoc.doc;
final Document d = directoryReader.document(docId);
docs.add(d);