mirror of https://github.com/apache/nifi.git
commit 814e8b212c
Merge branch 'NIFI-1082'
--- .../StandardProvenanceEventRecord.java
+++ .../StandardProvenanceEventRecord.java
@@ -71,7 +71,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecord
     private final Map<String, String> previousAttributes;
     private final Map<String, String> updatedAttributes;
 
-    private volatile long eventId;
+    private volatile long eventId = -1L;
 
     private StandardProvenanceEventRecord(final Builder builder) {
         this.eventTime = builder.eventTime;
@@ -105,8 +105,8 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecord
         contentClaimOffset = builder.contentClaimOffset;
         contentSize = builder.contentSize;
 
-        previousAttributes = builder.previousAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.previousAttributes);
-        updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes);
+        previousAttributes = builder.previousAttributes == null ? Collections.<String, String> emptyMap() : Collections.unmodifiableMap(builder.previousAttributes);
+        updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String> emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes);
 
         sourceQueueIdentifier = builder.sourceQueueIdentifier;
 
@@ -198,12 +198,12 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecord
 
     @Override
     public List<String> getParentUuids() {
-        return parentUuids == null ? Collections.<String>emptyList() : parentUuids;
+        return parentUuids == null ? Collections.<String> emptyList() : parentUuids;
     }
 
     @Override
     public List<String> getChildUuids() {
-        return childrenUuids == null ? Collections.<String>emptyList() : childrenUuids;
+        return childrenUuids == null ? Collections.<String> emptyList() : childrenUuids;
    }
 
     @Override
@@ -298,7 +298,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecord
         }
 
         return -37423 + 3 * componentId.hashCode() + (transitUri == null ? 0 : 41 * transitUri.hashCode())
                 + (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode;
     }
 
     @Override
@@ -316,7 +316,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecord
         final StandardProvenanceEventRecord other = (StandardProvenanceEventRecord) obj;
         // If event ID's are populated and not equal, return false. If they have not yet been populated, do not
         // use them in the comparison.
-        if (eventId > 0L && other.getEventId() > 0L && eventId != other.getEventId()) {
+        if (eventId >= 0L && other.getEventId() >= 0L && eventId != other.getEventId()) {
             return false;
         }
         if (eventType != other.eventType) {
@@ -397,16 +397,16 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecord
     @Override
     public String toString() {
         return "ProvenanceEventRecord ["
                 + "eventId=" + eventId
                 + ", eventType=" + eventType
                 + ", eventTime=" + new Date(eventTime)
                 + ", uuid=" + uuid
                 + ", fileSize=" + contentSize
                 + ", componentId=" + componentId
                 + ", transitUri=" + transitUri
                 + ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier
                 + ", parentUuids=" + parentUuids
                 + ", alternateIdentifierUri=" + alternateIdentifierUri + "]";
     }
 
     public static class Builder implements ProvenanceEventBuilder {
@@ -663,7 +663,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecord
             setFlowFileEntryDate(flowFile.getEntryDate());
             setLineageIdentifiers(flowFile.getLineageIdentifiers());
             setLineageStartDate(flowFile.getLineageStartDate());
-            setAttributes(Collections.<String, String>emptyMap(), flowFile.getAttributes());
+            setAttributes(Collections.<String, String> emptyMap(), flowFile.getAttributes());
             uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
             this.contentSize = flowFile.getSize();
             return this;
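The eventId change above is the behavioral fix in this file: repository event IDs are assigned starting at 0, so the old equals() guard of eventId > 0L treated the very first event (ID 0) as "not yet populated" and skipped the ID comparison. Seeding the field with -1L gives equals() an unambiguous sentinel, and >= 0L then covers every assigned ID. A minimal sketch of the comparison logic, with hypothetical names rather than the full class:

    // Sketch: -1L marks an event whose repository ID has not been assigned yet.
    class IdentifiedEvent {
        private volatile long eventId = -1L;

        boolean conflictsWith(final IdentifiedEvent other) {
            // Compare IDs only when both are populated; with the old "> 0L" test,
            // the first event in the repository (ID 0) was never compared by ID.
            return eventId >= 0L && other.eventId >= 0L && eventId != other.eventId;
        }
    }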
--- .../StandardQueryResult.java
+++ .../StandardQueryResult.java
@@ -18,6 +18,7 @@ package org.apache.nifi.provenance;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -40,7 +41,7 @@ public class StandardQueryResult implements QueryResult {
 
     private final Lock writeLock = rwLock.writeLock();
     // guarded by writeLock
-    private final List<ProvenanceEventRecord> matchingRecords = new ArrayList<>();
+    private final List<List<ProvenanceEventRecord>> matchingRecords;
     private long totalHitCount;
     private int numCompletedSteps = 0;
     private Date expirationDate;
@@ -53,6 +54,11 @@ public class StandardQueryResult implements QueryResult {
         this.query = query;
         this.numSteps = numSteps;
         this.creationNanos = System.nanoTime();
+        this.matchingRecords = new ArrayList<>(numSteps);
+
+        for (int i = 0; i < Math.max(1, numSteps); i++) {
+            matchingRecords.add(Collections.<ProvenanceEventRecord> emptyList());
+        }
 
         updateExpiration();
     }
@@ -61,13 +67,14 @@ public class StandardQueryResult implements QueryResult {
     public List<ProvenanceEventRecord> getMatchingEvents() {
         readLock.lock();
         try {
-            if (matchingRecords.size() <= query.getMaxResults()) {
-                return new ArrayList<>(matchingRecords);
-            }
-
             final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults());
-            for (int i = 0; i < query.getMaxResults(); i++) {
-                copy.add(matchingRecords.get(i));
+            for (final List<ProvenanceEventRecord> recordList : matchingRecords) {
+                if (copy.size() + recordList.size() > query.getMaxResults()) {
+                    copy.addAll(recordList.subList(0, query.getMaxResults() - copy.size()));
+                    return copy;
+                } else {
+                    copy.addAll(recordList);
+                }
             }
 
             return copy;
@@ -141,10 +148,10 @@ public class StandardQueryResult implements QueryResult {
         }
     }
 
-    public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits) {
+    public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits, final int indexId) {
         writeLock.lock();
         try {
-            this.matchingRecords.addAll(matchingRecords);
+            this.matchingRecords.set(indexId, new ArrayList<>(matchingRecords));
             this.totalHitCount += totalHits;
 
             numCompletedSteps++;
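The StandardQueryResult changes above are the heart of NIFI-1082: instead of a single flat list that every index searcher appends to in whatever order it finishes, the result now keeps one slot per index. The constructor pre-fills numSteps slots with empty lists so that set(indexId, ...) is valid before any worker reports, and getMatchingEvents() concatenates the slots in index order, truncating once query.getMaxResults() is reached. A self-contained sketch of the pattern, using assumed simplified types in place of the NiFi classes:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    final class SlottedResult<T> {
        private final List<List<T>> slots;
        private final int maxResults;

        SlottedResult(final int numSlots, final int maxResults) {
            this.maxResults = maxResults;
            this.slots = new ArrayList<>(Math.max(1, numSlots));
            for (int i = 0; i < Math.max(1, numSlots); i++) {
                slots.add(Collections.<T>emptyList()); // pre-fill so set() works for any slot
            }
        }

        // Each worker writes only its own slot, so arrival order no longer matters.
        synchronized void update(final int slotId, final List<T> results) {
            slots.set(slotId, new ArrayList<>(results));
        }

        // Concatenate slots in index order, truncating at maxResults.
        synchronized List<T> merged() {
            final List<T> copy = new ArrayList<>(maxResults);
            for (final List<T> slot : slots) {
                if (copy.size() + slot.size() > maxResults) {
                    copy.addAll(slot.subList(0, maxResults - copy.size()));
                    return copy;
                }
                copy.addAll(slot);
            }
            return copy;
        }
    }

Because each worker owns a fixed slot, the merged order depends only on how slots were assigned to indices, not on which searcher thread happens to finish first.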
--- .../IndexConfiguration.java
+++ .../IndexConfiguration.java
@@ -36,7 +36,6 @@ import java.util.regex.Pattern;
 
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -173,6 +172,9 @@ public class IndexConfiguration {
             for (final List<File> list : indexDirectoryMap.values()) {
                 files.addAll(list);
             }
+
+            Collections.sort(files, new IndexDirectoryComparator());
+
             return files;
         } finally {
             lock.unlock();
@@ -198,11 +200,11 @@ public class IndexConfiguration {
      * span (times inclusive).
      *
      * @param startTime the start time of the query for which the indices are
      * desired
      * @param endTime the end time of the query for which the indices are
      * desired
      * @return the index directories that are applicable only for the given time
      * span (times inclusive).
      */
     public List<File> getIndexDirectories(final Long startTime, final Long endTime) {
         if (startTime == null && endTime == null) {
@@ -213,14 +215,7 @@ public class IndexConfiguration {
         lock.lock();
         try {
             final List<File> sortedIndexDirectories = getIndexDirectories();
-            Collections.sort(sortedIndexDirectories, new Comparator<File>() {
-                @Override
-                public int compare(final File o1, final File o2) {
-                    final long epochTimestamp1 = getIndexStartTime(o1);
-                    final long epochTimestamp2 = getIndexStartTime(o2);
-                    return Long.compare(epochTimestamp1, epochTimestamp2);
-                }
-            });
+            Collections.sort(sortedIndexDirectories, new IndexDirectoryComparator());
 
             for (final File indexDir : sortedIndexDirectories) {
                 // If the index was last modified before the start time, we know that it doesn't
@@ -252,9 +247,9 @@ public class IndexConfiguration {
      * event log
      *
      * @param provenanceLogFile the provenance log file for which the index
      * directories are desired
      * @return the index directories that are applicable only for the given
      * event log
      */
     public List<File> getIndexDirectories(final File provenanceLogFile) {
         final List<File> dirs = new ArrayList<>();
@@ -262,23 +257,16 @@ public class IndexConfiguration {
         try {
             final List<File> indices = indexDirectoryMap.get(provenanceLogFile.getParentFile());
             if (indices == null) {
-                return Collections.<File>emptyList();
+                return Collections.<File> emptyList();
             }
 
             final List<File> sortedIndexDirectories = new ArrayList<>(indices);
-            Collections.sort(sortedIndexDirectories, new Comparator<File>() {
-                @Override
-                public int compare(final File o1, final File o2) {
-                    final long epochTimestamp1 = getIndexStartTime(o1);
-                    final long epochTimestamp2 = getIndexStartTime(o2);
-                    return Long.compare(epochTimestamp1, epochTimestamp2);
-                }
-            });
+            Collections.sort(sortedIndexDirectories, new IndexDirectoryComparator());
 
             final Long firstEntryTime = getFirstEntryTime(provenanceLogFile);
             if (firstEntryTime == null) {
                 logger.debug("Found no records in {} so returning no Indices for it", provenanceLogFile);
-                return Collections.<File>emptyList();
+                return Collections.<File> emptyList();
             }
 
             boolean foundIndexCreatedLater = false;
@@ -376,7 +364,7 @@ public class IndexConfiguration {
         lock.lock();
         try {
             if (minIndexedId == null || id > minIndexedId) {
                 if (maxIndexedId == null || id > maxIndexedId) { // id will be > maxIndexedId if all records were expired
                     minIndexedId = maxIndexedId;
                 } else {
                     minIndexedId = id;
@@ -395,4 +383,13 @@ public class IndexConfiguration {
             lock.unlock();
         }
     }
+
+    private class IndexDirectoryComparator implements Comparator<File> {
+        @Override
+        public int compare(final File o1, final File o2) {
+            final long epochTimestamp1 = getIndexStartTime(o1);
+            final long epochTimestamp2 = getIndexStartTime(o2);
+            return -Long.compare(epochTimestamp1, epochTimestamp2);
+        }
+    }
 }
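The IndexDirectoryComparator introduced above replaces two identical anonymous Comparator instances, and it negates Long.compare so index directories sort newest-first instead of oldest-first. Combined with the per-slot merge shown earlier, slot 0 belongs to the newest index, which is what makes query results come back newest events first. A hedged illustration of the sign flip; the "index-<timestamp>" directory naming and the parsing stand-in for getIndexStartTime() are assumptions made for the example:

    import java.io.File;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    class NewestFirstDemo {
        // Stand-in for IndexConfiguration.getIndexStartTime(File).
        static long getIndexStartTime(final File dir) {
            return Long.parseLong(dir.getName().substring("index-".length()));
        }

        public static void main(final String[] args) {
            final List<File> dirs = new ArrayList<>(Arrays.asList(
                    new File("index-300"), new File("index-100"), new File("index-200")));
            Collections.sort(dirs, new Comparator<File>() {
                @Override
                public int compare(final File o1, final File o2) {
                    // Negating the comparison reverses the natural oldest-first order.
                    return -Long.compare(getIndexStartTime(o1), getIndexStartTime(o2));
                }
            });
            System.out.println(dirs); // [index-300, index-200, index-100]
        }
    }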
--- .../PersistentProvenanceRepository.java
+++ .../PersistentProvenanceRepository.java
@@ -108,7 +108,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     public static final String EVENT_CATEGORY = "Provenance Repository";
     private static final String FILE_EXTENSION = ".prov";
     private static final String TEMP_FILE_SUFFIX = ".prov.part";
-    private static final long PURGE_EVENT_MILLISECONDS = 2500L; //Determines the frequency over which the task to delete old events will occur
+    private static final long PURGE_EVENT_MILLISECONDS = 2500L; // Determines the frequency over which the task to delete old events will occur
     public static final int SERIALIZATION_VERSION = 8;
     public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
     public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
@@ -404,9 +404,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             // if this is the first record, try to find out the block index and jump directly to
             // the block index. This avoids having to read through a lot of data that we don't care about
             // just to get to the first record that we want.
-            if ( records.isEmpty() ) {
+            if (records.isEmpty()) {
                 final TocReader tocReader = reader.getTocReader();
-                if ( tocReader != null ) {
+                if (tocReader != null) {
                     final Integer blockIndex = tocReader.getBlockIndexForEventId(firstRecordId);
                     if (blockIndex != null) {
                         reader.skipToBlock(blockIndex);
@@ -641,7 +641,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
         indexManager.close();
 
-        if ( writers != null ) {
+        if (writers != null) {
             for (final RecordWriter writer : writers) {
                 writer.close();
             }
@@ -700,7 +700,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 // journal will result in corruption!
                 writer.markDirty();
                 dirtyWriterCount.incrementAndGet();
                 streamStartTime.set(0L); // force rollover to happen soon.
                 throw t;
             } finally {
                 writer.unlock();
@@ -912,15 +912,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                     removed.add(baseName);
                 } catch (final FileNotFoundException fnf) {
                     logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not "
                         + "perform additional Expiration Actions on this file", currentAction, file);
                     removed.add(baseName);
                 } catch (final Throwable t) {
                     logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional "
                         + "Expiration Actions on this file at this time", currentAction, file, t.toString());
                     logger.warn("", t);
                     eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction +
                         " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " +
                         "on this file at this time");
                 }
             }
 
@@ -1131,10 +1131,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     protected int getJournalCount() {
         // determine how many 'journals' we have in the journals directories
         int journalFileCount = 0;
-        for ( final File storageDir : configuration.getStorageDirectories() ) {
+        for (final File storageDir : configuration.getStorageDirectories()) {
             final File journalsDir = new File(storageDir, "journals");
             final File[] journalFiles = journalsDir.listFiles();
-            if ( journalFiles != null ) {
+            if (journalFiles != null) {
                 journalFileCount += journalFiles.length;
             }
         }
@@ -1169,12 +1169,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 writer.close();
             } catch (final IOException ioe) {
                 logger.warn("Failed to close {} due to {}", writer, ioe.toString());
-                if ( logger.isDebugEnabled() ) {
+                if (logger.isDebugEnabled()) {
                     logger.warn("", ioe);
                 }
             }
         }
-        if ( logger.isDebugEnabled() ) {
+        if (logger.isDebugEnabled()) {
             logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
         }
 
@@ -1263,10 +1263,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         // that is no longer the case.
         if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
             logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
                 + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and "
                 + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
             eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
                 + "exceeding the provenance recording rate. Slowing down flow to accommodate");
 
             while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
                 // if a shutdown happens while we are in this loop, kill the rollover thread and break
@@ -1293,15 +1293,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 }
 
                 logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
                     + "to accommodate. Currently, there are {} journal files ({} bytes) and "
                     + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
 
                 journalFileCount = getJournalCount();
                 repoSize = getSize(getLogFiles(), 0L);
             }
 
             logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
                 + "journal files to be rolled over is {}", journalFileCount);
         }
 
         // we've finished rolling over successfully. Create new writers and reset state.
@@ -1335,7 +1335,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         }
 
         for (final File journalFile : journalFiles) {
-            if ( journalFile.isDirectory() ) {
+            if (journalFile.isDirectory()) {
                 continue;
             }
 
@@ -1403,7 +1403,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
      */
     File mergeJournals(final List<File> journalFiles, final File suggestedMergeFile, final EventReporter eventReporter) throws IOException {
         logger.debug("Merging {} to {}", journalFiles, suggestedMergeFile);
-        if ( this.closed.get() ) {
+        if (this.closed.get()) {
             logger.info("Provenance Repository has been closed; will not merge journal files to {}", suggestedMergeFile);
             return null;
         }
@@ -1439,14 +1439,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 // we have all "partial" files and there is already a merged file. Delete the data from the index
                 // because the merge file may not be fully merged. We will re-merge.
                 logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
                     + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.");
 
                 final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
                 try {
                     deleteAction.execute(suggestedMergeFile);
                 } catch (final Exception e) {
                     logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", suggestedMergeFile, e.toString());
-                    if ( logger.isDebugEnabled() ) {
+                    if (logger.isDebugEnabled()) {
                         logger.warn("", e);
                     }
                 }
@@ -1460,18 +1460,18 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 }
 
                 final File tocFile = TocUtil.getTocFile(suggestedMergeFile);
-                if ( tocFile.exists() && !tocFile.delete() ) {
+                if (tocFile.exists() && !tocFile.delete()) {
                     logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. "
                         + "This can be corrected by manually deleting the {} file", tocFile, suggestedMergeFile, tocFile);
                 }
             }
         } else {
             logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' "
                 + "but it did not; assuming that the files were already merged but only some finished deletion "
                 + "before restart. Deleting remaining partial journal files.", journalFiles);
 
-            for ( final File file : journalFiles ) {
-                if ( !file.delete() && file.exists() ) {
+            for (final File file : journalFiles) {
+                if (!file.delete() && file.exists()) {
                     logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file);
                 }
             }
@@ -1529,7 +1529,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 } catch (final EOFException eof) {
                 } catch (final Exception e) {
                     logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't "
                         + "completely written to the file. This record will be skipped.");
                     if (logger.isDebugEnabled()) {
                         logger.warn("", e);
                     }
@@ -1544,11 +1544,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                     continue;
                 }
 
-                if ( record.getEventTime() < earliestTimestamp ) {
+                if (record.getEventTime() < earliestTimestamp) {
                     earliestTimestamp = record.getEventTime();
                 }
 
-                if ( record.getEventId() < minEventId ) {
+                if (record.getEventId() < minEventId) {
                     minEventId = record.getEventId();
                 }
 
@@ -1799,7 +1799,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         final int numQueries = querySubmissionMap.size();
         if (numQueries > MAX_UNDELETED_QUERY_RESULTS) {
             throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not "
                 + "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
         }
 
         if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) {
@@ -1820,7 +1820,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
             Long maxEventId = getMaxEventId();
             if (maxEventId == null) {
-                result.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0L);
+                result.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L, 0);
                 maxEventId = 0L;
             }
             Long minIndexedId = indexConfig.getMinIdIndexed();
@@ -1830,7 +1830,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
             final long totalNumDocs = maxEventId - minIndexedId;
 
-            result.getResult().update(trimmed, totalNumDocs);
+            result.getResult().update(trimmed, totalNumDocs, 0);
         } else {
             queryExecService.submit(new GetMostRecentRunnable(query, result));
         }
@@ -1839,18 +1839,18 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             return result;
         }
 
-        final AtomicInteger retrievalCount = new AtomicInteger(0);
         final List<File> indexDirectories = indexConfig.getIndexDirectories(
             query.getStartDate() == null ? null : query.getStartDate().getTime(),
             query.getEndDate() == null ? null : query.getEndDate().getTime());
         final AsyncQuerySubmission result = new AsyncQuerySubmission(query, indexDirectories.size());
         querySubmissionMap.put(query.getIdentifier(), result);
 
         if (indexDirectories.isEmpty()) {
-            result.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0L);
+            result.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L, 0);
         } else {
+            int indexId = 0;
             for (final File indexDir : indexDirectories) {
-                queryExecService.submit(new QueryRunnable(query, result, indexDir, retrievalCount));
+                queryExecService.submit(new QueryRunnable(query, result, indexDir, indexId++));
             }
         }
 
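The dispatch change just above removes the shared AtomicInteger retrievalCount and instead hands each QueryRunnable the slot it owns, via indexId++. Previously, whichever searcher thread ran first consumed the shared result budget, so both the set and the order of returned events could vary between runs of the same query; with fixed slots, every searcher reports in full and the cap is applied once, deterministically, at merge time. Roughly, with assumed simplified types:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class SlotDispatchDemo {
        public static void main(final String[] args) {
            final List<String> indexDirs = Arrays.asList("index-300", "index-200", "index-100");
            final ExecutorService exec = Executors.newFixedThreadPool(2);
            int indexId = 0;
            for (final String dir : indexDirs) {
                final int slot = indexId++; // fixed slot per index, independent of finish order
                exec.submit(() -> System.out.println("searching " + dir + " -> slot " + slot));
            }
            exec.shutdown();
        }
    }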
@@ -2024,11 +2024,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     }
 
     Lineage computeLineage(final String flowFileUuid) throws IOException {
-        return computeLineage(Collections.<String>singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
+        return computeLineage(Collections.<String> singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
     }
 
     private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final Long startTimestamp,
         final Long endTimestamp) throws IOException {
         final AsyncLineageSubmission submission = submitLineageComputation(flowFileUuids, computationType, eventId, startTimestamp, endTimestamp);
         final StandardLineageResult result = submission.getResult();
         while (!result.isFinished()) {
@@ -2051,7 +2051,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     }
 
     private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType,
         final Long eventId, final long startTimestamp, final long endTimestamp) {
         final List<File> indexDirs = indexConfig.getIndexDirectories(startTimestamp, endTimestamp);
         final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexDirs.size());
         lineageSubmissionMap.put(result.getLineageIdentifier(), result);
@@ -2068,9 +2068,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         try {
             final ProvenanceEventRecord event = getEvent(eventId);
             if (event == null) {
-                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-                submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
+                submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList());
                 return submission;
             }
 
@@ -2081,13 +2081,13 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             case REPLAY:
                 return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE);
             default:
-                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                 submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
                 return submission;
             }
         } catch (final IOException ioe) {
-            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
 
             if (ioe.getMessage() == null) {
@@ -2105,9 +2105,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         try {
             final ProvenanceEventRecord event = getEvent(eventId);
             if (event == null) {
-                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-                submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
+                submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList());
                 return submission;
             }
 
@@ -2118,14 +2118,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             case REPLAY:
                 return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, event.getLineageStartDate(), event.getEventTime());
             default: {
-                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                 submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
                 return submission;
             }
             }
         } catch (final IOException ioe) {
-            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
+            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
 
             if (ioe.getMessage() == null) {
@@ -2248,7 +2248,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 // get the max indexed event id
                 final Long maxEventId = indexConfig.getMaxIdIndexed();
                 if (maxEventId == null) {
-                    submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0);
+                    submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L, 0);
                     return;
                 }
 
@@ -2263,7 +2263,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 final long totalNumDocs = maxEventId - minIndexedId;
 
                 final List<ProvenanceEventRecord> mostRecent = getEvents(startIndex, maxResults);
-                submission.getResult().update(mostRecent, totalNumDocs);
+                // reverse the order so that the newest events come first.
+                Collections.reverse(mostRecent);
+                submission.getResult().update(mostRecent, totalNumDocs, 0);
             } catch (final IOException ioe) {
                 logger.error("Failed to retrieve records from Provenance Repository: " + ioe.toString());
                 if (logger.isDebugEnabled()) {
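In GetMostRecentRunnable above, getEvents(startIndex, maxResults) hands back events in ascending-ID (oldest-first) order, so the added Collections.reverse(mostRecent) flips the list to newest-first before it reaches the result, matching the ordering the indexed query path now produces. For example:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class ReverseDemo {
        public static void main(final String[] args) {
            final List<Long> eventIds = new ArrayList<>(Arrays.asList(97L, 98L, 99L)); // as read: oldest first
            Collections.reverse(eventIds);
            System.out.println(eventIds); // [99, 98, 97] -- newest event first
        }
    }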
@@ -2284,24 +2286,28 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         private final Query query;
         private final AsyncQuerySubmission submission;
         private final File indexDir;
-        private final AtomicInteger retrievalCount;
+        private final int indexId;
 
-        public QueryRunnable(final Query query, final AsyncQuerySubmission submission, final File indexDir, final AtomicInteger retrievalCount) {
+        public QueryRunnable(final Query query, final AsyncQuerySubmission submission, final File indexDir, final int indexId) {
             this.query = query;
             this.submission = submission;
             this.indexDir = indexDir;
-            this.retrievalCount = retrievalCount;
+            this.indexId = indexId;
         }
 
         @Override
         public void run() {
             try {
                 final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars);
-                final StandardQueryResult queryResult = search.search(query, retrievalCount, firstEventTimestamp);
-                submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount());
+                final StandardQueryResult queryResult = search.search(query, firstEventTimestamp);
+                logger.debug("Merging query results for indexId {}; before merge, num events = {}", indexId, queryResult.getTotalHitCount());
+                submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount(), indexId);
+                logger.debug("Merging query results for indexId {}; after merge, num events = {}", indexId, queryResult.getTotalHitCount());
+
                 if (queryResult.isFinished()) {
                     logger.info("Successfully executed Query[{}] against Index {}; Search took {} milliseconds; Total Hits = {}",
                         query, indexDir, queryResult.getQueryTime(), queryResult.getTotalHitCount());
                 }
             } catch (final Throwable t) {
                 logger.error("Failed to query Provenance Repository Index {} due to {}", indexDir, t.toString());
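The QueryRunnable change above removes retrievalCount from the whole search path: the same parameter disappears from IndexSearch.search(...) and, in the DocsReader hunks below, from both read(...) overloads along with their early-return budget checks. Each index search now reads up to maxResults documents on its own, and update(..., indexId) is the single place where results are merged and capped. A before/after sketch of the two budgeting styles, with simplified signatures rather than the exact NiFi API:

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    class BudgetSketch {
        // Before: a shared counter raced across searcher threads; the first thread
        // to push it past maxResults silently starved the slower indices.
        static boolean claimShared(final AtomicInteger retrievalCount, final int maxResults) {
            return retrievalCount.incrementAndGet() <= maxResults;
        }

        // After: each searcher caps only its own slot; global truncation happens
        // once, in slot order, when the slots are concatenated.
        static <T> List<T> capLocal(final List<T> slotResults, final int maxResults) {
            return slotResults.size() > maxResults ? slotResults.subList(0, maxResults) : slotResults;
        }
    }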
@@ -2344,7 +2350,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 result.update(matchingRecords);
 
                 logger.info("Successfully created Lineage for FlowFiles with UUIDs {} in {} milliseconds; Lineage contains {} nodes and {} edges",
                     flowFileUuids, result.getComputationTime(TimeUnit.MILLISECONDS), result.getNodes().size(), result.getEdges().size());
             } catch (final Throwable t) {
                 logger.error("Failed to query provenance repository due to {}", t.toString());
                 if (logger.isDebugEnabled()) {
--- .../DocsReader.java
+++ .../DocsReader.java
@@ -22,25 +22,23 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TopDocs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,11 +49,7 @@ public class DocsReader {
     }
 
     public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
-            final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
-        if (retrievalCount.get() >= maxResults) {
-            return Collections.emptySet();
-        }
-
+            final int maxResults, final int maxAttributeChars) throws IOException {
         final long start = System.nanoTime();
         final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
         final List<Document> docs = new ArrayList<>(numDocs);
@@ -68,13 +62,13 @@ public class DocsReader {
 
         final long readDocuments = System.nanoTime() - start;
         logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
-        return read(docs, allProvenanceLogFiles, retrievalCount, maxResults, maxAttributeChars);
+        return read(docs, allProvenanceLogFiles, maxResults, maxAttributeChars);
     }
 
 
     private long getByteOffset(final Document d, final RecordReader reader) {
         final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
-        if ( blockField != null ) {
+        if (blockField != null) {
             final int blockIndex = blockField.numericValue().intValue();
             final TocReader tocReader = reader.getTocReader();
             return tocReader.getBlockOffset(blockIndex);
@@ -86,21 +80,21 @@ public class DocsReader {
 
     private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
         final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
-        if ( blockField == null ) {
+        if (blockField == null) {
             reader.skipTo(getByteOffset(d, reader));
         } else {
             reader.skipToBlock(blockField.numericValue().intValue());
         }
 
         StandardProvenanceEventRecord record;
-        while ( (record = reader.nextRecord()) != null) {
+        while ((record = reader.nextRecord()) != null) {
             final IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
-            if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
+            if (idField == null || idField.numericValue().longValue() == record.getEventId()) {
                 break;
             }
         }
 
-        if ( record == null ) {
+        if (record == null) {
             throw new IOException("Failed to find Provenance Event " + d);
         } else {
             return record;
@@ -109,10 +103,7 @@ public class DocsReader {
 
 
     public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
-            final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
-        if (retrievalCount.get() >= maxResults) {
-            return Collections.emptySet();
-        }
+            final int maxResults, final int maxAttributeChars) throws IOException {
 
         LuceneUtil.sortDocsForRetrieval(docs);
 
@@ -129,7 +120,7 @@ public class DocsReader {
         try {
             for (final Document d : docs) {
                 final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
-                if ( storageFilesToSkip.contains(storageFilename) ) {
+                if (storageFilesToSkip.contains(storageFilename)) {
                     continue;
                 }
 
@@ -137,10 +128,6 @@ public class DocsReader {
                 if (reader != null && storageFilename.equals(lastStorageFilename)) {
                     matchingRecords.add(getRecord(d, reader));
                     eventsReadThisFile++;
-
-                    if ( retrievalCount.incrementAndGet() >= maxResults ) {
-                        break;
-                    }
                 } else {
                     logger.debug("Opening log file {}", storageFilename);
 
@@ -152,14 +139,14 @@ public class DocsReader {
                     final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
                     if (potentialFiles.isEmpty()) {
                         logger.warn("Could not find Provenance Log File with basename {} in the "
                             + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
                         storageFilesToSkip.add(storageFilename);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (potentialFiles.size() > 1) {
|
if (potentialFiles.size() > 1) {
|
||||||
throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
|
throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
|
||||||
storageFilename + " in the Provenance Repository");
|
storageFilename + " in the Provenance Repository");
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final File file : potentialFiles) {
|
for (final File file : potentialFiles) {
|
||||||
|
@ -171,10 +158,6 @@ public class DocsReader {
|
||||||
reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars);
|
reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars);
|
||||||
matchingRecords.add(getRecord(d, reader));
|
matchingRecords.add(getRecord(d, reader));
|
||||||
eventsReadThisFile = 1;
|
eventsReadThisFile = 1;
|
||||||
|
|
||||||
if ( retrievalCount.incrementAndGet() >= maxResults ) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
|
throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
|
||||||
}
|
}
|
||||||
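
The DocsReader changes above remove the shared AtomicInteger retrievalCount that callers previously had to thread through both read(...) overloads; the remaining maxResults argument alone caps how many records are read. The sketch below is illustrative only (not NiFi code; the names are made up) and shows the same capping pattern with a plain parameter and a local count:

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: cap retrieval with a plain maxResults argument and a
// local counter instead of a shared, cross-caller AtomicInteger.
public class CappedReadSketch {
    public static <T> List<T> readUpTo(final List<T> source, final int maxResults) {
        final List<T> results = new ArrayList<>();
        for (final T item : source) {
            if (results.size() >= maxResults) {
                break; // a local cap replaces the retrievalCount.incrementAndGet() checks
            }
            results.add(item);
        }
        return results;
    }
}
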
IndexSearch.java

@@ -19,17 +19,22 @@ package org.apache.nifi.provenance.lucene;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.SortField.Type;
 import org.apache.lucene.search.TopDocs;
 import org.apache.nifi.provenance.PersistentProvenanceRepository;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.StandardQueryResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,7 +53,7 @@ public class IndexSearch {
         this.maxAttributeChars = maxAttributeChars;
     }
 
-    public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount, final long firstEventTimestamp) throws IOException {
+    public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final long firstEventTimestamp) throws IOException {
         if (!indexDirectory.exists() && !indexDirectory.mkdirs()) {
             throw new IOException("Unable to create Indexing Directory " + indexDirectory);
         }
@@ -57,7 +62,6 @@ public class IndexSearch {
         }
 
         final StandardQueryResult sqr = new StandardQueryResult(provenanceQuery, 1);
-        final Set<ProvenanceEventRecord> matchingRecords;
 
         // we need to set the start date because if we do not, the first index may still have events that have aged off from
         // the repository, and we don't want those events to count toward the total number of matches.
@@ -77,38 +81,47 @@ public class IndexSearch {
             final long searchStartNanos = System.nanoTime();
             final long openSearcherNanos = searchStartNanos - start;
 
-            final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
+            final Sort sort = new Sort(new SortField(SearchableFields.Identifier.getSearchableFieldName(), Type.LONG, true));
+            final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults(), sort);
             final long finishSearch = System.nanoTime();
             final long searchNanos = finishSearch - searchStartNanos;
 
             logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
                 TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
 
             if (topDocs.totalHits == 0) {
-                sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
+                sqr.update(Collections.<ProvenanceEventRecord> emptyList(), 0, 0);
                 return sqr;
             }
 
             final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
-            matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount,
+            final Set<ProvenanceEventRecord> matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(),
                 provenanceQuery.getMaxResults(), maxAttributeChars);
 
             final long readRecordsNanos = System.nanoTime() - finishSearch;
             logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
 
-            sqr.update(matchingRecords, topDocs.totalHits);
+            // The records returned are going to be in a sorted set. The sort order will be dependent on
+            // the ID of the events, which is also approximately the same as the timestamp of the event (i.e.
+            // it's ordered by the time when the event was inserted into the repo, not the time when the event took
+            // place). We want to reverse this so that we get the newest events first, so we have to first create a
+            // new List object to hold the events, and then reverse the list.
+            final List<ProvenanceEventRecord> recordList = new ArrayList<>(matchingRecords);
+            Collections.reverse(recordList);
+
+            sqr.update(recordList, topDocs.totalHits, 0);
             return sqr;
         } catch (final FileNotFoundException e) {
             // nothing has been indexed yet, or the data has already aged off
             logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
-            if ( logger.isDebugEnabled() ) {
+            if (logger.isDebugEnabled()) {
                 logger.warn("", e);
             }
 
-            sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
+            sqr.update(Collections.<ProvenanceEventRecord> emptyList(), 0, 0);
             return sqr;
         } finally {
-            if ( searcher != null ) {
+            if (searcher != null) {
                 indexManager.returnIndexSearcher(indexDirectory, searcher);
             }
         }
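
IndexSearch.search(...) now asks Lucene itself for the top results ordered by the event Identifier field (Type.LONG, reverse = true) instead of taking hits in relevance order. A minimal sketch of that sorted-search call follows; it is illustrative only, assuming the Lucene 4.x API in use here, and the "eventId" field name is made up:

import java.io.IOException;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;

// Illustrative sketch: with reverse = true, the hits with the largest values
// of the long-valued sort field (the highest event IDs) come back first, so a
// top-N cut keeps the newest events rather than arbitrary ones.
public class SortedSearchSketch {
    public static TopDocs topNewest(final IndexReader reader, final Query query, final int maxResults) throws IOException {
        final IndexSearcher searcher = new IndexSearcher(reader);
        final Sort sort = new Sort(new SortField("eventId", SortField.Type.LONG, true));
        return searcher.search(query, maxResults, sort);
    }
}

Because DocsReader still hands the matching records back in a set sorted by ascending event ID, the list is rebuilt and reversed before sqr.update(...), so callers see the newest events first, as the new inline comment explains.
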
LineageQuery.java

@@ -25,7 +25,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause.Occur;
@@ -95,11 +94,11 @@ public class LineageQuery {
 
             final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
             final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(),
-                new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars);
+                Integer.MAX_VALUE, maxAttributeChars);
 
             final long readDocsEnd = System.nanoTime();
             logger.debug("Finished Lineage Query against {}; Lucene search took {} millis, reading records took {} millis",
                 indexDirectory, TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
 
             return recs;
         } finally {
@@ -108,7 +107,7 @@ public class LineageQuery {
         } catch (final FileNotFoundException fnfe) {
             // nothing has been indexed yet, or the data has already aged off
             logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, fnfe);
-            if ( logger.isDebugEnabled() ) {
+            if (logger.isDebugEnabled()) {
                 logger.warn("", fnfe);
             }
 
TestPersistentProvenanceRepository.java

@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileFilter;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -117,16 +118,16 @@ public class TestPersistentProvenanceRepository {
         // Delete all of the storage files. We do this in order to clean up the tons of files that
         // we create but also to ensure that we have closed all of the file handles. If we leave any
         // streams open, for instance, this will throw an IOException, causing our unit test to fail.
-        for ( final File storageDir : config.getStorageDirectories() ) {
+        for (final File storageDir : config.getStorageDirectories()) {
             int i;
-            for (i=0; i < 3; i++) {
+            for (i = 0; i < 3; i++) {
                 try {
                     FileUtils.deleteFile(storageDir, true);
                     break;
                 } catch (final IOException ioe) {
                     // if there is a virus scanner, etc. running in the background we may not be able to
                     // delete the file. Wait a sec and try again.
-                    if ( i == 2 ) {
+                    if (i == 2) {
                         throw ioe;
                     } else {
                         try {
@@ -441,7 +442,7 @@ public class TestPersistentProvenanceRepository {
         repo.waitForRollover();
 
         final Query query = new Query(UUID.randomUUID().toString());
         // query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
         query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
         query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
         query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
@@ -603,7 +604,7 @@ public class TestPersistentProvenanceRepository {
 
         repo.purgeOldEvents();
 
         Thread.sleep(2000L); // purge is async. Give it time to do its job.
 
         query.setMaxResults(100);
         final QuerySubmission noResultSubmission = repo.submitQuery(query);
@@ -614,6 +615,152 @@ public class TestPersistentProvenanceRepository {
         assertEquals(0, noResultSubmission.getResult().getTotalHitCount());
     }
 
+
+    @Test
+    public void testEventsAreOrdered() throws IOException, InterruptedException, ParseException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxRecordLife(30, TimeUnit.SECONDS);
+        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
+
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+        repo.initialize(getEventReporter());
+
+        final String uuid = "00000000-0000-0000-0000-000000000000";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", uuid);
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            builder.setEventTime(System.currentTimeMillis());
+            repo.registerEvent(builder.build());
+            Thread.sleep(20);
+        }
+
+        // Give time for rollover to happen
+        repo.waitForRollover();
+
+        // Perform a "Most Recent Events" Query
+        final Query query = new Query(UUID.randomUUID().toString());
+        query.setMaxResults(100);
+
+        final QueryResult result = repo.queryEvents(query);
+        assertEquals(10, result.getMatchingEvents().size());
+
+        final List<ProvenanceEventRecord> matchingEvents = result.getMatchingEvents();
+        long timestamp = matchingEvents.get(0).getEventTime();
+
+        for (final ProvenanceEventRecord record : matchingEvents) {
+            assertTrue(record.getEventTime() <= timestamp);
+            timestamp = record.getEventTime();
+        }
+
+        // Perform a Query for a particular component, so that this doesn't just get the most recent events
+        // and has to actually hit Lucene.
+        final Query query2 = new Query(UUID.randomUUID().toString());
+        query2.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "1234"));
+        query2.setMaxResults(100);
+        final QueryResult result2 = repo.queryEvents(query2);
+        assertEquals(10, result2.getMatchingEvents().size());
+
+        final List<ProvenanceEventRecord> matchingEvents2 = result2.getMatchingEvents();
+        timestamp = matchingEvents2.get(0).getEventTime();
+
+        for (final ProvenanceEventRecord record : matchingEvents2) {
+            assertTrue(record.getEventTime() <= timestamp);
+            timestamp = record.getEventTime();
+        }
+    }
+
+
+    @Test
+    public void testEventsAreOrderedAcrossMultipleIndexes() throws IOException, InterruptedException, ParseException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxRecordLife(30, TimeUnit.SECONDS);
+        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
+        config.setDesiredIndexSize(1L);
+
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+        repo.initialize(getEventReporter());
+
+        final String uuid = "00000000-0000-0000-0000-000000000000";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", uuid);
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            builder.setEventTime(System.currentTimeMillis());
+            repo.registerEvent(builder.build());
+            Thread.sleep(20);
+        }
+
+        // Give time for rollover to happen
+        repo.waitForRollover();
+
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            builder.setEventTime(System.currentTimeMillis());
+            repo.registerEvent(builder.build());
+            Thread.sleep(20);
+        }
+
+        repo.waitForRollover();
+
+        // Verify that multiple indexes exist
+        final File storageDir = config.getStorageDirectories().get(0);
+        final File[] subDirs = storageDir.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(final File dir, final String name) {
+                return name.startsWith("index-");
+            }
+        });
+        assertEquals(2, subDirs.length);
+
+        // Perform a Query for a particular component, so that this doesn't just get the most recent events
+        // and has to actually hit Lucene.
+        final Query query = new Query(UUID.randomUUID().toString());
+        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "1234"));
+        query.setMaxResults(100);
+        final QueryResult result = repo.queryEvents(query);
+        assertEquals(20, result.getMatchingEvents().size());
+
+        final List<ProvenanceEventRecord> matchingEvents = result.getMatchingEvents();
+        long timestamp = matchingEvents.get(0).getEventTime();
+
+        for (final ProvenanceEventRecord record : matchingEvents) {
+            assertTrue(record.getEventTime() <= timestamp);
+            timestamp = record.getEventTime();
+        }
+    }
+
+
     @Test
     public void testIndexAndCompressOnRolloverAndSubsequentEmptySearch() throws IOException, InterruptedException, ParseException {
         final RepositoryConfiguration config = createConfiguration();
@@ -939,7 +1086,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
         config.setMaxEventFileCapacity(1024L * 1024L);
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
         config.setDesiredIndexSize(10); // force new index to be created for each rollover
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
         repo.initialize(getEventReporter());
@@ -961,7 +1108,7 @@ public class TestPersistentProvenanceRepository {
         for (int i = 0; i < 10; i++) {
             attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
             builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
             builder.setEventTime(10L); // make sure the events are destroyed when we call purge
             repo.registerEvent(builder.build());
         }
 
@@ -1019,7 +1166,7 @@ public class TestPersistentProvenanceRepository {
     @Test
     public void testBackPressure() throws IOException, InterruptedException {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxEventFileCapacity(1L); // force rollover on each record.
         config.setJournalCount(1);
 
         final AtomicInteger journalCountRef = new AtomicInteger(0);
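
Both new tests assert ordering the same way: take the first event's timestamp, then walk the list checking that each subsequent timestamp is less than or equal to the one before it. A standalone sketch of that check (illustrative only, not NiFi code):

import java.util.List;

// Illustrative sketch of the newest-first assertion used by the tests above:
// the sequence of timestamps must be non-increasing.
public class OrderingCheckSketch {
    public static boolean isNewestFirst(final List<Long> timestamps) {
        long previous = Long.MAX_VALUE;
        for (final long t : timestamps) {
            if (t > previous) {
                return false; // an older entry precedes a newer one
            }
            previous = t;
        }
        return true;
    }
}

The second test sets config.setDesiredIndexSize(1L) and rolls over twice precisely so that two index- directories exist, exercising the case where results must be merged from multiple Lucene indexes while preserving the ordering.
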
VolatileProvenanceRepository.java

@@ -373,7 +373,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
     }
 
     public Lineage computeLineage(final String flowFileUUID) throws IOException {
-        return computeLineage(Collections.<String>singleton(flowFileUUID), LineageComputationType.FLOWFILE_LINEAGE, null);
+        return computeLineage(Collections.<String> singleton(flowFileUUID), LineageComputationType.FLOWFILE_LINEAGE, null);
     }
 
     private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId) throws IOException {
@@ -411,9 +411,9 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
     public ComputeLineageSubmission submitExpandParents(final long eventId) {
         final ProvenanceEventRecord event = getEvent(eventId);
         if (event == null) {
-            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
+            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-            submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
+            submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList());
             return submission;
         }
 
@@ -424,7 +424,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
             case CLONE:
                 return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId);
             default: {
-                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                 submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
                 return submission;
@@ -440,9 +440,9 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
     public ComputeLineageSubmission submitExpandChildren(final long eventId) {
         final ProvenanceEventRecord event = getEvent(eventId);
         if (event == null) {
-            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-            submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
+            submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList());
             return submission;
         }
 
@@ -453,7 +453,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
             case CLONE:
                 return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId);
             default: {
-                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                 submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
                 return submission;
@@ -526,7 +526,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
 
             }, IterationDirection.BACKWARD);
 
-            submission.getResult().update(matchingRecords, matchingCount.get());
+            submission.getResult().update(matchingRecords, matchingCount.get(), 0);
         }
     }
 
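
Most of the churn in this file is formatting: the explicit generic type witness is kept but written with a space, e.g. Collections.<String> singleton(...). The witness itself matters on pre-Java-8 compilers, where a generic method call in argument position may not infer its type parameter from the target type. A minimal illustration (not NiFi code):

import java.util.Collections;
import java.util.List;

public class TypeWitnessSketch {
    static void takesStrings(final List<String> values) {
        // no-op; exists only to give the call below an expected parameter type
    }

    public static void main(final String[] args) {
        // Under Java 7 inference rules the explicit <String> witness is needed;
        // a plain Collections.emptyList() would infer List<Object> here.
        takesStrings(Collections.<String> emptyList());
    }
}
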