From 84935d4f7840fe9b22aa929afb403718c073e627 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 11 Aug 2017 13:08:29 -0400 Subject: [PATCH] NIFI-4023, NIFI-4077 This closes #2075. Addressed issue where repository was aging off the wrong index. When it should age off Index 1, it was removing Index 2. As a result, the earliest index is never aged off, and the newest index could potentially be aged off before it is ready to be. Also addressed issue where a query that attempts to read an event that has aged off throws FileNotFoundException (NIFI-4077) instead of skipping over the event. The JavaDocs indicate that the EventIterator should skip records that it cannot find, but SelectiveRecordReaderEventIterator throws FileNotFoundException instead. Signed-off-by: joewitt --- .../index/lucene/IndexDirectoryManager.java | 21 +++++----- .../index/lucene/IndexLocation.java | 9 +++++ .../index/lucene/LuceneEventIndex.java | 15 ++++--- .../SelectiveRecordReaderEventIterator.java | 40 ++++++++++--------- .../lucene/TestIndexDirectoryManager.java | 38 ++++++++++++++++++ ...estSelectiveRecordReaderEventIterator.java | 23 +++++++++++ 6 files changed, 111 insertions(+), 35 deletions(-) diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java index 53f74e070d..033e8d0155 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java @@ -125,9 +125,9 @@ public class IndexDirectoryManager { // If 
looking at index N, we can determine the index end time by assuming that it is the same as the // start time of index N+1. So we determine the time range of each index and select an index only if // its start time is before the given timestamp and its end time is <= the given timestamp. - for (final List startTimeWithFile : startTimeWithFileByStorageDirectory.values()) { - for (int i = 0; i < startTimeWithFile.size(); i++) { - final IndexLocation indexLoc = startTimeWithFile.get(i); + for (final List locationList : startTimeWithFileByStorageDirectory.values()) { + for (int i = 0; i < locationList.size(); i++) { + final IndexLocation indexLoc = locationList.get(i); final String partition = indexLoc.getPartitionName(); final IndexLocation activeLocation = activeIndices.get(partition); @@ -143,16 +143,13 @@ public class IndexDirectoryManager { break; } - if (i < startTimeWithFile.size() - 1) { - final IndexLocation nextLocation = startTimeWithFile.get(i + 1); - final Long indexEndTime = nextLocation.getIndexStartTimestamp(); - if (indexEndTime <= timestamp) { - logger.debug("Considering Index Location {} older than {} ({}) because its events have an EventTime " - + "ranging from {} ({}) to {} ({}) based on the following IndexLocations: {}", nextLocation, timestamp, new Date(timestamp), - indexStartTime, new Date(indexStartTime), indexEndTime, new Date(indexEndTime), startTimeWithFile); + final long indexEndTime = indexLoc.getIndexEndTimestamp(); + if (indexEndTime <= timestamp) { + logger.debug("Considering Index Location {} older than {} ({}) because its events have an EventTime " + + "ranging from {} ({}) to {} ({}) based on the following IndexLocations: {}", indexLoc, timestamp, new Date(timestamp), + indexStartTime, new Date(indexStartTime), indexEndTime, new Date(indexEndTime), locationList); - selected.add(nextLocation.getIndexDirectory()); - } + selected.add(indexLoc.getIndexDirectory()); } } } diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java index f7de84fada..b0fb68f7bf 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java @@ -38,6 +38,15 @@ public class IndexLocation { return indexStartTimestamp; } + public long getIndexEndTimestamp() { + final long lastMod = indexDirectory.lastModified(); + if (lastMod == 0) { + return System.currentTimeMillis(); + } + + return lastMod; + } + public String getPartitionName() { return partitionName; } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java index 4a38071cb8..c44c7d2071 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java @@ -648,14 +648,19 @@ public class LuceneEventIndex implements EventIndex { void performMaintenance() { try { final List firstEvents = eventStore.getEvents(0, 1); + + final long earliestEventTime; if (firstEvents.isEmpty()) { - return; + earliestEventTime = 
System.currentTimeMillis(); + logger.debug("Found no events in the Provenance Repository. In order to perform maintenace of the indices, " + + "will assume that the first event time is now ({})", System.currentTimeMillis()); + } else { + final ProvenanceEventRecord firstEvent = firstEvents.get(0); + earliestEventTime = firstEvent.getEventTime(); + logger.debug("First Event Time is {} ({}) with Event ID {}; will delete any Lucene Index that is older than this", + earliestEventTime, new Date(earliestEventTime), firstEvent.getEventId()); } - final ProvenanceEventRecord firstEvent = firstEvents.get(0); - final long earliestEventTime = firstEvent.getEventTime(); - logger.debug("First Event Time is {} ({}) with Event ID {}; will delete any Lucene Index that is older than this", - earliestEventTime, new Date(earliestEventTime), firstEvent.getEventId()); final List indicesBeforeEarliestEvent = directoryManager.getDirectoriesBefore(earliestEventTime); for (final File index : indicesBeforeEarliestEvent) { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java index c4a130ba01..e09fe05aa7 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java @@ -17,7 +17,9 @@ package org.apache.nifi.provenance.store.iterator; +import java.io.EOFException; import java.io.File; +import java.io.FileNotFoundException; import 
java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -123,28 +125,30 @@ public class SelectiveRecordReaderEventIterator implements EventIterator { continue; } - // If we determined which file the event should be in, and that's not the file that - // we are currently reading from, rotate the reader to the appropriate one. - if (!fileForEvent.equals(currentFile)) { - if (reader != null) { - try { - reader.close(); - } catch (final Exception e) { - logger.warn("Failed to close {}; some resources may not be cleaned up appropriately", reader); + try { + // If we determined which file the event should be in, and that's not the file that + // we are currently reading from, rotate the reader to the appropriate one. + if (!fileForEvent.equals(currentFile)) { + if (reader != null) { + try { + reader.close(); + } catch (final Exception e) { + logger.warn("Failed to close {}; some resources may not be cleaned up appropriately", reader); + } } + + reader = readerFactory.newRecordReader(fileForEvent, Collections.emptyList(), maxAttributeChars); + this.currentFile = fileForEvent; } - reader = readerFactory.newRecordReader(fileForEvent, Collections.emptyList(), maxAttributeChars); - this.currentFile = fileForEvent; + final Optional eventOption = reader.skipToEvent(eventId); + if (eventOption.isPresent() && eventOption.get().getEventId() == eventId) { + reader.nextRecord(); // consume the event from the stream. + return eventOption; + } + } catch (final FileNotFoundException | EOFException e) { + logger.warn("Failed to retrieve Event with ID {}", eventId, e); } - - final Optional eventOption = reader.skipToEvent(eventId); - if (eventOption.isPresent() && eventOption.get().getEventId() == eventId) { - reader.nextRecord(); // consume the event from the stream. 
- return eventOption; - } - - continue; } return Optional.empty(); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java index efcb601dbb..9c29e246ec 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java @@ -129,6 +129,44 @@ public class TestIndexDirectoryManager { } } + @Test + public void testGetDirectoriesBefore() throws InterruptedException { + final RepositoryConfiguration config = createConfig(2); + config.setDesiredIndexSize(4096 * 128); + + final File storageDir = config.getStorageDirectories().get("1"); + + final File index1 = new File(storageDir, "index-1"); + final File index2 = new File(storageDir, "index-2"); + + final File[] allIndices = new File[] {index1, index2}; + for (final File file : allIndices) { + if (file.exists()) { + assertTrue(file.delete()); + } + } + + assertTrue(index1.mkdirs()); + // Wait 1500 millis because some file systems use only second-precision timestamps instead of millisecond-precision timestamps and + // we want to ensure that the two directories have different timestamps. Also using a value of 1500 instead of 1000 because sleep() + // can awake before the given time so we give it a buffer zone. 
+ Thread.sleep(1500L); + final long timestamp = System.currentTimeMillis(); + assertTrue(index2.mkdirs()); + + try { + final IndexDirectoryManager mgr = new IndexDirectoryManager(config); + mgr.initialize(); + + final List dirsBefore = mgr.getDirectoriesBefore(timestamp); + assertEquals(1, dirsBefore.size()); + assertEquals(index1, dirsBefore.get(0)); + } finally { + for (final File file : allIndices) { + file.delete(); + } + } + } private IndexLocation createLocation(final long timestamp) { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java index 0089f6191d..2cfbce299c 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java @@ -18,6 +18,7 @@ package org.apache.nifi.provenance.store.iterator; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import java.io.File; import java.io.IOException; @@ -25,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -83,6 +85,27 @@ public class TestSelectiveRecordReaderEventIterator { assertEquals(Arrays.asList(new File[] {file1, file1000}), filteredFiles); } + @Test + public void testFileNotFound() throws IOException { + final File 
file1 = new File("1.prov"); + + // Filter out the first file. + final List files = new ArrayList<>(); + files.add(file1); + + List eventIds = new ArrayList<>(); + eventIds.add(1L); + eventIds.add(5L); + + final RecordReaderFactory readerFactory = (file, logs, maxChars) -> { + return RecordReaders.newRecordReader(file, logs, maxChars); + }; + + final SelectiveRecordReaderEventIterator itr = new SelectiveRecordReaderEventIterator(files, readerFactory, eventIds, 65536); + final Optional firstRecordOption = itr.nextEvent(); + assertFalse(firstRecordOption.isPresent()); + } + @Test @Ignore("For local testing only. Runs indefinitely") public void testPerformanceOfRandomAccessReads() throws Exception {