NIFI-4023, NIFI-4077 This closes #2075. Addressed issue where repository was aging off the wrong index. When it should age off Index 1, it was removing Index 2. As a result, the earliest index is never aged off, and the newest index could potentially be aged off before it is ready to be. Also addressed issue where a query that attempts to read an event that has aged off throws FileNotFoundException (NIFI-4077) instead of skipping over the event. The JavaDocs indicate that the EventIterator should skip records that it cannot find, but SelectiveRecordReaderEventIterator throw FileNotFoundException instead

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2017-08-11 13:08:29 -04:00 committed by joewitt
parent ae940d8624
commit 84935d4f78
6 changed files with 111 additions and 35 deletions

View File

@ -125,9 +125,9 @@ public class IndexDirectoryManager {
// If looking at index N, we can determine the index end time by assuming that it is the same as the // If looking at index N, we can determine the index end time by assuming that it is the same as the
// start time of index N+1. So we determine the time range of each index and select an index only if // start time of index N+1. So we determine the time range of each index and select an index only if
// its start time is before the given timestamp and its end time is <= the given timestamp. // its start time is before the given timestamp and its end time is <= the given timestamp.
for (final List<IndexLocation> startTimeWithFile : startTimeWithFileByStorageDirectory.values()) { for (final List<IndexLocation> locationList : startTimeWithFileByStorageDirectory.values()) {
for (int i = 0; i < startTimeWithFile.size(); i++) { for (int i = 0; i < locationList.size(); i++) {
final IndexLocation indexLoc = startTimeWithFile.get(i); final IndexLocation indexLoc = locationList.get(i);
final String partition = indexLoc.getPartitionName(); final String partition = indexLoc.getPartitionName();
final IndexLocation activeLocation = activeIndices.get(partition); final IndexLocation activeLocation = activeIndices.get(partition);
@ -143,16 +143,13 @@ public class IndexDirectoryManager {
break; break;
} }
if (i < startTimeWithFile.size() - 1) { final long indexEndTime = indexLoc.getIndexEndTimestamp();
final IndexLocation nextLocation = startTimeWithFile.get(i + 1); if (indexEndTime <= timestamp) {
final Long indexEndTime = nextLocation.getIndexStartTimestamp(); logger.debug("Considering Index Location {} older than {} ({}) because its events have an EventTime "
if (indexEndTime <= timestamp) { + "ranging from {} ({}) to {} ({}) based on the following IndexLocations: {}", indexLoc, timestamp, new Date(timestamp),
logger.debug("Considering Index Location {} older than {} ({}) because its events have an EventTime " indexStartTime, new Date(indexStartTime), indexEndTime, new Date(indexEndTime), locationList);
+ "ranging from {} ({}) to {} ({}) based on the following IndexLocations: {}", nextLocation, timestamp, new Date(timestamp),
indexStartTime, new Date(indexStartTime), indexEndTime, new Date(indexEndTime), startTimeWithFile);
selected.add(nextLocation.getIndexDirectory()); selected.add(indexLoc.getIndexDirectory());
}
} }
} }
} }

View File

@ -38,6 +38,15 @@ public class IndexLocation {
return indexStartTimestamp; return indexStartTimestamp;
} }
public long getIndexEndTimestamp() {
final long lastMod = indexDirectory.lastModified();
if (lastMod == 0) {
return System.currentTimeMillis();
}
return lastMod;
}
public String getPartitionName() { public String getPartitionName() {
return partitionName; return partitionName;
} }

View File

