Merge branch 'develop' into ListHDFS

Mark Payne 2015-04-29 16:14:01 -04:00
commit 25e2b6c8ad
13 changed files with 491 additions and 265 deletions

View File

@@ -121,13 +121,13 @@ public class IndexConfiguration {
         }
     }

-    public File getWritableIndexDirectory(final File provenanceLogFile) {
+    public File getWritableIndexDirectory(final File provenanceLogFile, final long newIndexTimestamp) {
         lock.lock();
         try {
             final File storageDirectory = provenanceLogFile.getParentFile();
             List<File> indexDirectories = this.indexDirectoryMap.get(storageDirectory);
             if (indexDirectories == null) {
-                final File newDir = addNewIndex(storageDirectory, provenanceLogFile);
+                final File newDir = addNewIndex(storageDirectory, provenanceLogFile, newIndexTimestamp);
                 indexDirectories = new ArrayList<>();
                 indexDirectories.add(newDir);
                 indexDirectoryMap.put(storageDirectory, indexDirectories);
@@ -135,7 +135,7 @@ public class IndexConfiguration {
             }

             if (indexDirectories.isEmpty()) {
-                final File newDir = addNewIndex(storageDirectory, provenanceLogFile);
+                final File newDir = addNewIndex(storageDirectory, provenanceLogFile, newIndexTimestamp);
                 indexDirectories.add(newDir);
                 return newDir;
             }
@@ -143,7 +143,7 @@ public class IndexConfiguration {
             final File lastDir = indexDirectories.get(indexDirectories.size() - 1);
             final long size = getSize(lastDir);
             if (size > repoConfig.getDesiredIndexSize()) {
-                final File newDir = addNewIndex(storageDirectory, provenanceLogFile);
+                final File newDir = addNewIndex(storageDirectory, provenanceLogFile, newIndexTimestamp);
                 indexDirectories.add(newDir);
                 return newDir;
             } else {
@@ -154,14 +154,14 @@ public class IndexConfiguration {
         }
     }

-    private File addNewIndex(final File storageDirectory, final File provenanceLogFile) {
+    private File addNewIndex(final File storageDirectory, final File provenanceLogFile, final long newIndexTimestamp) {
         // Build the event time of the first record into the index's filename so that we can determine
         // which index files to look at when we perform a search. We use the timestamp of the first record
         // in the Provenance Log file, rather than the current time, because we may perform the Indexing
         // retroactively.
         Long firstEntryTime = getFirstEntryTime(provenanceLogFile);
         if (firstEntryTime == null) {
-            firstEntryTime = System.currentTimeMillis();
+            firstEntryTime = newIndexTimestamp;
         }
         return new File(storageDirectory, "index-" + firstEntryTime);
     }
@@ -222,7 +222,7 @@ public class IndexConfiguration {
             }
         });

-        for (File indexDir : sortedIndexDirectories) {
+        for (final File indexDir : sortedIndexDirectories) {
             // If the index was last modified before the start time, we know that it doesn't
             // contain any data for us to query.
             if (startTime != null && indexDir.lastModified() < startTime) {
@@ -282,7 +282,7 @@ public class IndexConfiguration {
         }

         boolean foundIndexCreatedLater = false;
-        for (File indexDir : sortedIndexDirectories) {
+        for (final File indexDir : sortedIndexDirectories) {
             // If the index was last modified before the log file was created, we know the index doesn't include
             // any data for the provenance log.
             if (indexDir.lastModified() < firstEntryTime) {
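The point of threading newIndexTimestamp through getWritableIndexDirectory and addNewIndex is that, when the first entry time of a provenance log cannot be read, the index directory is still named for the time of the data (the rollover timestamp supplied by the caller) rather than for whenever indexing happens to run. A minimal standalone sketch of that naming rule, with hypothetical class and method names rather than the NiFi API:

import java.io.File;

public class IndexDirNaming {
    /**
     * Chooses the directory name for a new Lucene index. The timestamp of the first event
     * in the provenance log is preferred; when it is unknown, the caller-supplied fallback
     * timestamp (e.g. the time the journals were rolled over) is used instead of the current
     * wall-clock time, so retroactive indexing still yields a name that reflects the data.
     */
    static File newIndexDir(final File storageDirectory, final Long firstEntryTime, final long fallbackTimestamp) {
        final long timestamp = (firstEntryTime == null) ? fallbackTimestamp : firstEntryTime;
        return new File(storageDirectory, "index-" + timestamp);
    }

    public static void main(final String[] args) {
        final File storage = new File("target/provenance");
        // first event time known: use it
        System.out.println(newIndexDir(storage, 1430337241000L, 1430340000000L));
        // first event time unknown: fall back to the rollover timestamp
        System.out.println(newIndexDir(storage, null, 1430340000000L));
    }
}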

View File

@@ -87,11 +87,13 @@ import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.serialization.RecordWriters;
+import org.apache.nifi.provenance.toc.TocReader;
 import org.apache.nifi.provenance.toc.TocUtil;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.RingBuffer;
+import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
 import org.apache.nifi.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -328,7 +330,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             final File journalFile = new File(journalDirectory, String.valueOf(initialRecordId) + ".journal." + i);

             writers[i] = RecordWriters.newRecordWriter(journalFile, false, false);
-            writers[i].writeHeader();
+            writers[i].writeHeader(initialRecordId);
         }

         logger.info("Created new Provenance Event Writers for events starting with ID {}", initialRecordId);
@@ -361,6 +363,19 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         for (final Path path : paths) {
             try (RecordReader reader = RecordReaders.newRecordReader(path.toFile(), getAllLogFiles())) {
+                // if this is the first record, try to find out the block index and jump directly to
+                // the block index. This avoids having to read through a lot of data that we don't care about
+                // just to get to the first record that we want.
+                if ( records.isEmpty() ) {
+                    final TocReader tocReader = reader.getTocReader();
+                    if ( tocReader != null ) {
+                        final Integer blockIndex = tocReader.getBlockIndexForEventId(firstRecordId);
+                        if (blockIndex != null) {
+                            reader.skipToBlock(blockIndex);
+                        }
+                    }
+                }
+
                 StandardProvenanceEventRecord record;
                 while (records.size() < maxRecords && ((record = reader.nextRecord()) != null)) {
                     if (record.getEventId() >= firstRecordId) {
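The block added above lets the reader consult the journal's table of contents and jump straight to the block that should contain the first requested event, instead of deserializing everything that precedes it. A self-contained sketch of the same idea over an in-memory model (hypothetical names, not the NiFi reader API):

import java.util.ArrayList;
import java.util.List;

public class SkipToBlockExample {
    // A "journal" modelled as blocks of ascending event IDs; each block knows its first ID.
    static final long[][] BLOCKS = {
            {0, 1, 2, 3}, {4, 5, 6, 7}, {8, 9, 10, 11}
    };

    /** Mirrors the TOC lookup: the block whose range should contain the ID, or null if unknown. */
    static Integer blockIndexFor(final long eventId) {
        if (eventId < BLOCKS[0][0]) {
            return null;
        }
        int index = 0;
        for (int i = 1; i < BLOCKS.length; i++) {
            if (BLOCKS[i][0] > eventId) {
                break;
            }
            index = i;
        }
        return index;
    }

    /** Reads events >= firstId, starting the sequential scan at the block the TOC points to. */
    static List<Long> readFrom(final long firstId, final int maxRecords) {
        final List<Long> records = new ArrayList<>();
        final Integer startBlock = blockIndexFor(firstId);
        for (int b = (startBlock == null) ? 0 : startBlock; b < BLOCKS.length; b++) {
            for (final long eventId : BLOCKS[b]) {
                if (eventId >= firstId && records.size() < maxRecords) {
                    records.add(eventId);
                }
            }
        }
        return records;
    }

    public static void main(final String[] args) {
        System.out.println(readFrom(6, 4)); // prints [6, 7, 8, 9] without scanning block 0
    }
}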
@@ -699,7 +714,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         if (bytesWrittenSinceRollover.get() >= configuration.getMaxEventFileCapacity()) {
             try {
                 rollover(false);
-            } catch (IOException e) {
+            } catch (final IOException e) {
                 logger.error("Failed to Rollover Provenance Event Repository file due to {}", e.toString());
                 logger.error("", e);
                 eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to Rollover Provenance Event Repository file due to " + e.toString());
@@ -1001,7 +1016,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         if (fileRolledOver == null) {
             return;
         }
-        File file = fileRolledOver;
+        final File file = fileRolledOver;

         // update our map of id to Path
         // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a
@@ -1010,7 +1025,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         writeLock.lock();
         try {
             final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
-            SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
+            final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
             newIdToPathMap.putAll(idToPathMap.get());
             newIdToPathMap.put(fileFirstEventId, file.toPath());
             idToPathMap.set(newIdToPathMap);
@@ -1231,6 +1246,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }
         });

+        long minEventId = 0L;
+        long earliestTimestamp = System.currentTimeMillis();
         for (final RecordReader reader : readers) {
             StandardProvenanceEventRecord record = null;
@@ -1252,17 +1269,33 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 continue;
             }

+            if ( record.getEventTime() < earliestTimestamp ) {
+                earliestTimestamp = record.getEventTime();
+            }
+
+            if ( record.getEventId() < minEventId ) {
+                minEventId = record.getEventId();
+            }
+
             recordToReaderMap.put(record, reader);
         }

+        // We want to keep track of the last 1000 events in the files so that we can add them to 'ringBuffer'.
+        // However, we don't want to add them directly to ringBuffer, because once they are added to ringBuffer, they are
+        // available in query results. As a result, we can have the issue where we've not finished indexing the file
+        // but we try to create the lineage for events in that file. In order to avoid this, we will add the records
+        // to a temporary RingBuffer and after we finish merging the records will then copy the data to the
+        // ringBuffer provided as a method argument.
+        final RingBuffer<ProvenanceEventRecord> latestRecords = new RingBuffer<>(1000);
+
         // loop over each entry in the map, persisting the records to the merged file in order, and populating the map
         // with the next entry from the journal file from which the previous record was written.
         try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
-            writer.writeHeader();
+            writer.writeHeader(minEventId);

             final IndexingAction indexingAction = new IndexingAction(this);

-            final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile);
+            final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile, earliestTimestamp);
             final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
             try {
                 long maxId = 0L;
@@ -1278,7 +1311,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                     indexingAction.index(record, indexWriter, blockIndex);
                     maxId = record.getEventId();

-                    ringBuffer.add(record);
+                    latestRecords.add(record);
                     records++;

                     // Remove this entry from the map
@@ -1303,6 +1336,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 indexManager.returnIndexWriter(indexingDirectory, indexWriter);
             }
         }
+
+        // record should now be available in the repository. We can copy the values from latestRecords to ringBuffer.
+        latestRecords.forEach(new ForEachEvaluator<ProvenanceEventRecord>() {
+            @Override
+            public boolean evaluate(final ProvenanceEventRecord event) {
+                ringBuffer.add(event);
+                return true;
+            }
+        });
     } finally {
         for (final RecordReader reader : readers) {
             try {
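The comment in this hunk describes a buffer-then-publish pattern: merged events are staged in a private RingBuffer and only copied into the shared ringBuffer once the merged file has been written and indexed, so a concurrent query never sees an event whose file is not yet queryable. A rough standalone sketch of the same pattern using a small ring built on ArrayDeque (hypothetical names; NiFi's RingBuffer has its own API):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BufferThenPublishExample {
    /** A tiny stand-in for a ring buffer: keeps only the newest 'capacity' elements. */
    static final class Ring<T> {
        private final Deque<T> deque = new ArrayDeque<>();
        private final int capacity;

        Ring(final int capacity) {
            this.capacity = capacity;
        }

        synchronized void add(final T value) {
            if (deque.size() == capacity) {
                deque.removeFirst();
            }
            deque.addLast(value);
        }

        synchronized List<T> snapshot() {
            return new ArrayList<>(deque);
        }
    }

    public static void main(final String[] args) {
        final Ring<Long> published = new Ring<>(1000);   // visible to queries immediately
        final Ring<Long> staging = new Ring<>(1000);     // private until merging and indexing finish

        // "merge" events 0..9: stage them rather than publishing one by one
        for (long eventId = 0; eventId < 10; eventId++) {
            staging.add(eventId);
        }

        // indexing finished: publish everything at once
        for (final Long eventId : staging.snapshot()) {
            published.add(eventId);
        }
        System.out.println(published.snapshot());
    }
}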
@@ -1452,11 +1494,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory))) {
                 final IndexSearcher searcher = new IndexSearcher(directoryReader);

-                TopDocs topDocs = searcher.search(luceneQuery, 10000000);
+                final TopDocs topDocs = searcher.search(luceneQuery, 10000000);
                 logger.info("For {}, Top Docs has {} hits; reading Lucene results", indexDirectory, topDocs.scoreDocs.length);

                 if (topDocs.totalHits > 0) {
-                    for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
+                    for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
                         final int docId = scoreDoc.doc;
                         final Document d = directoryReader.document(docId);
                         localScoreDocs.add(d);
@@ -1649,16 +1691,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }

             switch (event.getEventType()) {
                 case CLONE:
                 case FORK:
                 case JOIN:
                 case REPLAY:
                     return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE);
                 default:
                     final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
                     lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                     submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
                     return submission;
             }
         } catch (final IOException ioe) {
             final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
@@ -1686,17 +1728,17 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }

             switch (event.getEventType()) {
                 case JOIN:
                 case FORK:
                 case CLONE:
                 case REPLAY:
-                    return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime());
+                    return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, event.getLineageStartDate(), event.getEventTime());
                 default: {
                     final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
                     lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                     submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
                     return submission;
                 }
             }
         } catch (final IOException ioe) {
             final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
@@ -1880,7 +1922,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 }

                 try {
-                    final Set<ProvenanceEventRecord> matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, indexDir, null, flowFileUuids);
+                    final Set<ProvenanceEventRecord> matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, indexManager, indexDir, null, flowFileUuids);

                     final StandardLineageResult result = submission.getResult();
                     result.update(matchingRecords);

View File

@@ -86,16 +86,22 @@ public class StandardRecordWriter implements RecordWriter {
     }

     @Override
-    public synchronized void writeHeader() throws IOException {
+    public synchronized void writeHeader(final long firstEventId) throws IOException {
         lastBlockOffset = rawOutStream.getBytesWritten();
-        resetWriteStream();
+        resetWriteStream(firstEventId);

         out.writeUTF(PersistentProvenanceRepository.class.getName());
         out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION);
         out.flush();
     }

-    private void resetWriteStream() throws IOException {
+    /**
+     * Resets the streams to prepare for a new block
+     *
+     * @param eventId the first id that will be written to the new block
+     * @throws IOException if unable to flush/close the current streams properly
+     */
+    private void resetWriteStream(final long eventId) throws IOException {
         if ( out != null ) {
             out.flush();
         }
@@ -112,13 +118,13 @@ public class StandardRecordWriter implements RecordWriter {
             }

             if ( tocWriter != null ) {
-                tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+                tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
             }

             writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
         } else {
             if ( tocWriter != null ) {
-                tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+                tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
             }

             writableStream = new BufferedOutputStream(rawOutStream, 65536);
@@ -130,7 +136,7 @@ public class StandardRecordWriter implements RecordWriter {

     @Override
-    public synchronized long writeRecord(final ProvenanceEventRecord record, long recordIdentifier) throws IOException {
+    public synchronized long writeRecord(final ProvenanceEventRecord record, final long recordIdentifier) throws IOException {
         final ProvenanceEventType recordType = record.getEventType();
         final long startBytes = byteCountingOut.getBytesWritten();
@@ -142,7 +148,7 @@ public class StandardRecordWriter implements RecordWriter {
             // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
             // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
             // the underlying OutputStream in a NonCloseableOutputStream
-            resetWriteStream();
+            resetWriteStream(recordIdentifier);
         }
     }
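With writeHeader(firstEventId) and resetWriteStream(eventId), each time the writer starts a new (possibly compressed) block it now records two values in the table of contents: the byte offset where the block begins and the ID of the first event that will be written into it. A small sketch of that bookkeeping against an in-memory journal (hypothetical names, not the StandardRecordWriter API):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class BlockedWriterExample {
    static final class TocEntry {
        final long offset;
        final long firstEventId;

        TocEntry(final long offset, final long firstEventId) {
            this.offset = offset;
            this.firstEventId = firstEventId;
        }

        @Override
        public String toString() {
            return "offset=" + offset + ", firstEventId=" + firstEventId;
        }
    }

    public static void main(final String[] args) throws IOException {
        final ByteArrayOutputStream journal = new ByteArrayOutputStream();
        final DataOutputStream out = new DataOutputStream(journal);
        final List<TocEntry> toc = new ArrayList<>();

        final int eventsPerBlock = 4;
        for (long eventId = 0; eventId < 10; eventId++) {
            // when a new block begins, note where it starts and which event ID it starts with:
            // the two values the new addBlockOffset(offset, firstEventId) signature carries
            if (eventId % eventsPerBlock == 0) {
                toc.add(new TocEntry(journal.size(), eventId));
            }
            out.writeLong(eventId);   // stand-in for a serialized provenance event
        }
        out.flush();

        System.out.println(toc); // [offset=0, firstEventId=0, offset=32, firstEventId=4, offset=64, firstEventId=8]
    }
}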

View File

@@ -19,26 +19,23 @@ package org.apache.nifi.provenance.lucene;
 import static java.util.Objects.requireNonNull;

 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;

-import org.apache.nifi.provenance.PersistentProvenanceRepository;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.SearchableFields;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause.Occur;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.provenance.PersistentProvenanceRepository;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.SearchableFields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,7 +45,7 @@ public class LineageQuery {
     public static final int MAX_LINEAGE_UUIDS = 100;
     private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class);

-    public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory,
+    public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final IndexManager indexManager, final File indexDirectory,
         final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException {
         if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) {
             throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size()));
@@ -58,52 +55,62 @@ public class LineageQuery {
             throw new IllegalArgumentException("Must specify either Lineage Identifier or FlowFile UUIDs to compute lineage");
         }

-        try (final Directory fsDir = FSDirectory.open(indexDirectory);
-            final IndexReader indexReader = DirectoryReader.open(fsDir)) {
-
-            final IndexSearcher searcher = new IndexSearcher(indexReader);
-
+        final IndexSearcher searcher;
+        try {
+            searcher = indexManager.borrowIndexSearcher(indexDirectory);
+            try {
                 // Create a query for all Events related to the FlowFiles of interest. We do this by adding all ID's as
                 // "SHOULD" clauses and then setting the minimum required to 1.
                 final BooleanQuery flowFileIdQuery;
                 if (flowFileUuids == null || flowFileUuids.isEmpty()) {
                     flowFileIdQuery = null;
                 } else {
                     flowFileIdQuery = new BooleanQuery();
                     for (final String flowFileUuid : flowFileUuids) {
                         flowFileIdQuery.add(new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid)), Occur.SHOULD);
                     }
                     flowFileIdQuery.setMinimumNumberShouldMatch(1);
                 }

                 BooleanQuery query;
                 if (lineageIdentifier == null) {
                     query = flowFileIdQuery;
                 } else {
                     final BooleanQuery lineageIdQuery = new BooleanQuery();
                     lineageIdQuery.add(new TermQuery(new Term(SearchableFields.LineageIdentifier.getSearchableFieldName(), lineageIdentifier)), Occur.MUST);

                     if (flowFileIdQuery == null) {
                         query = lineageIdQuery;
                     } else {
                         query = new BooleanQuery();
                         query.add(flowFileIdQuery, Occur.SHOULD);
                         query.add(lineageIdQuery, Occur.SHOULD);
                         query.setMinimumNumberShouldMatch(1);
                     }
                 }

                 final long searchStart = System.nanoTime();
                 final TopDocs uuidQueryTopDocs = searcher.search(query, MAX_QUERY_RESULTS);
                 final long searchEnd = System.nanoTime();

                 final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
-                final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, indexReader, repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE);
+                final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE);
                 final long readDocsEnd = System.nanoTime();
-                logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis",
-                    TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
+                logger.debug("Finished Lineage Query against {}; Lucene search took {} millis, reading records took {} millis",
+                    indexDirectory, TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));

                 return recs;
+            } finally {
+                indexManager.returnIndexSearcher(indexDirectory, searcher);
+            }
+        } catch (final FileNotFoundException fnfe) {
+            // nothing has been indexed yet, or the data has already aged off
+            logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, fnfe);
+            if ( logger.isDebugEnabled() ) {
+                logger.warn("", fnfe);
+            }
+
+            return Collections.emptySet();
         }
     }
 }
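LineageQuery now borrows a pooled IndexSearcher from the IndexManager, returns it in a finally block, and treats a FileNotFoundException as "nothing indexed yet" by returning an empty set. A stripped-down sketch of that borrow/return shape, with a toy pool standing in for IndexManager (hypothetical names):

import java.io.FileNotFoundException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Set;

public class BorrowReturnExample {
    /** A toy pool standing in for IndexManager: borrow a handle, give it back when done. */
    static final class Pool {
        private final Deque<String> handles = new ArrayDeque<>(Collections.singleton("searcher-1"));

        synchronized String borrow() throws FileNotFoundException {
            if (handles.isEmpty()) {
                throw new FileNotFoundException("no index on disk yet");
            }
            return handles.removeFirst();
        }

        synchronized void giveBack(final String handle) {
            handles.addLast(handle);
        }
    }

    static Set<String> query(final Pool pool) {
        final String searcher;
        try {
            searcher = pool.borrow();
            try {
                return Collections.singleton("result from " + searcher);
            } finally {
                pool.giveBack(searcher);   // always return the pooled searcher, even on failure
            }
        } catch (final FileNotFoundException fnfe) {
            // nothing indexed yet or the data has already aged off: treat as "no matches"
            return Collections.emptySet();
        }
    }

    public static void main(final String[] args) {
        System.out.println(query(new Pool()));
    }
}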

View File

@@ -28,9 +28,10 @@ public interface RecordWriter extends Closeable {
     /**
      * Writes header information to the underlying stream
      *
+     * @param firstEventId the ID of the first provenance event that will be written to the stream
      * @throws IOException if unable to write header information to the underlying stream
      */
-    void writeHeader() throws IOException;
+    void writeHeader(long firstEventId) throws IOException;

     /**
      * Writes the given record out to the underlying stream

View File

@@ -37,6 +37,7 @@ import java.io.IOException;
 public class StandardTocReader implements TocReader {
     private final boolean compressed;
     private final long[] offsets;
+    private final long[] firstEventIds;

     public StandardTocReader(final File file) throws IOException {
         try (final FileInputStream fis = new FileInputStream(file);
@@ -60,11 +61,32 @@ public class StandardTocReader implements TocReader {
                 throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
             }

-            final int numBlocks = (int) ((file.length() - 2) / 8);
+            final int blockInfoBytes;
+            switch (version) {
+                case 1:
+                    blockInfoBytes = 8;
+                    break;
+                case 2:
+                default:
+                    blockInfoBytes = 16;
+                    break;
+            }
+
+            final int numBlocks = (int) ((file.length() - 2) / blockInfoBytes);
             offsets = new long[numBlocks];
+
+            if ( version > 1 ) {
+                firstEventIds = new long[numBlocks];
+            } else {
+                firstEventIds = new long[0];
+            }

             for (int i=0; i < numBlocks; i++) {
                 offsets[i] = dis.readLong();
+
+                if ( version > 1 ) {
+                    firstEventIds[i] = dis.readLong();
+                }
             }
         }
     }
@@ -98,11 +120,36 @@ public class StandardTocReader implements TocReader {
     public int getBlockIndex(final long blockOffset) {
         for (int i=0; i < offsets.length; i++) {
             if ( offsets[i] > blockOffset ) {
-                return i-1;
+                // if the offset is less than the offset of our first block,
+                // just return 0 to indicate the first block. Otherwise,
+                // return i-1 because i represents the first block whose offset is
+                // greater than 'blockOffset'.
+                return (i == 0) ? 0 : i-1;
             }
         }

+        // None of the blocks have an offset greater than the provided offset.
+        // Therefore, if the event is present, it must be in the last block.
         return offsets.length - 1;
     }
+
+    @Override
+    public Integer getBlockIndexForEventId(final long eventId) {
+        // if we don't have event ID's stored in the TOC (which happens for version 1 of the TOC),
+        // or if the event ID is less than the first Event ID in this TOC, then the Event ID
+        // is unknown -- return null.
+        if ( firstEventIds.length == 0 || eventId < firstEventIds[0] ) {
+            return null;
+        }
+
+        for (int i=1; i < firstEventIds.length; i++) {
+            if ( firstEventIds[i] > eventId ) {
+                return i-1;
+            }
+        }
+
+        // None of the blocks start with an Event ID greater than the provided ID.
+        // Therefore, if the event is present, it must be in the last block.
+        return firstEventIds.length - 1;
+    }
 }
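The version 2 table of contents keeps 16 bytes per block, the block's byte offset followed by the ID of its first event, after a 2-byte header (version, compression flag); version 1 files carry only the 8-byte offsets. A self-contained sketch that writes and re-reads that layout in memory (the real classes work against files on disk):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TocV2LayoutExample {
    public static void main(final String[] args) throws IOException {
        // Write a "version 2" table of contents: 1 byte version, 1 byte compression flag,
        // then 16 bytes per block (byte offset of the block + ID of its first event).
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (final DataOutputStream dos = new DataOutputStream(baos)) {
            dos.write(2);            // version
            dos.write(0);            // not compressed
            final long[][] blocks = {{0L, 0L}, {8192L, 250L}, {16384L, 500L}};
            for (final long[] block : blocks) {
                dos.writeLong(block[0]);   // block offset
                dos.writeLong(block[1]);   // first event ID in the block
            }
        }

        // Read it back the way a version-aware reader would: 16 bytes per block for v2,
        // 8 bytes per block for v1 (offsets only, no event IDs).
        final byte[] data = baos.toByteArray();
        try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            final int version = dis.read();
            final int compressionFlag = dis.read();
            final int blockInfoBytes = (version == 1) ? 8 : 16;
            final int numBlocks = (data.length - 2) / blockInfoBytes;
            System.out.println("version=" + version + " compressed=" + (compressionFlag == 1));
            for (int i = 0; i < numBlocks; i++) {
                final long offset = dis.readLong();
                final long firstEventId = (version > 1) ? dis.readLong() : -1L;
                System.out.println("block " + i + ": offset=" + offset + ", firstEventId=" + firstEventId);
            }
        }
    }
}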

View File

@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
 public class StandardTocWriter implements TocWriter {
     private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class);

-    public static final byte VERSION = 1;
+    public static final byte VERSION = 2;

     private final File file;
     private final FileOutputStream fos;
@@ -75,10 +75,11 @@ public class StandardTocWriter implements TocWriter {
     }

     @Override
-    public void addBlockOffset(final long offset) throws IOException {
+    public void addBlockOffset(final long offset, final long firstEventId) throws IOException {
         final BufferedOutputStream bos = new BufferedOutputStream(fos);
         final DataOutputStream dos = new DataOutputStream(bos);
         dos.writeLong(offset);
+        dos.writeLong(firstEventId);
         dos.flush();
         index++;
         logger.debug("Adding block {} at offset {}", index, offset);

View File

@@ -59,4 +59,13 @@ public interface TocReader extends Closeable {
      * @return the index of the block that contains the given offset
      */
     int getBlockIndex(long blockOffset);
+
+    /**
+     * Returns the block index where the given event ID should be found
+     *
+     * @param eventId the ID of the provenance event of interest
+     * @return the block index where the given event ID should be found, or <code>null</code> if
+     *         the block index is not known
+     */
+    Integer getBlockIndexForEventId(long eventId);
 }

View File

@@ -21,16 +21,19 @@ import java.io.File;
 import java.io.IOException;

 /**
- * Writes a .toc file
+ * Writes a Table-of-Contents (.toc) file
  */
 public interface TocWriter extends Closeable {

     /**
      * Adds the given block offset as the next Block Offset in the Table of Contents
+     *
      * @param offset the byte offset at which the block begins
+     * @param firstEventId the ID of the first Provenance Event that will be in the block
+     *
      * @throws IOException if unable to persist the block index
      */
-    void addBlockOffset(long offset) throws IOException;
+    void addBlockOffset(long offset, long firstEventId) throws IOException;

     /**
      * @return the index of the current Block

View File

@@ -67,7 +67,7 @@ public class TestStandardRecordReaderWriter {
         final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
         final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 1024 * 1024);

-        writer.writeHeader();
+        writer.writeHeader(1L);
         writer.writeRecord(createEvent(), 1L);
         writer.close();
@@ -77,7 +77,7 @@ public class TestStandardRecordReaderWriter {
             final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
             assertEquals(0, reader.getBlockIndex());
             reader.skipToBlock(0);
-            StandardProvenanceEventRecord recovered = reader.nextRecord();
+            final StandardProvenanceEventRecord recovered = reader.nextRecord();
             assertNotNull(recovered);

             assertEquals("nifi://unit-test", recovered.getTransitUri());
@@ -95,7 +95,7 @@ public class TestStandardRecordReaderWriter {
         final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
         final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);

-        writer.writeHeader();
+        writer.writeHeader(1L);
         writer.writeRecord(createEvent(), 1L);
         writer.close();
@@ -105,7 +105,7 @@ public class TestStandardRecordReaderWriter {
             final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
             assertEquals(0, reader.getBlockIndex());
             reader.skipToBlock(0);
-            StandardProvenanceEventRecord recovered = reader.nextRecord();
+            final StandardProvenanceEventRecord recovered = reader.nextRecord();
             assertNotNull(recovered);

             assertEquals("nifi://unit-test", recovered.getTransitUri());
@@ -124,7 +124,7 @@ public class TestStandardRecordReaderWriter {
         // new record each 1 MB of uncompressed data
         final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024);

-        writer.writeHeader();
+        writer.writeHeader(1L);
         for (int i=0; i < 10; i++) {
             writer.writeRecord(createEvent(), i);
         }
@@ -143,7 +143,7 @@ public class TestStandardRecordReaderWriter {
                 reader.skipToBlock(0);
             }

-            StandardProvenanceEventRecord recovered = reader.nextRecord();
+            final StandardProvenanceEventRecord recovered = reader.nextRecord();
             assertNotNull(recovered);
             assertEquals("nifi://unit-test", recovered.getTransitUri());
         }
@@ -163,7 +163,7 @@ public class TestStandardRecordReaderWriter {
         // new block each 10 bytes
         final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);

-        writer.writeHeader();
+        writer.writeHeader(1L);
         for (int i=0; i < 10; i++) {
             writer.writeRecord(createEvent(), i);
         }
@@ -174,7 +174,7 @@ public class TestStandardRecordReaderWriter {
         try (final FileInputStream fis = new FileInputStream(journalFile);
             final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
             for (int i=0; i < 10; i++) {
-                StandardProvenanceEventRecord recovered = reader.nextRecord();
+                final StandardProvenanceEventRecord recovered = reader.nextRecord();
                 System.out.println(recovered);
                 assertNotNull(recovered);
                 assertEquals((long) i, recovered.getEventId());

View File

@@ -64,11 +64,11 @@ public class TestStandardTocReader {

     @Test
-    public void testGetBlockIndex() throws IOException {
+    public void testGetBlockIndexV1() throws IOException {
         final File file = new File("target/" + UUID.randomUUID().toString());
         try (final OutputStream out = new FileOutputStream(file);
             final DataOutputStream dos = new DataOutputStream(out)) {
-            out.write(0);
+            out.write(1);
             out.write(0);

             for (int i=0; i < 1024; i++) {
@@ -88,4 +88,31 @@ public class TestStandardTocReader {
             file.delete();
         }
     }
+
+    @Test
+    public void testGetBlockIndexV2() throws IOException {
+        final File file = new File("target/" + UUID.randomUUID().toString());
+        try (final OutputStream out = new FileOutputStream(file);
+            final DataOutputStream dos = new DataOutputStream(out)) {
+            out.write(2);
+            out.write(0);
+
+            for (int i=0; i < 1024; i++) {
+                dos.writeLong(i * 1024L);
+                dos.writeLong(0L);
+            }
+        }
+
+        try {
+            try(final StandardTocReader reader = new StandardTocReader(file)) {
+                assertFalse(reader.isCompressed());
+
+                for (int i=0; i < 1024; i++) {
+                    assertEquals(i * 1024, reader.getBlockOffset(i));
+                }
+            }
+        } finally {
+            file.delete();
+        }
+    }
 }

View File

@@ -66,43 +66,43 @@ import org.apache.nifi.processor.util.StandardValidators;
 public class ExecuteProcess extends AbstractProcessor {

     public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder()
         .name("Command")
         .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
         .required(true)
         .expressionLanguageSupported(false)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();

     public static final PropertyDescriptor COMMAND_ARGUMENTS = new PropertyDescriptor.Builder()
         .name("Command Arguments")
         .description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.")
         .required(false)
         .expressionLanguageSupported(false)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();

     public static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder()
         .name("Working Directory")
         .description("The directory to use as the current working directory when executing the command")
         .expressionLanguageSupported(false)
         .addValidator(StandardValidators.createDirectoryExistsValidator(false, true))
         .required(false)
         .build();

     public static final PropertyDescriptor BATCH_DURATION = new PropertyDescriptor.Builder()
         .name("Batch Duration")
         .description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so "
             + "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results "
             + "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results")
         .required(false)
         .expressionLanguageSupported(false)
         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
         .build();

     public static final PropertyDescriptor REDIRECT_ERROR_STREAM = new PropertyDescriptor.Builder()
         .name("Redirect Error Stream")
         .description("If true will redirect any error stream output of the process to the output stream. "
             + "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.")
         .required(false)
         .allowableValues("true", "false")
         .defaultValue("false")
@@ -111,11 +111,14 @@ public class ExecuteProcess extends AbstractProcessor {
         .build();

     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
         .description("All created FlowFiles are routed to this relationship")
         .build();

     private volatile ExecutorService executor;
+    private Future<?> longRunningProcess;
+    private AtomicBoolean failure = new AtomicBoolean(false);
+    private volatile ProxyOutputStream proxyOut;

     @Override
     public Set<Relationship> getRelationships() {
@@ -135,11 +138,11 @@ public class ExecuteProcess extends AbstractProcessor {

     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
             .name(propertyDescriptorName)
             .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
             .dynamic(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
     }

     static List<String> splitArgs(final String input) {
@@ -209,15 +212,99 @@ public class ExecuteProcess extends AbstractProcessor {

     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (proxyOut==null) {
+            proxyOut = new ProxyOutputStream(getLogger());
+        }
+
+        final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
+
+        final List<String> commandStrings = createCommandStrings(context);
+        final String commandString = StringUtils.join(commandStrings, " ");
+
+        if (longRunningProcess == null || longRunningProcess.isDone()) {
+            try {
+                longRunningProcess = launchProcess(context, commandStrings, batchNanos, proxyOut);
+            } catch (final IOException ioe) {
+                getLogger().error("Failed to create process due to {}", new Object[] { ioe });
+                context.yield();
+                return;
+            }
+        } else {
+            getLogger().info("Read from long running process");
+        }
+
+        if (!isScheduled()) {
+            getLogger().info("User stopped processor; will terminate process immediately");
+            longRunningProcess.cancel(true);
+            return;
+        }
+
+        // Create a FlowFile that we can write to and set the OutputStream for the FlowFile
+        // as the delegate for the ProxyOuptutStream, then wait until the process finishes
+        // or until the specified amount of time
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream flowFileOut) throws IOException {
+                try (final OutputStream out = new BufferedOutputStream(flowFileOut)) {
+                    proxyOut.setDelegate(out);
+
+                    if (batchNanos == null) {
+                        // we are not creating batches; wait until process terminates.
+                        // NB!!! Maybe get(long timeout, TimeUnit unit) should
+                        // be used to avoid waiting forever.
+                        try {
+                            longRunningProcess.get();
+                        } catch (final InterruptedException ie) {
+                        } catch (final ExecutionException ee) {
+                            getLogger().error("Process execution failed due to {}", new Object[] { ee.getCause() });
+                        }
+                    } else {
+                        // wait the allotted amount of time.
+                        try {
+                            TimeUnit.NANOSECONDS.sleep(batchNanos);
+                        } catch (final InterruptedException ie) {
+                        }
+                    }
+
+                    proxyOut.setDelegate(null); // prevent from writing to this
+                                                // stream
+                }
+            }
+        });
+
+        if (flowFile.getSize() == 0L) {
+            // If no data was written to the file, remove it
+            session.remove(flowFile);
+        } else if (failure.get()) {
+            // If there was a failure processing the output of the Process, remove the FlowFile
+            session.remove(flowFile);
+            getLogger().error("Failed to read data from Process, so will not generate FlowFile");
+        } else {
+            // All was good. Generate event and transfer FlowFile.
+            session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString);
+            getLogger().info("Created {} and routed to success", new Object[] { flowFile });
+            session.transfer(flowFile, REL_SUCCESS);
+        }
+
+        // Commit the session so that the FlowFile is transferred to the next processor
+        session.commit();
+    }
+
+    protected List<String> createCommandStrings(final ProcessContext context) {
         final String command = context.getProperty(COMMAND).getValue();
         final List<String> args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue());
-        final Boolean redirectErrorStream = context.getProperty(REDIRECT_ERROR_STREAM).asBoolean();

         final List<String> commandStrings = new ArrayList<>(args.size() + 1);
         commandStrings.add(command);
         commandStrings.addAll(args);

-        final String commandString = StringUtils.join(commandStrings, " ");
+        return commandStrings;
+    }
+
+    protected Future<?> launchProcess(final ProcessContext context, final List<String> commandStrings, final Long batchNanos,
+        final ProxyOutputStream proxyOut) throws IOException {
+
+        final Boolean redirectErrorStream = context.getProperty(REDIRECT_ERROR_STREAM).asBoolean();

         final ProcessBuilder builder = new ProcessBuilder(commandStrings);
         final String workingDirName = context.getProperty(WORKING_DIR).getValue();
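The rewritten onTrigger routes the output of a single long-running process into a fresh FlowFile on every invocation by swapping the delegate of a ProxyOutputStream: point it at the current FlowFile's stream, let the background copy task write for a while, then detach it. A standalone sketch of a swappable-delegate stream (hypothetical class; in this sketch writes are simply dropped while no delegate is set, which may differ from how NiFi's ProxyOutputStream behaves):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class ProxyStreamExample {
    /** An OutputStream whose target can be replaced at runtime. */
    static final class SwappableOutputStream extends OutputStream {
        private volatile OutputStream delegate;

        void setDelegate(final OutputStream delegate) {
            this.delegate = delegate;
        }

        @Override
        public void write(final int b) throws IOException {
            final OutputStream target = delegate;
            if (target != null) {
                target.write(b);   // pass through to whichever stream is attached right now
            }
        }
    }

    public static void main(final String[] args) throws IOException {
        final SwappableOutputStream proxy = new SwappableOutputStream();

        final ByteArrayOutputStream firstBatch = new ByteArrayOutputStream();
        proxy.setDelegate(firstBatch);
        proxy.write("batch one".getBytes(StandardCharsets.UTF_8));

        proxy.setDelegate(null);   // between batches: nothing is attached
        proxy.write("dropped between batches".getBytes(StandardCharsets.UTF_8));

        final ByteArrayOutputStream secondBatch = new ByteArrayOutputStream();
        proxy.setDelegate(secondBatch);
        proxy.write("batch two".getBytes(StandardCharsets.UTF_8));

        System.out.println(firstBatch + " / " + secondBatch);
    }
}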
@ -236,24 +323,15 @@ public class ExecuteProcess extends AbstractProcessor {
builder.environment().putAll(environment); builder.environment().putAll(environment);
} }
final long startNanos = System.nanoTime(); getLogger().info("Start creating new Process > {} ", new Object[] { commandStrings });
final Process process; final Process newProcess = builder.redirectErrorStream(redirectErrorStream).start();
try {
process = builder.redirectErrorStream(redirectErrorStream).start();
} catch (final IOException ioe) {
getLogger().error("Failed to create process due to {}", new Object[]{ioe});
context.yield();
return;
}
final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
// Submit task to read error stream from process // Submit task to read error stream from process
if (!redirectErrorStream) { if (!redirectErrorStream) {
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) { try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getErrorStream()))) {
while (reader.read() >= 0) { while (reader.read() >= 0) {
} }
} catch (final IOException ioe) { } catch (final IOException ioe) {
@ -263,19 +341,25 @@ public class ExecuteProcess extends AbstractProcessor {
} }
// Submit task to read output of Process and write to FlowFile. // Submit task to read output of Process and write to FlowFile.
final ProxyOutputStream proxyOut = new ProxyOutputStream(getLogger()); failure = new AtomicBoolean(false);
final AtomicBoolean failure = new AtomicBoolean(false);
final AtomicBoolean finishedCopying = new AtomicBoolean(false);
final Future<?> future = executor.submit(new Callable<Object>() { final Future<?> future = executor.submit(new Callable<Object>() {
@Override @Override
public Object call() throws IOException { public Object call() throws IOException {
try { try {
if (batchNanos == null) { if (batchNanos == null) {
// if we aren't batching, just copy the stream from the process to the flowfile. // if we aren't batching, just copy the stream from the
try (final BufferedInputStream bufferedIn = new BufferedInputStream(process.getInputStream())) { // process to the flowfile.
try (final BufferedInputStream bufferedIn = new BufferedInputStream(newProcess.getInputStream())) {
final byte[] buffer = new byte[4096]; final byte[] buffer = new byte[4096];
int len; int len;
while ((len = bufferedIn.read(buffer)) > 0) { while ((len = bufferedIn.read(buffer)) > 0) {
// NB!!!! Maybe all data should be read from
// input stream in case of !isScheduled() to
// avoid subprocess deadlock?
// (we just don't write data to proxyOut)
// Or because we don't use this subprocess
// anymore anyway, we don't care?
if (!isScheduled()) { if (!isScheduled()) {
return null; return null;
} }
@ -284,12 +368,15 @@ public class ExecuteProcess extends AbstractProcessor {
} }
} }
} else { } else {
// we are batching, which means that the output of the process is text. It doesn't make sense to grab // we are batching, which means that the output of the
// arbitrary batches of bytes from some process and send it along as a piece of data, so we assume that // process is text. It doesn't make sense to grab
// arbitrary batches of bytes from some process and send
// it along as a piece of data, so we assume that
// setting a batch during means text. // setting a batch during means text.
// Also, we don't want that text to get split up in the middle of a line, so we use BufferedReader // Also, we don't want that text to get split up in the
// middle of a line, so we use BufferedReader
// to read lines of text and write them as lines of text. // to read lines of text and write them as lines of text.
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getInputStream()))) {
String line; String line;
while ((line = reader.readLine()) != null) { while ((line = reader.readLine()) != null) {
@ -305,108 +392,22 @@ public class ExecuteProcess extends AbstractProcessor {
failure.set(true); failure.set(true);
throw ioe; throw ioe;
} finally { } finally {
finishedCopying.set(true); int exitCode;
try {
exitCode = newProcess.exitValue();
} catch (final Exception e) {
exitCode = -99999;
}
getLogger().info("Process finished with exit code {} ", new Object[] { exitCode });
} }
return null; return null;
} }
}); });
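When a batch duration is configured, the processor treats the output as text and copies it line by line, as the comments above describe, so a batch boundary never lands in the middle of a line. A minimal sketch of that copy idea in isolation (class and method names are illustrative; the real code writes through the proxy stream rather than a plain OutputStream):

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;

public class LineCopySketch {
    // Copies whole lines from the subprocess's stdout to the target stream so
    // that whatever batching happens downstream never splits a line of text.
    static void copyLines(final Process process, final OutputStream target) throws IOException {
        try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
             final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(target))) {
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(line);
                writer.newLine();
            }
        }
    }
}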
// continue to do this loop until both the process has finished and we have finished copying return future;
// the output from the process to the FlowFile. Unfortunately, even after calling Process.exitValue(),
// there can be data buffered on the InputStream; so we will wait until the stream is empty as well.
int flowFileCount = 0;
while (!finishedCopying.get() || isAlive(process)) {
if (!isScheduled()) {
getLogger().info("User stopped processor; will terminate process immediately");
process.destroy();
break;
}
// Create a FlowFile that we can write to and set the OutputStream for the FlowFile
// as the delegate for the ProxyOutputStream, then wait until the process finishes
// or until the specified amount of time has elapsed
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream flowFileOut) throws IOException {
try (final OutputStream out = new BufferedOutputStream(flowFileOut)) {
proxyOut.setDelegate(out);
if (batchNanos == null) {
// we are not creating batches; wait until process terminates.
Integer exitCode = null;
while (exitCode == null) {
try {
exitCode = process.waitFor();
} catch (final InterruptedException ie) {
}
}
} else {
// wait the allotted amount of time.
try {
TimeUnit.NANOSECONDS.sleep(batchNanos);
} catch (final InterruptedException ie) {
}
}
proxyOut.setDelegate(null); // prevent further writes to this stream
}
}
});
if (flowFile.getSize() == 0L) {
// If no data was written to the file, remove it
session.remove(flowFile);
} else if (failure.get()) {
// If there was a failure processing the output of the Process, remove the FlowFile
session.remove(flowFile);
getLogger().error("Failed to read data from Process, so will not generate FlowFile");
break;
} else {
// All was good. Generate event and transfer FlowFile.
session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString);
getLogger().info("Created {} and routed to success", new Object[]{flowFile});
session.transfer(flowFile, REL_SUCCESS);
flowFileCount++;
}
// Commit the session so that the FlowFile is transferred to the next processor
session.commit();
}
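The loop removed above implements the rule spelled out in its leading comment: keep going until the copy task has finished AND the process is no longer alive, because bytes can remain buffered on the InputStream even after exitValue() stops throwing. A stripped-down sketch of just that termination condition, with the copy task reduced to a flag (names are illustrative):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AwaitCopySketch {
    // Returns only once the subprocess has exited AND the background copy task
    // has reported end-of-stream, whichever happens later.
    static void awaitProcessAndCopy(final Process process, final AtomicBoolean finishedCopying) throws InterruptedException {
        while (!finishedCopying.get() || isAlive(process)) {
            TimeUnit.MILLISECONDS.sleep(50); // the real loop does useful work here instead of sleeping
        }
    }

    // Pre-Java-8 liveness check: exitValue() throws while the process is still running.
    private static boolean isAlive(final Process process) {
        try {
            process.exitValue();
            return false;
        } catch (final IllegalThreadStateException itse) {
            return true;
        }
    }
}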
final int exitCode;
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
try {
exitCode = process.waitFor();
} catch (final InterruptedException ie) {
getLogger().warn("Process was interrupted before finishing");
return;
}
try {
future.get();
} catch (final ExecutionException e) {
getLogger().error("Failed to copy output from Process to FlowFile due to {}", new Object[]{e.getCause()});
} catch (final InterruptedException ie) {
getLogger().error("Interrupted while waiting to copy data form Process to FlowFile");
return;
}
getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis", new Object[]{exitCode, flowFileCount, millis});
} }
private boolean isAlive(final Process process) {
// Unfortunately, Java 7 provides no straightforward way to test whether a Process is alive.
// Process.isAlive() was introduced in Java 8, but NiFi needs to run against Java 7,
// so we use this workaround in the meantime.
try {
process.exitValue();
return false;
} catch (final IllegalThreadStateException itse) {
return true;
}
}
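For comparison, once Java 8 is the minimum runtime the exception-probing helper above becomes unnecessary; a short sketch of the built-in equivalents (assuming Java 8 APIs only):

import java.util.concurrent.TimeUnit;

public class Java8ProcessSketch {
    // Direct liveness check, available since Java 8.
    static boolean stillRunning(final Process process) {
        return process.isAlive();
    }

    // Bounded wait that returns true if the process exited within the timeout.
    static boolean finishedWithin(final Process process, final long millis) throws InterruptedException {
        return process.waitFor(millis, TimeUnit.MILLISECONDS);
    }
}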
/** /**
* Output stream that is used to wrap another output stream in a way that the underlying output stream can be swapped out for a different one when needed * Output stream that is used to wrap another output stream in a way that the underlying output stream can be swapped out for a different one when needed
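The body of that class is not shown in this hunk; the sketch below is a guess at the general shape of such a delegate-swapping stream, not the actual NiFi implementation (in particular, the real class may block or buffer when no delegate is set rather than discard writes):

import java.io.IOException;
import java.io.OutputStream;

// Writes go to whatever delegate is currently set; with no delegate set,
// this sketch simply discards the data. The processor swaps the delegate
// to each new FlowFile's OutputStream in turn.
public class ProxyOutputStreamSketch extends OutputStream {
    private OutputStream delegate;

    public synchronized void setDelegate(final OutputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void write(final int b) throws IOException {
        if (delegate != null) {
            delegate.write(b);
        }
    }

    @Override
    public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
        if (delegate != null) {
            delegate.write(b, off, len);
        }
    }

    @Override
    public synchronized void flush() throws IOException {
        if (delegate != null) {
            delegate.flush();
        }
    }
}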


@ -20,11 +20,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.List; import java.util.List;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
public class TestExecuteProcess { public class TestExecuteProcess {
@ -58,6 +61,7 @@ public class TestExecuteProcess {
assertEquals("good bye", twoQuotedArg.get(1)); assertEquals("good bye", twoQuotedArg.get(1));
} }
@Ignore // won't run under Windows
@Test @Test
public void testEcho() { public void testEcho() {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE"); System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");
@ -75,4 +79,82 @@ public class TestExecuteProcess {
System.out.println(new String(flowFile.toByteArray())); System.out.println(new String(flowFile.toByteArray()));
} }
} }
// @Test
public void testBigBinaryInputData() {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "DEBUG");
String workingDirName = "/var/test";
String testFile = "eclipse-java-luna-SR2-win32.zip";
final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class);
runner.setProperty(ExecuteProcess.COMMAND, "cmd");
runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, " /c type " + testFile);
runner.setProperty(ExecuteProcess.WORKING_DIR, workingDirName);
File inFile = new File(workingDirName, testFile);
System.out.println(inFile.getAbsolutePath());
runner.run();
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS);
long totalFlowFilesSize = 0;
for (final MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile);
totalFlowFilesSize += flowFile.getSize();
// System.out.println(new String(flowFile.toByteArray()));
}
assertEquals(inFile.length(), totalFlowFilesSize);
}
@Test
public void testBigInputSplit() {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "DEBUG");
String workingDirName = "/var/test";
String testFile = "Novo_dicionário_da_língua_portuguesa_by_Cândido_de_Figueiredo.txt";
// String testFile = "eclipse-java-luna-SR2-win32.zip";
final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class);
runner.setProperty(ExecuteProcess.COMMAND, "cmd");
runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, " /c type " + testFile);
runner.setProperty(ExecuteProcess.WORKING_DIR, workingDirName);
runner.setProperty(ExecuteProcess.BATCH_DURATION, "150 millis");
File inFile = new File(workingDirName, testFile);
System.out.println(inFile.getAbsolutePath());
// runner.run(1,false,true);
ProcessContext processContext = runner.getProcessContext();
ExecuteProcess processor = (ExecuteProcess) runner.getProcessor();
processor.updateScheduledTrue();
processor.setupExecutor(processContext);
processor.onTrigger(processContext, runner.getProcessSessionFactory());
processor.onTrigger(processContext, runner.getProcessSessionFactory());
processor.onTrigger(processContext, runner.getProcessSessionFactory());
processor.onTrigger(processContext, runner.getProcessSessionFactory());
processor.onTrigger(processContext, runner.getProcessSessionFactory());
processor.onTrigger(processContext, runner.getProcessSessionFactory());
processor.onTrigger(processContext, runner.getProcessSessionFactory());
processor.onTrigger(processContext, runner.getProcessSessionFactory());
processor.onTrigger(processContext, runner.getProcessSessionFactory());
// runner.run(5,true,false);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS);
long totalFlowFilesSize = 0;
for (final MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile);
totalFlowFilesSize += flowFile.getSize();
// System.out.println(new String(flowFile.toByteArray()));
}
// assertEquals(inFile.length(), totalFlowFilesSize);
}
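The nine back-to-back onTrigger calls above could also be written as a loop. A sketch of the same idea, relying only on the test hooks this change introduces (updateScheduledTrue, setupExecutor); the file name and the trimmed-down set of properties are placeholders:

@Test
public void testBigInputSplitLooped() {
    final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class);
    runner.setProperty(ExecuteProcess.COMMAND, "cmd");
    runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, " /c type some-large-file.txt");
    runner.setProperty(ExecuteProcess.BATCH_DURATION, "150 millis");

    final ProcessContext processContext = runner.getProcessContext();
    final ExecuteProcess processor = (ExecuteProcess) runner.getProcessor();
    processor.updateScheduledTrue();
    processor.setupExecutor(processContext);

    // Drive the processor for a fixed number of batches instead of repeating the call by hand.
    for (int i = 0; i < 9; i++) {
        processor.onTrigger(processContext, runner.getProcessSessionFactory());
    }
}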
} }