Merge branch 'NIFI-748' of https://github.com/olegz/nifi into NIFI-748

This commit is contained in:
Mark Payne 2015-11-17 09:23:10 -05:00
commit 453b140d6b
6 changed files with 145 additions and 83 deletions

View File

@ -1784,6 +1784,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
}
}

View File

@ -17,15 +17,15 @@
package org.apache.nifi.provenance.lucene;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -44,12 +44,9 @@ import org.apache.lucene.search.TopDocs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DocsReader {
class DocsReader {
private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
public DocsReader(final List<File> storageDirectories) {
}
public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
if (retrievalCount.get() >= maxResults) {
@ -100,101 +97,61 @@ public class DocsReader {
}
}
if ( record == null ) {
throw new IOException("Failed to find Provenance Event " + d);
} else {
return record;
if (record == null) {
logger.warn("Failed to read Provenance Event for '" + d + "'. The event file may be missing or corrupted");
}
return record;
}
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
if (retrievalCount.get() >= maxResults) {
return Collections.emptySet();
}
LuceneUtil.sortDocsForRetrieval(docs);
RecordReader reader = null;
String lastStorageFilename = null;
final Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
final long start = System.nanoTime();
Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
Map<String, List<Document>> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs);
int eventsReadThisFile = 0;
int logFileCount = 0;
final Set<String> storageFilesToSkip = new HashSet<>();
int eventsReadThisFile = 0;
for (String storageFileName : byStorageNameDocGroups.keySet()) {
File provenanceEventFile = LuceneUtil.getProvenanceLogFile(storageFileName, allProvenanceLogFiles);
if (provenanceEventFile != null) {
try (RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile, allProvenanceLogFiles,
maxAttributeChars)) {
try {
for (final Document d : docs) {
final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
if ( storageFilesToSkip.contains(storageFilename) ) {
continue;
}
try {
if (reader != null && storageFilename.equals(lastStorageFilename)) {
matchingRecords.add(getRecord(d, reader));
eventsReadThisFile++;
if ( retrievalCount.incrementAndGet() >= maxResults ) {
break;
}
} else {
logger.debug("Opening log file {}", storageFilename);
logFileCount++;
if (reader != null) {
reader.close();
}
final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
if (potentialFiles.isEmpty()) {
logger.warn("Could not find Provenance Log File with basename {} in the "
+ "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
storageFilesToSkip.add(storageFilename);
continue;
}
if (potentialFiles.size() > 1) {
throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
storageFilename + " in the Provenance Repository");
}
for (final File file : potentialFiles) {
try {
if (reader != null) {
logger.debug("Read {} records from previous file", eventsReadThisFile);
}
reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars);
matchingRecords.add(getRecord(d, reader));
eventsReadThisFile = 1;
if ( retrievalCount.incrementAndGet() >= maxResults ) {
break;
}
} catch (final IOException e) {
throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
}
Iterator<Document> docIter = byStorageNameDocGroups.get(storageFileName).iterator();
while (docIter.hasNext() && retrievalCount.incrementAndGet() < maxResults){
ProvenanceEventRecord eRec = this.getRecord(docIter.next(), reader);
if (eRec != null) {
matchingRecords.add(eRec);
eventsReadThisFile++;
}
}
} finally {
lastStorageFilename = storageFilename;
} catch (Exception e) {
logger.warn("Failed while trying to read Provenance Events. The event file '"
+ provenanceEventFile.getAbsolutePath() +
"' may be missing or corrupted.", e);
}
}
} finally {
if (reader != null) {
reader.close();
} else {
logger.warn("Could not find Provenance Log File with "
+ "basename {} in the Provenance Repository; assuming "
+ "file has expired and continuing without it", storageFileName);
}
}
logger.debug("Read {} records from previous file", eventsReadThisFile);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(), logFileCount);
logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(),
logFileCount);
return matchingRecords;
}
}

View File

@ -89,7 +89,7 @@ public class IndexSearch {
return sqr;
}
final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
final DocsReader docsReader = new DocsReader();
matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount,
provenanceQuery.getMaxResults(), maxAttributeChars);

View File

@ -93,7 +93,7 @@ public class LineageQuery {
final TopDocs uuidQueryTopDocs = searcher.search(query, MAX_QUERY_RESULTS);
final long searchEnd = System.nanoTime();
final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
final DocsReader docsReader = new DocsReader();
final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(),
new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars);

View File

@ -22,7 +22,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.provenance.SearchableFields;
@ -128,8 +130,14 @@ public class LuceneUtil {
return luceneQuery;
}
/**
* Will sort documents by filename and then file offset so that we can
* retrieve the records efficiently
*
* @param documents
* list of {@link Document}s
*/
public static void sortDocsForRetrieval(final List<Document> documents) {
// sort by filename and then file offset so that we can retrieve the records efficiently
Collections.sort(documents, new Comparator<Document>() {
@Override
public int compare(final Document o1, final Document o2) {
@ -160,4 +168,30 @@ public class LuceneUtil {
}
});
}
/**
* Will group documents based on the {@link FieldNames#STORAGE_FILENAME}.
*
* @param documents
* list of {@link Document}s which will be sorted via
* {@link #sortDocsForRetrieval(List)} for more efficient record
* retrieval.
* @return a {@link Map} of document groups with
* {@link FieldNames#STORAGE_FILENAME} as key and {@link List} of
* {@link Document}s as value.
*/
public static Map<String, List<Document>> groupDocsByStorageFileName(final List<Document> documents) {
Map<String, List<Document>> documentGroups = new HashMap<>();
for (Document document : documents) {
String fileName = document.get(FieldNames.STORAGE_FILENAME);
if (!documentGroups.containsKey(fileName)) {
documentGroups.put(fileName, new ArrayList<Document>());
}
documentGroups.get(fileName).add(document);
}
for (List<Document> groupedDocuments : documentGroups.values()) {
sortDocsForRetrieval(groupedDocuments);
}
return documentGroups;
}
}

View File

@ -20,9 +20,11 @@ import static org.apache.nifi.provenance.TestUtil.createFlowFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@ -36,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPOutputStream;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.SimpleAnalyzer;
@ -48,6 +51,7 @@ import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.FSDirectory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.provenance.lineage.EventNode;
import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageEdge;
@ -869,6 +873,72 @@ public class TestPersistentProvenanceRepository {
}
}
/**
* Here the event file is simply corrupted by virtue of not having any event
* records while having correct headers
*/
@Test
public void testWithWithEventFileMissingRecord() throws Exception {
File eventFile = this.prepCorruptedEventFileTests();
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*"));
query.setMaxResults(100);
DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
in.writeUTF("BlahBlah");
in.writeInt(4);
in.close();
assertTrue(eventFile.exists());
final QueryResult result = repo.queryEvents(query);
assertEquals(10, result.getMatchingEvents().size());
}
/**
* Here the event file is simply corrupted by virtue of being empty (0
* bytes)
*/
@Test
public void testWithWithEventFileCorrupted() throws Exception {
File eventFile = this.prepCorruptedEventFileTests();
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*"));
query.setMaxResults(100);
DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
in.close();
final QueryResult result = repo.queryEvents(query);
assertEquals(10, result.getMatchingEvents().size());
}
private File prepCorruptedEventFileTests() throws Exception {
RepositoryConfiguration config = createConfiguration();
config.setMaxStorageCapacity(1024L * 1024L);
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
config.setMaxEventFileCapacity(1024L * 1024L);
config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
config.setDesiredIndexSize(10);
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
repo.initialize(getEventReporter());
String uuid = UUID.randomUUID().toString();
for (int i = 0; i < 20; i++) {
ProvenanceEventRecord record = repo.eventBuilder().fromFlowFile(mock(FlowFile.class))
.setEventType(ProvenanceEventType.CREATE).setComponentId("foo-" + i).setComponentType("myComponent")
.setFlowFileUUID(uuid).build();
repo.registerEvent(record);
if (i == 9) {
repo.waitForRollover();
Thread.sleep(2000L);
}
}
repo.waitForRollover();
File eventFile = new File(config.getStorageDirectories().get(0), "10.prov.gz");
assertTrue(eventFile.delete());
return eventFile;
}
@Test
public void testIndexDirectoryRemoved() throws InterruptedException, IOException, ParseException {
final RepositoryConfiguration config = createConfiguration();