@ -648,14 +648,19 @@ public class LuceneEventIndex implements EventIndex {
void performMaintenance() { void performMaintenance() {
try { try {
final List<ProvenanceEventRecord> firstEvents = eventStore.getEvents(0, 1); final List<ProvenanceEventRecord> firstEvents = eventStore.getEvents(0, 1);
final long earliestEventTime;
if (firstEvents.isEmpty()) { if (firstEvents.isEmpty()) {
return; earliestEventTime = System.currentTimeMillis();
logger.debug("Found no events in the Provenance Repository. In order to perform maintenace of the indices, "
+ "will assume that the first event time is now ({})", System.currentTimeMillis());
} else {
final ProvenanceEventRecord firstEvent = firstEvents.get(0);
earliestEventTime = firstEvent.getEventTime();
logger.debug("First Event Time is {} ({}) with Event ID {}; will delete any Lucene Index that is older than this",
earliestEventTime, new Date(earliestEventTime), firstEvent.getEventId());
} }
final ProvenanceEventRecord firstEvent = firstEvents.get(0);
final long earliestEventTime = firstEvent.getEventTime();
logger.debug("First Event Time is {} ({}) with Event ID {}; will delete any Lucene Index that is older than this",
earliestEventTime, new Date(earliestEventTime), firstEvent.getEventId());
final List<File> indicesBeforeEarliestEvent = directoryManager.getDirectoriesBefore(earliestEventTime); final List<File> indicesBeforeEarliestEvent = directoryManager.getDirectoriesBefore(earliestEventTime);
for (final File index : indicesBeforeEarliestEvent) { for (final File index : indicesBeforeEarliestEvent) {

View File

@ -17,7 +17,9 @@
package org.apache.nifi.provenance.store.iterator; package org.apache.nifi.provenance.store.iterator;
import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -123,28 +125,30 @@ public class SelectiveRecordReaderEventIterator implements EventIterator {
continue; continue;
} }
// If we determined which file the event should be in, and that's not the file that try {
// we are currently reading from, rotate the reader to the appropriate one. // If we determined which file the event should be in, and that's not the file that
if (!fileForEvent.equals(currentFile)) { // we are currently reading from, rotate the reader to the appropriate one.
if (reader != null) { if (!fileForEvent.equals(currentFile)) {
try { if (reader != null) {
reader.close(); try {
} catch (final Exception e) { reader.close();
logger.warn("Failed to close {}; some resources may not be cleaned up appropriately", reader); } catch (final Exception e) {
logger.warn("Failed to close {}; some resources may not be cleaned up appropriately", reader);
}
} }
reader = readerFactory.newRecordReader(fileForEvent, Collections.emptyList(), maxAttributeChars);
this.currentFile = fileForEvent;
} }
reader = readerFactory.newRecordReader(fileForEvent, Collections.emptyList(), maxAttributeChars); final Optional<ProvenanceEventRecord> eventOption = reader.skipToEvent(eventId);
this.currentFile = fileForEvent; if (eventOption.isPresent() && eventOption.get().getEventId() == eventId) {
reader.nextRecord(); // consume the event from the stream.
return eventOption;
}
} catch (final FileNotFoundException | EOFException e) {
logger.warn("Failed to retrieve Event with ID {}", eventId, e);
} }
final Optional<ProvenanceEventRecord> eventOption = reader.skipToEvent(eventId);
if (eventOption.isPresent() && eventOption.get().getEventId() == eventId) {
reader.nextRecord(); // consume the event from the stream.
return eventOption;
}
continue;
} }
return Optional.empty(); return Optional.empty();

View File

@ -129,6 +129,44 @@ public class TestIndexDirectoryManager {
} }
} }
@Test
public void testGetDirectoriesBefore() throws InterruptedException {
final RepositoryConfiguration config = createConfig(2);
config.setDesiredIndexSize(4096 * 128);
final File storageDir = config.getStorageDirectories().get("1");
final File index1 = new File(storageDir, "index-1");
final File index2 = new File(storageDir, "index-2");
final File[] allIndices = new File[] {index1, index2};
for (final File file : allIndices) {
if (file.exists()) {
assertTrue(file.delete());
}
}
assertTrue(index1.mkdirs());
// Wait 1500 millis because some file systems use only second-precision timestamps instead of millisecond-precision timestamps and
// we want to ensure that the two directories have different timestamps. Also using a value of 1500 instead of 1000 because sleep()
// can awake before the given time so we give it a buffer zone.
Thread.sleep(1500L);
final long timestamp = System.currentTimeMillis();
assertTrue(index2.mkdirs());
try {
final IndexDirectoryManager mgr = new IndexDirectoryManager(config);
mgr.initialize();
final List<File> dirsBefore = mgr.getDirectoriesBefore(timestamp);
assertEquals(1, dirsBefore.size());
assertEquals(index1, dirsBefore.get(0));
} finally {
for (final File file : allIndices) {
file.delete();
}
}
}
private IndexLocation createLocation(final long timestamp) { private IndexLocation createLocation(final long timestamp) {

View File

@ -18,6 +18,7 @@
package org.apache.nifi.provenance.store.iterator; package org.apache.nifi.provenance.store.iterator;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -25,6 +26,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -83,6 +85,27 @@ public class TestSelectiveRecordReaderEventIterator {
assertEquals(Arrays.asList(new File[] {file1, file1000}), filteredFiles); assertEquals(Arrays.asList(new File[] {file1, file1000}), filteredFiles);
} }
@Test
public void testFileNotFound() throws IOException {
final File file1 = new File("1.prov");
// Filter out the first file.
final List<File> files = new ArrayList<>();
files.add(file1);
List<Long> eventIds = new ArrayList<>();
eventIds.add(1L);
eventIds.add(5L);
final RecordReaderFactory readerFactory = (file, logs, maxChars) -> {
return RecordReaders.newRecordReader(file, logs, maxChars);
};
final SelectiveRecordReaderEventIterator itr = new SelectiveRecordReaderEventIterator(files, readerFactory, eventIds, 65536);
final Optional<ProvenanceEventRecord> firstRecordOption = itr.nextEvent();
assertFalse(firstRecordOption.isPresent());
}
@Test @Test
@Ignore("For local testing only. Runs indefinitely") @Ignore("For local testing only. Runs indefinitely")
public void testPerformanceOfRandomAccessReads() throws Exception { public void testPerformanceOfRandomAccessReads() throws Exception {