mirror of https://github.com/apache/nifi.git
NIFI-748 addressed PR comments
- made DocReader package private - polished logic in read(..) method to avoid escaping the loop - added call to sorting logic in LuceneUtil.groupDocsByStorageFileName(..) to ensure that previous behavior and assumptions in read(..) methodd are preserved - other minor polishing
This commit is contained in:
parent
a4d93c62c8
commit
15880f9fcc
|
@ -22,6 +22,7 @@ import java.nio.file.Path;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -43,7 +44,7 @@ import org.apache.lucene.search.TopDocs;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class DocsReader {
|
class DocsReader {
|
||||||
private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
|
private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
|
||||||
|
|
||||||
public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
|
public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
|
||||||
|
@ -106,12 +107,13 @@ public class DocsReader {
|
||||||
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
|
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
|
||||||
final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
|
final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
|
||||||
|
|
||||||
|
if (retrievalCount.get() >= maxResults) {
|
||||||
|
return Collections.emptySet();
|
||||||
|
}
|
||||||
|
|
||||||
final long start = System.nanoTime();
|
final long start = System.nanoTime();
|
||||||
|
|
||||||
Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
|
Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
|
||||||
if (retrievalCount.get() >= maxResults) {
|
|
||||||
return matchingRecords;
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, List<Document>> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs);
|
Map<String, List<Document>> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs);
|
||||||
|
|
||||||
|
@ -123,17 +125,16 @@ public class DocsReader {
|
||||||
if (provenanceEventFile != null) {
|
if (provenanceEventFile != null) {
|
||||||
try (RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile, allProvenanceLogFiles,
|
try (RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile, allProvenanceLogFiles,
|
||||||
maxAttributeChars)) {
|
maxAttributeChars)) {
|
||||||
for (Document document : byStorageNameDocGroups.get(storageFileName)) {
|
|
||||||
ProvenanceEventRecord eRec = this.getRecord(document, reader);
|
Iterator<Document> docIter = byStorageNameDocGroups.get(storageFileName).iterator();
|
||||||
|
while (docIter.hasNext() && retrievalCount.incrementAndGet() < maxResults){
|
||||||
|
ProvenanceEventRecord eRec = this.getRecord(docIter.next(), reader);
|
||||||
if (eRec != null) {
|
if (eRec != null) {
|
||||||
matchingRecords.add(eRec);
|
matchingRecords.add(eRec);
|
||||||
eventsReadThisFile++;
|
eventsReadThisFile++;
|
||||||
|
|
||||||
if (retrievalCount.incrementAndGet() >= maxResults) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("Failed while trying to read Provenance Events. The event file '"
|
logger.warn("Failed while trying to read Provenance Events. The event file '"
|
||||||
+ provenanceEventFile.getAbsolutePath() +
|
+ provenanceEventFile.getAbsolutePath() +
|
||||||
|
|
|
@ -130,8 +130,14 @@ public class LuceneUtil {
|
||||||
return luceneQuery;
|
return luceneQuery;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Will sort documents by filename and then file offset so that we can
|
||||||
|
* retrieve the records efficiently
|
||||||
|
*
|
||||||
|
* @param documents
|
||||||
|
* list of {@link Document}s
|
||||||
|
*/
|
||||||
public static void sortDocsForRetrieval(final List<Document> documents) {
|
public static void sortDocsForRetrieval(final List<Document> documents) {
|
||||||
// sort by filename and then file offset so that we can retrieve the records efficiently
|
|
||||||
Collections.sort(documents, new Comparator<Document>() {
|
Collections.sort(documents, new Comparator<Document>() {
|
||||||
@Override
|
@Override
|
||||||
public int compare(final Document o1, final Document o2) {
|
public int compare(final Document o1, final Document o2) {
|
||||||
|
@ -167,7 +173,9 @@ public class LuceneUtil {
|
||||||
* Will group documents based on the {@link FieldNames#STORAGE_FILENAME}.
|
* Will group documents based on the {@link FieldNames#STORAGE_FILENAME}.
|
||||||
*
|
*
|
||||||
* @param documents
|
* @param documents
|
||||||
* list of {@link Document}s
|
* list of {@link Document}s which will be sorted via
|
||||||
|
* {@link #sortDocsForRetrieval(List)} for more efficient record
|
||||||
|
* retrieval.
|
||||||
* @return a {@link Map} of document groups with
|
* @return a {@link Map} of document groups with
|
||||||
* {@link FieldNames#STORAGE_FILENAME} as key and {@link List} of
|
* {@link FieldNames#STORAGE_FILENAME} as key and {@link List} of
|
||||||
* {@link Document}s as value.
|
* {@link Document}s as value.
|
||||||
|
@ -181,6 +189,9 @@ public class LuceneUtil {
|
||||||
}
|
}
|
||||||
documentGroups.get(fileName).add(document);
|
documentGroups.get(fileName).add(document);
|
||||||
}
|
}
|
||||||
|
for (List<Document> groupedDocuments : documentGroups.values()) {
|
||||||
|
sortDocsForRetrieval(groupedDocuments);
|
||||||
|
}
|
||||||
return documentGroups;
|
return documentGroups;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue