NIFI-748 Fixed logic around handling partial query results from provenance repository

- Ensured that failures derived from correlating a Document to its actual provenance event do not fail the entire query, and instead produce partial results with warning messages
- Refactored DocsReader.read() operation.
- Added tests to validate two conditions where such failures could occur
Oleg Zhurakousky 2015-11-13 14:08:39 -05:00
parent 36d00a60f5
commit a4d93c62c8
6 changed files with 133 additions and 83 deletions
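At a glance, the core change replaces DocsReader's document-by-document file handling with a group-by-file pass that tolerates unreadable event files. A condensed sketch of the new read() flow, abbreviated from the DocsReader diff below (log messages shortened, debug-only counters dropped; not the verbatim patch):

    // Condensed sketch of the refactored DocsReader.read(); see the full diff below.
    public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
            final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
        final Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
        if (retrievalCount.get() >= maxResults) {
            return matchingRecords; // nothing more to fetch
        }

        // One group per event file, so each file is opened at most once.
        final Map<String, List<Document>> groups = LuceneUtil.groupDocsByStorageFileName(docs);
        for (final String storageFileName : groups.keySet()) {
            final File eventFile = LuceneUtil.getProvenanceLogFile(storageFileName, allProvenanceLogFiles);
            if (eventFile == null) {
                logger.warn("Missing event file {}; continuing without it", storageFileName);
                continue; // degrade to a partial result instead of failing the query
            }
            try (RecordReader reader = RecordReaders.newRecordReader(eventFile, allProvenanceLogFiles, maxAttributeChars)) {
                for (final Document document : groups.get(storageFileName)) {
                    final ProvenanceEventRecord record = getRecord(document, reader); // may now return null
                    if (record != null) {
                        matchingRecords.add(record);
                        if (retrievalCount.incrementAndGet() >= maxResults) {
                            break;
                        }
                    }
                }
            } catch (final Exception e) {
                logger.warn("Event file {} may be missing or corrupted; skipping it", eventFile, e);
            }
        }
        return matchingRecords;
    }

The try-with-resources scope confines a read failure to that one file's group of documents, which is what lets a corrupted or deleted event file produce a warning and a partial result rather than an IOException for the whole query.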

File: org/apache/nifi/provenance/PersistentProvenanceRepository.java

@@ -1784,6 +1784,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 try {
                     Thread.sleep(100L);
                 } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
                 }
             }
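Context for the one-line addition above: Thread.sleep() clears the thread's interrupt status when it throws InterruptedException, so a catch block that swallows the exception silently hides the interruption from enclosing loops. A minimal, self-contained illustration of the idiom (generic Java, not NiFi code):

    public class InterruptDemo {
        public static void main(String[] args) throws InterruptedException {
            Thread poller = new Thread(() -> {
                // Polls until interrupted; exits promptly only because the catch
                // block re-asserts the status that sleep() cleared.
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // restore the flag for the loop condition
                    }
                }
            });
            poller.start();
            poller.interrupt();
            poller.join(1000L); // returns almost immediately; without the re-interrupt it would hit the timeout
        }
    }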

File: org/apache/nifi/provenance/lucene/DocsReader.java

@@ -17,15 +17,14 @@
 package org.apache.nifi.provenance.lucene;

 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -47,9 +46,6 @@ import org.slf4j.LoggerFactory;
 public class DocsReader {
     private final Logger logger = LoggerFactory.getLogger(DocsReader.class);

-    public DocsReader(final List<File> storageDirectories) {
-    }
-
     public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
             final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
         if (retrievalCount.get() >= maxResults) {
@@ -101,100 +97,60 @@ public class DocsReader {
         }

         if (record == null) {
-            throw new IOException("Failed to find Provenance Event " + d);
-        } else {
-            return record;
+            logger.warn("Failed to read Provenance Event for '" + d + "'. The event file may be missing or corrupted");
         }
+
+        return record;
     }

     public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
         final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
-        if (retrievalCount.get() >= maxResults) {
-            return Collections.emptySet();
-        }
-
-        LuceneUtil.sortDocsForRetrieval(docs);
-
-        RecordReader reader = null;
-        String lastStorageFilename = null;
-        final Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();

         final long start = System.nanoTime();
-        int logFileCount = 0;
-
-        final Set<String> storageFilesToSkip = new HashSet<>();
-        int eventsReadThisFile = 0;

-        try {
-            for (final Document d : docs) {
-                final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
-                if ( storageFilesToSkip.contains(storageFilename) ) {
-                    continue;
-                }
+        Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
+        if (retrievalCount.get() >= maxResults) {
+            return matchingRecords;
+        }

-                try {
-                    if (reader != null && storageFilename.equals(lastStorageFilename)) {
-                        matchingRecords.add(getRecord(d, reader));
-                        eventsReadThisFile++;
+        Map<String, List<Document>> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs);

-                        if ( retrievalCount.incrementAndGet() >= maxResults ) {
-                            break;
-                        }
-                    } else {
-                        logger.debug("Opening log file {}", storageFilename);
+        int eventsReadThisFile = 0;
+        int logFileCount = 0;

-                        logFileCount++;
-                        if (reader != null) {
-                            reader.close();
-                        }
-
-                        final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
-                        if (potentialFiles.isEmpty()) {
-                            logger.warn("Could not find Provenance Log File with basename {} in the "
-                                + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
-                            storageFilesToSkip.add(storageFilename);
-                            continue;
-                        }
-
-                        if (potentialFiles.size() > 1) {
-                            throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
-                                storageFilename + " in the Provenance Repository");
-                        }
-
-                        for (final File file : potentialFiles) {
-                            try {
-                                if (reader != null) {
-                                    logger.debug("Read {} records from previous file", eventsReadThisFile);
-                                }
-
-                                reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars);
-                                matchingRecords.add(getRecord(d, reader));
-                                eventsReadThisFile = 1;
-
-                                if ( retrievalCount.incrementAndGet() >= maxResults ) {
-                                    break;
-                                }
-                            } catch (final IOException e) {
-                                throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
-                            }
-                        }
-                    }
-                } finally {
-                    lastStorageFilename = storageFilename;
-                }
-            }
-        } finally {
-            if (reader != null) {
-                reader.close();
+        for (String storageFileName : byStorageNameDocGroups.keySet()) {
+            File provenanceEventFile = LuceneUtil.getProvenanceLogFile(storageFileName, allProvenanceLogFiles);
+            if (provenanceEventFile != null) {
+                try (RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile, allProvenanceLogFiles,
+                    maxAttributeChars)) {
+                    for (Document document : byStorageNameDocGroups.get(storageFileName)) {
+                        ProvenanceEventRecord eRec = this.getRecord(document, reader);
+                        if (eRec != null) {
+                            matchingRecords.add(eRec);
+                            eventsReadThisFile++;
+
+                            if (retrievalCount.incrementAndGet() >= maxResults) {
+                                break;
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.warn("Failed while trying to read Provenance Events. The event file '"
+                        + provenanceEventFile.getAbsolutePath() +
+                        "' may be missing or corrupted.", e);
+                }
+            } else {
+                logger.warn("Could not find Provenance Log File with "
+                    + "basename {} in the Provenance Repository; assuming "
+                    + "file has expired and continuing without it", storageFileName);
             }
         }

         logger.debug("Read {} records from previous file", eventsReadThisFile);

         final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(), logFileCount);
+        logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(),
+            logFileCount);
         return matchingRecords;
     }
 }

File: org/apache/nifi/provenance/lucene/IndexSearch.java

@@ -89,7 +89,7 @@ public class IndexSearch {
             return sqr;
         }

-        final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
+        final DocsReader docsReader = new DocsReader();
         matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount,
             provenanceQuery.getMaxResults(), maxAttributeChars);

File: org/apache/nifi/provenance/lucene/LineageQuery.java

@@ -93,7 +93,7 @@ public class LineageQuery {
                 final TopDocs uuidQueryTopDocs = searcher.search(query, MAX_QUERY_RESULTS);
                 final long searchEnd = System.nanoTime();

-                final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
+                final DocsReader docsReader = new DocsReader();
                 final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(),
                     new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars);

File: org/apache/nifi/provenance/lucene/LuceneUtil.java

@@ -22,7 +22,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;

 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.provenance.SearchableFields;
@@ -160,4 +162,25 @@ public class LuceneUtil {
             }
         });
     }
+
+    /**
+     * Will group documents based on the {@link FieldNames#STORAGE_FILENAME}.
+     *
+     * @param documents
+     *            list of {@link Document}s
+     * @return a {@link Map} of document groups with
+     *         {@link FieldNames#STORAGE_FILENAME} as key and {@link List} of
+     *         {@link Document}s as value.
+     */
+    public static Map<String, List<Document>> groupDocsByStorageFileName(final List<Document> documents) {
+        Map<String, List<Document>> documentGroups = new HashMap<>();
+        for (Document document : documents) {
+            String fileName = document.get(FieldNames.STORAGE_FILENAME);
+            if (!documentGroups.containsKey(fileName)) {
+                documentGroups.put(fileName, new ArrayList<Document>());
+            }
+            documentGroups.get(fileName).add(document);
+        }
+        return documentGroups;
+    }
 }
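A hypothetical usage sketch of the new helper (document contents and file names invented for illustration; assumes the class sits in org.apache.nifi.provenance.lucene so LuceneUtil and FieldNames need no imports, and that the field is a Lucene StringField stored with Field.Store.YES so Document.get() can read the value back):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.StringField;

    public class GroupingDemo {
        public static void main(String[] args) {
            // Two documents from "10.prov.gz", one from "11.prov.gz" (names invented).
            Document d1 = docFor("10.prov.gz");
            Document d2 = docFor("11.prov.gz");
            Document d3 = docFor("10.prov.gz");

            Map<String, List<Document>> groups = LuceneUtil.groupDocsByStorageFileName(Arrays.asList(d1, d2, d3));
            // groups.get("10.prov.gz") -> [d1, d3]; groups.get("11.prov.gz") -> [d2]
            // DocsReader.read() can then open each event file once per group rather
            // than re-opening readers as it walks an interleaved document list.
            System.out.println(groups.keySet());
        }

        private static Document docFor(String storageFileName) {
            Document doc = new Document();
            doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFileName, Field.Store.YES));
            return doc;
        }
    }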

File: org/apache/nifi/provenance/TestPersistentProvenanceRepository.java

@@ -20,9 +20,11 @@ import static org.apache.nifi.provenance.TestUtil.createFlowFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;

 import java.io.File;
 import java.io.FileFilter;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -36,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.GZIPOutputStream;

 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.core.SimpleAnalyzer;
@@ -48,6 +51,7 @@ import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.provenance.lineage.EventNode;
 import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageEdge;
@@ -869,6 +873,72 @@ public class TestPersistentProvenanceRepository {
         }
     }

+    /**
+     * Here the event file is simply corrupted by virtue of not having any event
+     * records while having correct headers
+     */
+    @Test
+    public void testWithWithEventFileMissingRecord() throws Exception {
+        File eventFile = this.prepCorruptedEventFileTests();
+
+        final Query query = new Query(UUID.randomUUID().toString());
+        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*"));
+        query.setMaxResults(100);
+
+        DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
+        in.writeUTF("BlahBlah");
+        in.writeInt(4);
+        in.close();
+        assertTrue(eventFile.exists());
+        final QueryResult result = repo.queryEvents(query);
+        assertEquals(10, result.getMatchingEvents().size());
+    }
+
+    /**
+     * Here the event file is simply corrupted by virtue of being empty (0
+     * bytes)
+     */
+    @Test
+    public void testWithWithEventFileCorrupted() throws Exception {
+        File eventFile = this.prepCorruptedEventFileTests();
+
+        final Query query = new Query(UUID.randomUUID().toString());
+        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*"));
+        query.setMaxResults(100);
+        DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
+        in.close();
+        final QueryResult result = repo.queryEvents(query);
+        assertEquals(10, result.getMatchingEvents().size());
+    }
+
+    private File prepCorruptedEventFileTests() throws Exception {
+        RepositoryConfiguration config = createConfiguration();
+        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
+        config.setDesiredIndexSize(10);
+
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+        repo.initialize(getEventReporter());
+
+        String uuid = UUID.randomUUID().toString();
+        for (int i = 0; i < 20; i++) {
+            ProvenanceEventRecord record = repo.eventBuilder().fromFlowFile(mock(FlowFile.class))
+                .setEventType(ProvenanceEventType.CREATE).setComponentId("foo-" + i).setComponentType("myComponent")
+                .setFlowFileUUID(uuid).build();
+            repo.registerEvent(record);
+            if (i == 9) {
+                repo.waitForRollover();
+                Thread.sleep(2000L);
+            }
+        }
+        repo.waitForRollover();
+        File eventFile = new File(config.getStorageDirectories().get(0), "10.prov.gz");
+        assertTrue(eventFile.delete());
+        return eventFile;
+    }
+
     @Test
     public void testIndexDirectoryRemoved() throws InterruptedException, IOException, ParseException {
         final RepositoryConfiguration config = createConfiguration();