joewitt 2015-04-27 14:26:26 -04:00
commit 43b2f040bc
29 changed files with 1372 additions and 1588 deletions

View File

@@ -201,7 +201,8 @@ public class IndexConfiguration {
* desired
* @param endTime the end time of the query for which the indices are
* desired
* @return
* @return the index directories that are applicable only for the given time
* span (times inclusive).
*/
public List<File> getIndexDirectories(final Long startTime, final Long endTime) {
if (startTime == null && endTime == null) {
@@ -252,7 +253,8 @@ public class IndexConfiguration {
*
* @param provenanceLogFile the provenance log file for which the index
* directories are desired
* @return
* @return the index directories that are applicable only for the given
* event log
*/
public List<File> getIndexDirectories(final File provenanceLogFile) {
final List<File> dirs = new ArrayList<>();
@@ -334,9 +336,7 @@ public class IndexConfiguration {
}
/**
* Returns the amount of disk space in bytes used by all of the indices
*
* @return
* @return the amount of disk space in bytes used by all of the indices
*/
public long getIndexSize() {
lock.lock();
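As a usage sketch (the IndexConfiguration instance and the one-day window are illustrative assumptions), the time-bounded lookup lets a query touch only the index shards whose span overlaps the query window:

    // indexConfig: an existing IndexConfiguration for this repository (assumed)
    final long endTime = System.currentTimeMillis();
    final long startTime = endTime - TimeUnit.DAYS.toMillis(1);

    // returns only the index directories whose time span overlaps
    // [startTime, endTime] (inclusive), per the javadoc above
    final List<File> indexDirs = indexConfig.getIndexDirectories(startTime, endTime);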

View File

@@ -139,7 +139,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
private final List<ExpirationAction> expirationActions = new ArrayList<>();
private final IndexingAction indexingAction;
private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>();
@@ -185,13 +184,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
this.alwaysSync = configuration.isAlwaysSync();
this.rolloverCheckMillis = rolloverCheckMillis;
final List<SearchableField> fields = configuration.getSearchableFields();
if (fields != null && !fields.isEmpty()) {
indexingAction = new IndexingAction(this, indexConfig);
} else {
indexingAction = null;
}
scheduledExecService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("Provenance Maintenance Thread"));
queryExecService = Executors.newFixedThreadPool(configuration.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query Thread"));
@@ -489,18 +481,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
maxIdFile = file;
}
if (firstId > maxIndexedId && indexingAction != null && indexingAction.hasBeenPerformed(file)) {
if (firstId > maxIndexedId) {
maxIndexedId = firstId - 1;
}
if (firstId < minIndexedId && indexingAction != null && indexingAction.hasBeenPerformed(file)) {
if (firstId < minIndexedId) {
minIndexedId = firstId;
}
}
if (maxIdFile != null) {
final boolean lastFileIndexed = indexingAction == null ? false : indexingAction.hasBeenPerformed(maxIdFile);
// Determine the max ID in the last file.
try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) {
final long eventId = reader.getMaxEventId();
@@ -510,7 +500,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
// If the ID is greater than the max indexed id and this file was indexed, then
// update the max indexed id
if (eventId > maxIndexedId && lastFileIndexed) {
if (eventId > maxIndexedId) {
maxIndexedId = eventId;
}
} catch (final IOException ioe) {
@@ -624,7 +614,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
readLock.lock();
try {
if (repoDirty.get()) {
logger.debug("Cannot persist provenance record because there was an IOException last time a record persistence was attempted. Will not attempt to persist more records until the repo has been rolled over.");
logger.debug("Cannot persist provenance record because there was an IOException last time a record persistence was attempted. "
+ "Will not attempt to persist more records until the repo has been rolled over.");
return;
}
@@ -670,7 +661,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} catch (final IOException ioe) {
logger.error("Failed to persist Provenance Event due to {}. Will not attempt to write to the Provenance Repository again until the repository has rolled over.", ioe.toString());
logger.error("", ioe);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString() + ". Will not attempt to write to the Provenance Repository again until the repository has rolled over");
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString() +
". Will not attempt to write to the Provenance Repository again until the repository has rolled over");
// Switch from readLock to writeLock so that we can perform rollover
readLock.unlock();
@@ -735,9 +727,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
/**
* Returns the size, in bytes, of the Repository storage
*
* @param logFiles
* @param timeCutoff
* @return
* @param logFiles the log files to consider
* @param timeCutoff if a log file's last modified date is before timeCutoff, it will be skipped
* @return the total size, in bytes, of the given log files whose last modified date is at or after timeCutoff
*/
public long getSize(final List<File> logFiles, final long timeCutoff) {
long bytesUsed = 0L;
@@ -760,7 +752,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
/**
* Purges old events from the repository
*
* @throws IOException
* @throws IOException if unable to purge old events due to an I/O problem
*/
void purgeOldEvents() throws IOException {
while (!recoveryFinished.get()) {
@@ -858,12 +850,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
removed.add(baseName);
} catch (final FileNotFoundException fnf) {
logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not perform additional Expiration Actions on this file", currentAction, file);
logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not "
+ "perform additional Expiration Actions on this file", currentAction, file);
removed.add(baseName);
} catch (final Throwable t) {
logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional Expiration Actions on this file at this time", currentAction, file, t.toString());
logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional "
+ "Expiration Actions on this file at this time", currentAction, file, t.toString());
logger.warn("", t);
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction + " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions on this file at this time");
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction +
" on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " +
"on this file at this time");
}
}
@@ -922,8 +918,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
/**
* MUST be called with the write lock held
*
* @param force
* @throws IOException
* @param force if true, will force a rollover regardless of whether or not data has been written
* @throws IOException if unable to complete rollover
*/
private void rollover(final boolean force) throws IOException {
if (!configuration.isAllowRollover()) {
@@ -1120,7 +1116,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return mergedFile;
}
File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter,
final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
logger.debug("Merging {} to {}", journalFiles, mergedFile);
if ( this.closed ) {
logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile);
@@ -1241,12 +1238,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
record = reader.nextRecord();
} catch (final EOFException eof) {
} catch (final Exception e) {
logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't completely written to the file. This record will be skipped.");
logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't "
+ "completely written to the file. This record will be skipped.");
if (logger.isDebugEnabled()) {
logger.warn("", e);
}
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't completely written to the file. This record will be skipped.");
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e +
"; it's possible that hte record wasn't completely written to the file. This record will be skipped.");
}
if (record == null) {
@@ -1261,7 +1260,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
writer.writeHeader();
final IndexingAction indexingAction = new IndexingAction(this, indexConfig);
final IndexingAction indexingAction = new IndexingAction(this);
final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile);
final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
@@ -1374,7 +1373,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
public QuerySubmission submitQuery(final Query query) {
final int numQueries = querySubmissionMap.size();
if (numQueries > MAX_UNDELETED_QUERY_RESULTS) {
throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not "
+ "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
}
if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) {
@@ -1432,11 +1432,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
/**
* REMOVE-ME: This is for testing only and can be removed.
* This is for testing only and is not actually used outside of debugging
*
* @param luceneQuery
* @return
* @throws IOException
* @param luceneQuery the lucene query to execute
* @return an Iterator of ProvenanceEventRecord that match the query
* @throws IOException if unable to perform the query
*/
public Iterator<ProvenanceEventRecord> queryLucene(final org.apache.lucene.search.Query luceneQuery) throws IOException {
final List<File> indexFiles = indexConfig.getIndexDirectories();
@@ -1601,7 +1601,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return computeLineage(Collections.<String>singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
}
private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final Long startTimestamp, final Long endTimestamp) throws IOException {
private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final Long startTimestamp,
final Long endTimestamp) throws IOException {
final AsyncLineageSubmission submission = submitLineageComputation(flowFileUuids, computationType, eventId, startTimestamp, endTimestamp);
final StandardLineageResult result = submission.getResult();
while (!result.isFinished()) {
@@ -1623,7 +1624,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return submitLineageComputation(Collections.singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
}
private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final long startTimestamp, final long endTimestamp) {
private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType,
final Long eventId, final long startTimestamp, final long endTimestamp) {
final List<File> indexDirs = indexConfig.getIndexDirectories(startTimestamp, endTimestamp);
final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexDirs.size());
lineageSubmissionMap.put(result.getLineageIdentifier(), result);
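Both submission paths are asynchronous: the caller gets a submission object back and polls its result, the same pattern computeLineage uses internally above. A sketch for the query side (the result accessors mirror the lineage code and are otherwise assumptions):

    final QuerySubmission submission = repo.submitQuery(query);
    final QueryResult result = submission.getResult();
    while (!result.isFinished()) {
        Thread.sleep(100L); // poll until the query threads finish
    }
    final List<ProvenanceEventRecord> matches = result.getMatchingEvents(); // assumed accessor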

View File

@@ -62,7 +62,7 @@ public class RepositoryConfiguration {
/**
* Specifies where the repository will store data
*
* @return
* @return the directories where provenance files will be stored
*/
public List<File> getStorageDirectories() {
return Collections.unmodifiableList(storageDirectories);
@@ -71,18 +71,15 @@ public class RepositoryConfiguration {
/**
* Specifies where the repository should store data
*
* @param storageDirectory
* @param storageDirectory the directory to store provenance files
*/
public void addStorageDirectory(final File storageDirectory) {
this.storageDirectories.add(storageDirectory);
}
/**
* Returns the minimum amount of time that a given record will stay in the
* repository
*
* @param timeUnit
* @return
* @param timeUnit the desired time unit
* @return the max amount of time that a given record will stay in the repository
*/
public long getMaxRecordLife(final TimeUnit timeUnit) {
return timeUnit.convert(recordLifeMillis, TimeUnit.MILLISECONDS);
@@ -91,8 +88,8 @@ public class RepositoryConfiguration {
/**
* Specifies how long a record should stay in the repository
*
* @param maxRecordLife
* @param timeUnit
* @param maxRecordLife the max amount of time to keep a record in the repo
* @param timeUnit the period of time used by maxRecordLife
*/
public void setMaxRecordLife(final long maxRecordLife, final TimeUnit timeUnit) {
this.recordLifeMillis = TimeUnit.MILLISECONDS.convert(maxRecordLife, timeUnit);
@@ -101,7 +98,7 @@ public class RepositoryConfiguration {
/**
* Returns the maximum amount of data to store in the repository (in bytes)
*
* @return
* @return the maximum amount of disk space to use for the prov repo
*/
public long getMaxStorageCapacity() {
return storageCapacity;
@@ -109,107 +106,91 @@ public class RepositoryConfiguration {
/**
* Sets the maximum amount of data to store in the repository (in bytes)
* @param maxStorageCapacity
*
* @param maxStorageCapacity the maximum amount of disk space to use for the prov repo
*/
public void setMaxStorageCapacity(final long maxStorageCapacity) {
this.storageCapacity = maxStorageCapacity;
}
/**
* Returns the maximum amount of time to write to a single event file
*
* @param timeUnit
* @return
* @param timeUnit the desired time unit for the returned value
* @return the maximum amount of time that the repo will write to a single event file
*/
public long getMaxEventFileLife(final TimeUnit timeUnit) {
return timeUnit.convert(eventFileMillis, TimeUnit.MILLISECONDS);
}
/**
* Sets the maximum amount of time to write to a single event file
*
* @param maxEventFileTime
* @param timeUnit
* @param maxEventFileTime the max amount of time to write to a single event file
* @param timeUnit the units for the value supplied by maxEventFileTime
*/
public void setMaxEventFileLife(final long maxEventFileTime, final TimeUnit timeUnit) {
this.eventFileMillis = TimeUnit.MILLISECONDS.convert(maxEventFileTime, timeUnit);
}
/**
* Returns the maximum number of bytes (pre-compression) that will be
* @return the maximum number of bytes (pre-compression) that will be
* written to a single event file before the file is rolled over
*
* @return
*/
public long getMaxEventFileCapacity() {
return eventFileBytes;
}
/**
* Sets the maximum number of bytes (pre-compression) that will be written
* @param maxEventFileBytes the maximum number of bytes (pre-compression) that will be written
* to a single event file before the file is rolled over
*
* @param maxEventFileBytes
*/
public void setMaxEventFileCapacity(final long maxEventFileBytes) {
this.eventFileBytes = maxEventFileBytes;
}
/**
* Returns the fields that can be indexed
*
* @return
* @return the fields that should be indexed
*/
public List<SearchableField> getSearchableFields() {
return Collections.unmodifiableList(searchableFields);
}
/**
* Sets the fields to index
*
* @param searchableFields
* @param searchableFields the fields to index
*/
public void setSearchableFields(final List<SearchableField> searchableFields) {
this.searchableFields = new ArrayList<>(searchableFields);
}
/**
* Returns the FlowFile attributes that can be indexed
*
* @return
* @return the FlowFile attributes that should be indexed
*/
public List<SearchableField> getSearchableAttributes() {
return Collections.unmodifiableList(searchableAttributes);
}
/**
* Sets the FlowFile attributes to index
*
* @param searchableAttributes
* @param searchableAttributes the FlowFile attributes to index
*/
public void setSearchableAttributes(final List<SearchableField> searchableAttributes) {
this.searchableAttributes = new ArrayList<>(searchableAttributes);
}
/**
* Indicates whether or not event files will be compressed when they are
* @return whether or not event files will be compressed when they are
* rolled over
*
* @return
*/
public boolean isCompressOnRollover() {
return compress;
}
/**
* Specifies whether or not to compress event files on rollover
*
* @param compress
* @param compress if true, the data will be compressed when rolled over
*/
public void setCompressOnRollover(final boolean compress) {
this.compress = compress;
}
/**
* @return the number of threads to use to query the repo
*/
public int getQueryThreadPoolSize() {
return queryThreadPoolSize;
}
@@ -246,27 +227,23 @@ public class RepositoryConfiguration {
* </li>
* </ol>
*
* @param bytes
* @param bytes the number of bytes to write to an index before beginning a new shard
*/
public void setDesiredIndexSize(final long bytes) {
this.desiredIndexBytes = bytes;
}
/**
* Returns the desired size of each index shard. See the
* {@Link #setDesiredIndexSize} method for an explanation of why we choose
* @return the desired size of each index shard. See the
* {@link #setDesiredIndexSize} method for an explanation of why we choose
* to shard the index.
*
* @return
*/
public long getDesiredIndexSize() {
return desiredIndexBytes;
}
/**
* Sets the number of Journal files to use when persisting records.
*
* @param numJournals
* @param numJournals the number of Journal files to use when persisting records.
*/
public void setJournalCount(final int numJournals) {
if (numJournals < 1) {
@@ -277,19 +254,14 @@ public class RepositoryConfiguration {
}
/**
* Returns the number of Journal files that will be used when persisting
* records.
*
* @return
* @return the number of Journal files that will be used when persisting records.
*/
public int getJournalCount() {
return journalCount;
}
/**
* Specifies whether or not the Repository should sync all updates to disk.
*
* @return
* @return <code>true</code> if the repository will perform an 'fsync' for all updates to disk
*/
public boolean isAlwaysSync() {
return alwaysSync;
@@ -301,7 +273,7 @@ public class RepositoryConfiguration {
* persisted across restarts, even if there is a power failure or a sudden
* Operating System crash, but it can be very expensive.
*
* @param alwaysSync
* @param alwaysSync whether or not to perform an 'fsync' for all updates to disk
*/
public void setAlwaysSync(boolean alwaysSync) {
this.alwaysSync = alwaysSync;
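Taken together, a repository configuration might be assembled like this (all values are illustrative):

    final RepositoryConfiguration config = new RepositoryConfiguration();
    config.addStorageDirectory(new File("target/provenance-repo"));
    config.setMaxRecordLife(24, TimeUnit.HOURS);          // purge events older than one day
    config.setMaxStorageCapacity(1024L * 1024L * 1024L);  // cap the repo at 1 GB
    config.setMaxEventFileLife(30, TimeUnit.SECONDS);     // roll event files frequently
    config.setCompressOnRollover(true);                   // gzip event files on rollover
    config.setJournalCount(4);                            // spread writes across 4 journals
    config.setAlwaysSync(false);                          // skip the per-update fsync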

View File

@@ -25,9 +25,9 @@ public interface ExpirationAction {
* Performs some action against the given File and returns the new File that
* contains the modified version
*
* @param expiredFile
* @return
* @throws IOException
* @param expiredFile the file that was expired
* @return the new file after the file has been renamed, or the expiredFile if the file was not renamed
* @throws IOException if there was an IO problem
*/
File execute(File expiredFile) throws IOException;
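A minimal, hypothetical implementation that deletes expired files (returning the input File unchanged, which the contract permits when no rename occurs):

    public class DeleteExpirationAction implements ExpirationAction {
        @Override
        public File execute(final File expiredFile) throws IOException {
            // delete the expired event file; the contract allows returning the input
            if (expiredFile.exists() && !expiredFile.delete()) {
                throw new IOException("Unable to delete expired file " + expiredFile);
            }
            return expiredFile;
        }
    }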

View File

@@ -50,7 +50,8 @@ public class DocsReader {
public DocsReader(final List<File> storageDirectories) {
}
public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
final AtomicInteger retrievalCount, final int maxResults) throws IOException {
if (retrievalCount.get() >= maxResults) {
return Collections.emptySet();
}

View File

@@ -16,50 +16,30 @@
*/
package org.apache.nifi.provenance.lucene;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.IndexConfiguration;
import org.apache.nifi.provenance.PersistentProvenanceRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.rollover.RolloverAction;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IndexingAction implements RolloverAction {
private final PersistentProvenanceRepository repository;
public class IndexingAction {
private final Set<SearchableField> nonAttributeSearchableFields;
private final Set<SearchableField> attributeSearchableFields;
private final IndexConfiguration indexConfiguration;
private static final Logger logger = LoggerFactory.getLogger(IndexingAction.class);
public IndexingAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfig) {
repository = repo;
indexConfiguration = indexConfig;
public IndexingAction(final PersistentProvenanceRepository repo) {
attributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableAttributes()));
nonAttributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableFields()));
}
@@ -150,87 +130,4 @@ public class IndexingAction implements RolloverAction {
indexWriter.addDocument(doc);
}
}
@Override
public File execute(final File fileRolledOver) throws IOException {
final File indexingDirectory = indexConfiguration.getWritableIndexDirectory(fileRolledOver);
int indexCount = 0;
long maxId = -1L;
try (final Directory directory = FSDirectory.open(indexingDirectory);
final Analyzer analyzer = new StandardAnalyzer()) {
final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
config.setWriteLockTimeout(300000L);
try (final IndexWriter indexWriter = new IndexWriter(directory, config);
final RecordReader reader = RecordReaders.newRecordReader(fileRolledOver, repository.getAllLogFiles())) {
StandardProvenanceEventRecord record;
while (true) {
final Integer blockIndex;
if ( reader.isBlockIndexAvailable() ) {
blockIndex = reader.getBlockIndex();
} else {
blockIndex = null;
}
try {
record = reader.nextRecord();
} catch (final EOFException eof) {
// system was restarted while writing to the log file. Nothing we can do here, so ignore this record.
// On system restart, the FlowFiles should be back in their "original" queues, so the events will be re-created
// when the data is re-processed
break;
}
if (record == null) {
break;
}
maxId = record.getEventId();
index(record, indexWriter, blockIndex);
indexCount++;
}
indexWriter.commit();
} catch (final EOFException eof) {
// nothing in the file. Move on.
}
} finally {
if (maxId >= -1) {
indexConfiguration.setMaxIdIndexed(maxId);
}
}
final File newFile = new File(fileRolledOver.getParent(),
LuceneUtil.substringBeforeLast(fileRolledOver.getName(), ".")
+ ".indexed."
+ LuceneUtil.substringAfterLast(fileRolledOver.getName(), "."));
boolean renamed = false;
for (int i = 0; i < 10 && !renamed; i++) {
renamed = fileRolledOver.renameTo(newFile);
if (!renamed) {
try {
Thread.sleep(25L);
} catch (final InterruptedException e) {
}
}
}
if (renamed) {
logger.info("Finished indexing Provenance Log File {} to index {} with {} records indexed and renamed file to {}",
fileRolledOver, indexingDirectory, indexCount, newFile);
return newFile;
} else {
logger.warn("Finished indexing Provenance Log File {} with {} records indexed but failed to rename file to {}; indexed {} records", new Object[]{fileRolledOver, indexCount, newFile, indexCount});
return fileRolledOver;
}
}
@Override
public boolean hasBeenPerformed(final File fileRolledOver) {
return fileRolledOver.getName().contains(".indexed.");
}
}
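With RolloverAction gone, IndexingAction is now constructed with only the repository and driven inline from mergeJournals, roughly as below (the borrow/return calls and the capture-before-read ordering mirror the merge code and the removed execute method above; returnIndexWriter is assumed as the counterpart to borrowIndexWriter):

    final IndexingAction indexingAction = new IndexingAction(repository);
    final File indexDir = indexConfig.getWritableIndexDirectory(writerFile);
    final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexDir);
    try (final RecordReader reader = RecordReaders.newRecordReader(writerFile, repository.getAllLogFiles())) {
        while (true) {
            // capture the block index before nextRecord(), which increments it
            final Integer blockIndex = reader.isBlockIndexAvailable() ? reader.getBlockIndex() : null;
            final StandardProvenanceEventRecord record = reader.nextRecord();
            if (record == null) {
                break;
            }
            indexingAction.index(record, indexWriter, blockIndex);
        }
        indexWriter.commit();
    } finally {
        indexManager.returnIndexWriter(indexDir, indexWriter);
    }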

View File

@@ -48,7 +48,8 @@ public class LineageQuery {
public static final int MAX_LINEAGE_UUIDS = 100;
private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class);
public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory, final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException {
public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory,
final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException {
if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) {
throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size()));
}
@@ -99,7 +100,8 @@ public class LineageQuery {
final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, indexReader, repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE);
final long readDocsEnd = System.nanoTime();
logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis", TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis",
TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
return recs;
}

View File

@@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.rollover;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.provenance.lucene.IndexingAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CompressionAction implements RolloverAction {
private static final Logger logger = LoggerFactory.getLogger(IndexingAction.class);
@Override
public File execute(final File fileRolledOver) throws IOException {
final File gzFile = new File(fileRolledOver.getParent(), fileRolledOver.getName() + ".gz");
try (final FileInputStream in = new FileInputStream(fileRolledOver);
final OutputStream fos = new FileOutputStream(gzFile);
final GZIPOutputStream gzipOut = new GZIPOutputStream(fos, 1)) {
StreamUtils.copy(in, gzipOut);
in.getFD().sync();
}
boolean deleted = false;
for (int i = 0; i < 10 && !deleted; i++) {
deleted = fileRolledOver.delete();
}
logger.info("Finished compressing Provenance Log File {}", fileRolledOver);
return gzFile;
}
@Override
public boolean hasBeenPerformed(final File fileRolledOver) {
return fileRolledOver.getName().contains(".gz");
}
}

View File

@@ -1,35 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.rollover;
import java.io.File;
import java.io.IOException;
public interface RolloverAction {
/**
* Performs some action against the given File and returns the new File that
* contains the modified version
*
* @param fileRolledOver
* @return
* @throws IOException
*/
File execute(File fileRolledOver) throws IOException;
boolean hasBeenPerformed(File fileRolledOver);
}

View File

@@ -26,21 +26,22 @@ public interface RecordReader extends Closeable {
/**
* Returns the next record in the reader, or <code>null</code> if there is no more data available.
* @return
* @throws IOException
* @return the next Provenance event in the stream
* @throws IOException if unable to read the next event from the stream
*/
StandardProvenanceEventRecord nextRecord() throws IOException;
/**
* Skips the specified number of bytes
* @param bytesToSkip
* @throws IOException
* @param bytesToSkip the number of bytes to skip ahead
* @throws IOException if unable to skip ahead the specified number of bytes (e.g., the stream does
* not contain this many more bytes)
*/
void skip(long bytesToSkip) throws IOException;
/**
* Skips to the specified byte offset in the underlying stream.
* @param position
* @param position the byte offset to skip to
* @throws IOException if the underlying stream throws IOException, or if the reader has already
* passed the specified byte offset
*/
@@ -49,7 +50,7 @@ public interface RecordReader extends Closeable {
/**
* Skips to the specified compression block
*
* @param blockIndex
* @param blockIndex the block index to skip to
* @throws IOException if the underlying stream throws IOException, or if the reader has already
* read past the specified compression block index
* @throws IllegalStateException if the RecordReader does not have a TableOfContents associated with it
@@ -61,7 +62,10 @@ public interface RecordReader extends Closeable {
* Note that the block index is incremented at the beginning of the {@link #nextRecord()}
* method. This means that this method will return the block from which the previous record was read,
* if calling {@link #nextRecord()} continually, not the block from which the next record will be read.
* @return
*
* @return the current block index
* @throws IllegalStateException if the reader is reading a provenance event file that does not contain
* a Table of Contents
*/
int getBlockIndex();
@@ -69,20 +73,20 @@ public interface RecordReader extends Closeable {
* Returns <code>true</code> if the compression block index is available. It will be available
* if and only if the reader is created with a TableOfContents
*
* @return
* @return true if the reader is reading from an event file that has a Table of Contents
*/
boolean isBlockIndexAvailable();
/**
* Returns the {@link TocReader} that is used to keep track of compression blocks, if one exists,
* <code>null</code> otherwise
* @return
*
* @return the TocReader if the underlying event file has a Table of Contents, <code>null</code> otherwise.
*/
TocReader getTocReader();
/**
* Returns the number of bytes that have been consumed from the stream (read or skipped).
* @return
* @return the number of bytes that have been consumed from the stream (read or skipped).
*/
long getBytesConsumed();
@@ -91,8 +95,9 @@ public interface RecordReader extends Closeable {
* has already read through all records. Note: This method will consume the stream until the end,
* so no more records will be available on this reader after calling this method.
*
* @return
* @throws IOException
* @return the ID of the last event in this record reader, or -1 if the reader has no records or
* has already read through all records
* @throws IOException if unable to get id of the last event
*/
long getMaxEventId() throws IOException;
}
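A typical read loop over an event file, tolerating a truncated tail exactly as the repository code above does (the per-event handler is hypothetical):

    try (final RecordReader reader = RecordReaders.newRecordReader(eventFile, allLogFiles)) {
        while (true) {
            final StandardProvenanceEventRecord record;
            try {
                record = reader.nextRecord();
            } catch (final EOFException eof) {
                break; // partial record at the end of the file; skip it
            }
            if (record == null) {
                break; // clean end of stream
            }
            handle(record); // hypothetical per-event callback
        }
    }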

View File

@@ -28,31 +28,27 @@ public interface RecordWriter extends Closeable {
/**
* Writes header information to the underlying stream
*
* @throws IOException
* @throws IOException if unable to write header information to the underlying stream
*/
void writeHeader() throws IOException;
/**
* Writes the given record out to the underlying stream
*
* @param record
* @param recordIdentifier
* @param record the record to write
* @param recordIdentifier the new identifier of the record
* @return the number of bytes written for the given record
* @throws IOException
* @throws IOException if unable to write the record to the stream
*/
long writeRecord(ProvenanceEventRecord record, long recordIdentifier) throws IOException;
/**
* Returns the number of Records that have been written to this RecordWriter
*
* @return
* @return the number of Records that have been written to this RecordWriter
*/
int getRecordsWritten();
/**
* Returns the file that this RecordWriter is writing to
*
* @return
* @return the file that this RecordWriter is writing to
*/
File getFile();
@@ -73,19 +69,18 @@ public interface RecordWriter extends Closeable {
* not immediately available, returns <code>false</code>; otherwise, obtains
* the lock and returns <code>true</code>.
*
* @return
* @return <code>true</code> if the lock was obtained, <code>false</code> otherwise.
*/
boolean tryLock();
/**
* Syncs the content written to this writer to disk.
* @throws java.io.IOException
* @throws IOException if unable to sync content to disk
*/
void sync() throws IOException;
/**
* Returns the TOC Writer that is being used to write the Table of Contents for this journal
* @return
* @return the TOC Writer that is being used to write the Table of Contents for this journal
*/
TocWriter getTocWriter();
}
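On the write side, the lifecycle is: open via RecordWriters, write the header once, then append events, syncing when durability demands it. A sketch (the third newRecordWriter argument follows the merge code above; that it means "create a TOC" is an assumption):

    final RecordWriter writer = RecordWriters.newRecordWriter(eventFile, true /* compress */, true);
    try {
        writer.writeHeader();
        final long bytesWritten = writer.writeRecord(event, eventId); // eventId becomes the record's identifier
        if (alwaysSync) {
            writer.sync(); // fsync so the event survives power loss
        }
    } finally {
        writer.close();
    }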

View File

@@ -19,7 +19,6 @@ package org.apache.nifi.provenance.toc;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
@@ -52,7 +51,7 @@ public class StandardTocWriter implements TocWriter {
* Creates a StandardTocWriter that writes to the given file.
* @param file the file to write to
* @param compressionFlag whether or not the journal is compressed
* @throws FileNotFoundException
* @throws IOException if unable to write header info to the specified file
*/
public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException {
final File tocDir = file.getParentFile();

View File

@@ -32,27 +32,31 @@ public interface TocReader extends Closeable {
/**
* Indicates whether or not the corresponding Journal file is compressed
* @return
* @return <code>true</code> if the event file is compressed
*/
boolean isCompressed();
/**
* Returns the byte offset into the Journal File for the Block with the given index.
* @param blockIndex
* @return
*
* @param blockIndex the block index to get the byte offset for
* @return the byte offset for the given block index, or <code>-1</code> if the given block index
* does not exist
*/
long getBlockOffset(int blockIndex);
/**
* Returns the byte offset into the Journal File of the last Block in the given index
* @return
* @return the byte offset into the Journal File of the last Block in the given index
*/
long getLastBlockOffset();
/**
* Returns the index of the block that contains the given offset
* @param blockOffset
* @return
*
* @param blockOffset the byte offset for which the block index is desired
*
* @return the index of the block that contains the given offset
*/
int getBlockIndex(long blockOffset);
}
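Combined with RecordReader.skipToBlock, the Table of Contents gives random access into a (possibly compressed) journal; a sketch, with StandardTocReader assumed as the concrete implementation:

    final TocReader toc = new StandardTocReader(tocFile);       // assumed concrete reader
    final int blockIndex = toc.getBlockIndex(eventByteOffset);  // block containing the event
    reader.skipToBlock(blockIndex);                             // position the RecordReader at that block
    final StandardProvenanceEventRecord record = reader.nextRecord();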

View File

@@ -23,9 +23,12 @@ import org.apache.nifi.provenance.lucene.LuceneUtil;
public class TocUtil {
/**
* Returns the file that should be used as the Table of Contents for the given Journal File
* @param journalFile
* @return
* Returns the file that should be used as the Table of Contents for the given Journal File.
* Note, if no TOC exists for the given Journal File, a File will still be returned but the file
* will not actually exist.
*
* @param journalFile the journal file for which to get the Table of Contents
* @return the file that represents the Table of Contents for the specified journal file.
*/
public static File getTocFile(final File journalFile) {
final File tocDir = new File(journalFile.getParentFile(), "toc");
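Because the returned File may not exist, callers can branch on its presence to decide between block-level skipping and a sequential scan:

    final File tocFile = TocUtil.getTocFile(journalFile);
    if (tocFile.exists()) {
        // a Table of Contents is present: block-level skipping is available
    } else {
        // no TOC: the journal must be read sequentially
    }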

View File

@@ -27,26 +26,24 @@ public interface TocWriter extends Closeable {
/**
* Adds the given block offset as the next Block Offset in the Table of Contents
* @param offset
* @throws IOException
* @param offset the byte offset at which the block begins
* @throws IOException if unable to persist the block index
*/
void addBlockOffset(long offset) throws IOException;
/**
* Returns the index of the current Block
* @return
* @return the index of the current Block
*/
int getCurrentBlockIndex();
/**
* Returns the file that is currently being written to
* @return
* @return the file that is currently being written to
*/
File getFile();
/**
* Synchronizes the data with the underlying storage device
* @throws IOException
* @throws IOException if unable to synchronize the data with the underlying storage device
*/
void sync() throws IOException;
}
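On the write side, a block offset is recorded whenever a new compression block begins; a sketch (the writer instance and the block-boundary trigger are assumptions):

    // at the start of each new compression block in the journal:
    tocWriter.addBlockOffset(bytesWrittenSoFar);
    // ... write the block's events ...
    tocWriter.sync(); // flush TOC entries to the storage device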

View File

@@ -431,7 +431,7 @@ public class TestPersistentProvenanceRepository {
repo.waitForRollover();
final Query query = new Query(UUID.randomUUID().toString());
// query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
// query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));