diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java index 8294af39f8..d1ed023888 100644 --- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java +++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java @@ -31,6 +31,11 @@ public class ByteCountingInputStream extends InputStream { this.in = in; } + public ByteCountingInputStream(final InputStream in, final long initialOffset) { + this.in = in; + this.bytesSkipped = initialOffset; + } + @Override public int read() throws IOException { final int fromSuper = in.read(); diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java index 3e3e3fe96d..e71937ecc4 100644 --- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java +++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java @@ -27,6 +27,12 @@ public class ByteCountingOutputStream extends OutputStream { public ByteCountingOutputStream(final OutputStream out) { this.out = out; } + + public ByteCountingOutputStream(final OutputStream out, final long initialByteCount) { + this.out = out; + this.bytesWritten = initialByteCount; + } + @Override public void write(int b) throws IOException { @@ -39,6 +45,8 @@ public class ByteCountingOutputStream extends OutputStream { write(b, 0, b.length); } + ; + @Override public void write(byte[] b, int off, int len) throws IOException { out.write(b, off, len); diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index a5054e4102..25c396fd75 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -131,6 +131,7 @@ src/test/resources/conf/0bytes.xml src/test/resources/conf/termination-only.xml src/test/resources/hello.txt + src/test/resources/old-swap-file.swap diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 0502cc7ef4..48cc164e7e 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.FileFilter; import java.io.FileNotFoundException; import java.io.IOException; -import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -58,6 +57,14 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import 
java.util.regex.Pattern; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexNotFoundException; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.FSDirectory; import org.apache.nifi.events.EventReporter; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.provenance.expiration.ExpirationAction; @@ -67,12 +74,11 @@ import org.apache.nifi.provenance.lineage.Lineage; import org.apache.nifi.provenance.lineage.LineageComputationType; import org.apache.nifi.provenance.lucene.DeleteIndexAction; import org.apache.nifi.provenance.lucene.FieldNames; +import org.apache.nifi.provenance.lucene.IndexManager; import org.apache.nifi.provenance.lucene.IndexSearch; import org.apache.nifi.provenance.lucene.IndexingAction; import org.apache.nifi.provenance.lucene.LineageQuery; import org.apache.nifi.provenance.lucene.LuceneUtil; -import org.apache.nifi.provenance.rollover.CompressionAction; -import org.apache.nifi.provenance.rollover.RolloverAction; import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.search.QueryResult; import org.apache.nifi.provenance.search.QuerySubmission; @@ -81,18 +87,12 @@ import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; import org.apache.nifi.provenance.serialization.RecordWriter; import org.apache.nifi.provenance.serialization.RecordWriters; +import org.apache.nifi.provenance.toc.TocUtil; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.RingBuffer; import org.apache.nifi.util.StopWatch; -import org.apache.lucene.document.Document; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexNotFoundException; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.TopDocs; -import org.apache.lucene.store.FSDirectory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,7 +102,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository public static final String EVENT_CATEGORY = "Provenance Repository"; private static final String FILE_EXTENSION = ".prov"; private static final String TEMP_FILE_SUFFIX = ".prov.part"; - public static final int SERIALIZATION_VERSION = 7; + public static final int SERIALIZATION_VERSION = 8; public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+"); public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov"); @@ -129,14 +129,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository private final AtomicLong streamStartTime = new AtomicLong(System.currentTimeMillis()); private final RepositoryConfiguration configuration; private final IndexConfiguration indexConfig; + private final IndexManager indexManager; private final boolean alwaysSync; private final int rolloverCheckMillis; private final ScheduledExecutorService scheduledExecService; - private final ExecutorService rolloverExecutor; + private final ScheduledExecutorService rolloverExecutor; private final ExecutorService queryExecService; - private final List rolloverActions = 
new ArrayList<>(); private final List expirationActions = new ArrayList<>(); private final IndexingAction indexingAction; @@ -181,22 +181,18 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository this.maxPartitionMillis = configuration.getMaxEventFileLife(TimeUnit.MILLISECONDS); this.maxPartitionBytes = configuration.getMaxEventFileCapacity(); this.indexConfig = new IndexConfiguration(configuration); + this.indexManager = new IndexManager(); this.alwaysSync = configuration.isAlwaysSync(); this.rolloverCheckMillis = rolloverCheckMillis; final List fields = configuration.getSearchableFields(); if (fields != null && !fields.isEmpty()) { indexingAction = new IndexingAction(this, indexConfig); - rolloverActions.add(indexingAction); } else { indexingAction = null; } - if (configuration.isCompressOnRollover()) { - rolloverActions.add(new CompressionAction()); - } - - scheduledExecService = Executors.newScheduledThreadPool(3); + scheduledExecService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("Provenance Maintenance Thread")); queryExecService = Executors.newFixedThreadPool(configuration.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query Thread")); // The number of rollover threads is a little bit arbitrary but comes from the idea that multiple storage directories generally @@ -204,69 +200,74 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // disks efficiently. However, the rollover actions can be somewhat CPU intensive, so we double the number of threads in order // to account for that. final int numRolloverThreads = configuration.getStorageDirectories().size() * 2; - rolloverExecutor = Executors.newFixedThreadPool(numRolloverThreads, new NamedThreadFactory("Provenance Repository Rollover Thread")); + rolloverExecutor = Executors.newScheduledThreadPool(numRolloverThreads, new NamedThreadFactory("Provenance Repository Rollover Thread")); } @Override public void initialize(final EventReporter eventReporter) throws IOException { - if (initialized.getAndSet(true)) { - return; - } - - this.eventReporter = eventReporter; - - recover(); - - if (configuration.isAllowRollover()) { - writers = createWriters(configuration, idGenerator.get()); - } - - if (configuration.isAllowRollover()) { - scheduledExecService.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - // Check if we need to roll over - if (needToRollover()) { - // it appears that we do need to roll over. Obtain write lock so that we can do so, and then - // confirm that we still need to. 
- writeLock.lock(); - try { - logger.debug("Obtained write lock to perform periodic rollover"); - - if (needToRollover()) { - try { - rollover(false); - } catch (final Exception e) { - logger.error("Failed to roll over Provenance Event Log due to {}", e.toString()); - logger.error("", e); - } - } - } finally { - writeLock.unlock(); - } - } - } - }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS); - - scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS); - scheduledExecService.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - try { - purgeOldEvents(); - } catch (final Exception e) { - logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString()); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString()); - } - } - }, 1L, 1L, TimeUnit.MINUTES); - - expirationActions.add(new DeleteIndexAction(this, indexConfig)); - expirationActions.add(new FileRemovalAction()); - } + writeLock.lock(); + try { + if (initialized.getAndSet(true)) { + return; + } + + this.eventReporter = eventReporter; + + recover(); + + if (configuration.isAllowRollover()) { + writers = createWriters(configuration, idGenerator.get()); + } + + if (configuration.isAllowRollover()) { + scheduledExecService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + // Check if we need to roll over + if (needToRollover()) { + // it appears that we do need to roll over. Obtain write lock so that we can do so, and then + // confirm that we still need to. + writeLock.lock(); + try { + logger.debug("Obtained write lock to perform periodic rollover"); + + if (needToRollover()) { + try { + rollover(false); + } catch (final Exception e) { + logger.error("Failed to roll over Provenance Event Log due to {}", e.toString()); + logger.error("", e); + } + } + } finally { + writeLock.unlock(); + } + } + } + }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS); + + scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS); + scheduledExecService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + purgeOldEvents(); + } catch (final Exception e) { + logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString()); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString()); + } + } + }, 1L, 1L, TimeUnit.MINUTES); + + expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager)); + expirationActions.add(new FileRemovalAction()); + } + } finally { + writeLock.unlock(); + } } private static RepositoryConfiguration createRepositoryConfiguration() throws IOException { @@ -334,10 +335,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final File journalDirectory = new File(storageDirectory, "journals"); final File journalFile = new File(journalDirectory, String.valueOf(initialRecordId) + ".journal." 
+ i); - writers[i] = RecordWriters.newRecordWriter(journalFile); + writers[i] = RecordWriters.newRecordWriter(journalFile, false, false); writers[i].writeHeader(); } + logger.info("Created new Provenance Event Writers for events starting with ID {}", initialRecordId); return writers; } @@ -501,18 +503,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // Determine the max ID in the last file. try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) { - ProvenanceEventRecord record; - while ((record = reader.nextRecord()) != null) { - final long eventId = record.getEventId(); - if (eventId > maxId) { - maxId = eventId; - } + final long eventId = reader.getMaxEventId(); + if (eventId > maxId) { + maxId = eventId; + } - // If the ID is greater than the max indexed id and this file was indexed, then - // update the max indexed id - if (eventId > maxIndexedId && lastFileIndexed) { - maxIndexedId = eventId; - } + // If the ID is greater than the max indexed id and this file was indexed, then + // update the max indexed id + if (eventId > maxIndexedId && lastFileIndexed) { + maxIndexedId = eventId; } } catch (final IOException ioe) { logger.error("Failed to read Provenance Event File {} due to {}", maxIdFile, ioe); @@ -568,16 +567,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // Read the records in the last file to find its max id if (greatestMinIdFile != null) { try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.emptyList())) { - StandardProvenanceEventRecord record; - - try { - while ((record = recordReader.nextRecord()) != null) { - if (record.getEventId() > maxId) { - maxId = record.getEventId(); - } - } - } catch (final EOFException eof) { - } + maxId = recordReader.getMaxEventId(); } } @@ -599,46 +589,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } logger.info("Recovered {} records", recordsRecovered); - - final List rolloverActions = this.rolloverActions; - final Runnable retroactiveRollover = new Runnable() { - @Override - public void run() { - for (File toRecover : filesToRecover) { - final String baseFileName = LuceneUtil.substringBefore(toRecover.getName(), "."); - final Long fileFirstEventId = Long.parseLong(baseFileName); - - for (final RolloverAction action : rolloverActions) { - if (!action.hasBeenPerformed(toRecover)) { - try { - final StopWatch stopWatch = new StopWatch(true); - - toRecover = action.execute(toRecover); - - stopWatch.stop(); - final String duration = stopWatch.getDuration(); - logger.info("Successfully performed retroactive action {} against {} in {}", action, toRecover, duration); - - // update our map of id to Path - final Map updatedMap = addToPathMap(fileFirstEventId, toRecover.toPath()); - logger.trace("After retroactive rollover action {}, Path Map: {}", action, updatedMap); - } catch (final Exception e) { - logger.error("Failed to perform retroactive rollover actions on {} due to {}", toRecover, e.toString()); - logger.error("", e); - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to perform retroactive rollover actions on " + toRecover + " due to " + e.toString()); - } - } - } - } - } - }; - rolloverExecutor.submit(retroactiveRollover); - recoveryFinished.set(true); } @Override - public void close() throws IOException { + public synchronized void close() throws IOException { writeLock.lock(); try { logger.debug("Obtained write lock for 
close"); @@ -648,8 +603,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository rolloverExecutor.shutdownNow(); queryExecService.shutdownNow(); - for (final RecordWriter writer : writers) { - writer.close(); + indexManager.close(); + + if ( writers != null ) { + for (final RecordWriter writer : writers) { + writer.close(); + } } } finally { writeLock.unlock(); @@ -945,6 +904,21 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } } + // made protected for testing purposes + protected int getJournalCount() { + // determine how many 'journals' we have in the journals directories + int journalFileCount = 0; + for ( final File storageDir : configuration.getStorageDirectories() ) { + final File journalsDir = new File(storageDir, "journals"); + final File[] journalFiles = journalsDir.listFiles(); + if ( journalFiles != null ) { + journalFileCount += journalFiles.length; + } + } + + return journalFileCount; + } + /** * MUST be called with the write lock held * @@ -963,9 +937,45 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository for (final RecordWriter writer : writers) { final File writerFile = writer.getFile(); journalsToMerge.add(writerFile); - writer.close(); + try { + writer.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close {} due to {}", writer, ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + if ( logger.isDebugEnabled() ) { + logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), ".")); } + int journalFileCount = getJournalCount(); + final int journalCountThreshold = configuration.getJournalCount() * 5; + if ( journalFileCount > journalCountThreshold ) { + logger.warn("The rate of the dataflow is exceeding the provenance recording rate. " + + "Slowing down flow to accomodate. Currently, there are {} journal files and " + + "threshold for blocking is {}", journalFileCount, journalCountThreshold); + eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is " + + "exceeding the provenance recording rate. Slowing down flow to accomodate"); + + while (journalFileCount > journalCountThreshold) { + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + } + + logger.debug("Provenance Repository is still behind. Keeping flow slowed down " + + "to accomodate. Currently, there are {} journal files and " + + "threshold for blocking is {}", journalFileCount, journalCountThreshold); + + journalFileCount = getJournalCount(); + } + + logger.info("Provenance Repository has no caught up with rolling over journal files. 
Current number of " + + "journal files to be rolled over is {}", journalFileCount); + } + writers = createWriters(configuration, idGenerator.get()); streamStartTime.set(System.currentTimeMillis()); recordsWrittenSinceRollover.getAndSet(0); @@ -974,60 +984,29 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final List storageDirs = configuration.getStorageDirectories(); final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size())); - final List actions = rolloverActions; + final AtomicReference> futureReference = new AtomicReference<>(); final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0); final Runnable rolloverRunnable = new Runnable() { @Override public void run() { - final File fileRolledOver; - - try { - fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords); - repoDirty.set(false); - } catch (final IOException ioe) { - repoDirty.set(true); - logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString()); - logger.error("", ioe); - return; - } - - if (fileRolledOver == null) { - return; - } - File file = fileRolledOver; - - for (final RolloverAction action : actions) { - try { - final StopWatch stopWatch = new StopWatch(true); - file = action.execute(file); - stopWatch.stop(); - logger.info("Successfully performed Rollover Action {} for {} in {}", action, file, stopWatch.getDuration()); - - // update our map of id to Path - // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a - // get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying - // it at one time - writeLock.lock(); - try { - final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), ".")); - SortedMap newIdToPathMap = new TreeMap<>(new PathMapComparator()); - newIdToPathMap.putAll(idToPathMap.get()); - newIdToPathMap.put(fileFirstEventId, file.toPath()); - idToPathMap.set(newIdToPathMap); - logger.trace("After rollover action {}, path map: {}", action, newIdToPathMap); - } finally { - writeLock.unlock(); - } - } catch (final Throwable t) { - logger.error("Failed to perform Rollover Action {} for {}: got Exception {}", - action, fileRolledOver, t.toString()); - logger.error("", t); - - return; - } - } - - if (actions.isEmpty()) { + try { + final File fileRolledOver; + + try { + fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords); + repoDirty.set(false); + } catch (final IOException ioe) { + repoDirty.set(true); + logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString()); + logger.error("", ioe); + return; + } + + if (fileRolledOver == null) { + return; + } + File file = fileRolledOver; + // update our map of id to Path // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a // get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying @@ -1042,35 +1021,37 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } finally { writeLock.unlock(); } - } - - logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten); - rolloverCompletions.getAndIncrement(); + + logger.info("Successfully Rolled 
over Provenance Event file containing {} records", recordsWritten); + rolloverCompletions.getAndIncrement(); + + // We have finished successfully. Cancel the future so that we don't run anymore + Future future; + while ((future = futureReference.get()) == null) { + try { + Thread.sleep(10L); + } catch (final InterruptedException ie) { + } + } + + future.cancel(false); + } catch (final Throwable t) { + logger.error("Failed to rollover Provenance repository due to {}", t.toString()); + logger.error("", t); + } } }; - rolloverExecutor.submit(rolloverRunnable); + // We are going to schedule the future to run every 10 seconds. This allows us to keep retrying if we + // fail for some reason. When we succeed, the Runnable will cancel itself. + final Future future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS); + futureReference.set(future); streamStartTime.set(System.currentTimeMillis()); bytesWrittenSinceRollover.set(0); } } - private SortedMap addToPathMap(final Long firstEventId, final Path path) { - SortedMap unmodifiableMap; - boolean updated = false; - do { - final SortedMap existingMap = idToPathMap.get(); - final SortedMap newIdToPathMap = new TreeMap<>(new PathMapComparator()); - newIdToPathMap.putAll(existingMap); - newIdToPathMap.put(firstEventId, path); - unmodifiableMap = Collections.unmodifiableSortedMap(newIdToPathMap); - - updated = idToPathMap.compareAndSet(existingMap, unmodifiableMap); - } while (!updated); - - return unmodifiableMap; - } private Set recoverJournalFiles() throws IOException { if (!configuration.isAllowRollover()) { @@ -1093,6 +1074,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } for (final File journalFile : journalFiles) { + if ( journalFile.isDirectory() ) { + continue; + } + final String basename = LuceneUtil.substringBefore(journalFile.getName(), "."); List files = journalMap.get(basename); if (files == null) { @@ -1135,22 +1120,92 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return mergedFile; } - static File mergeJournals(final List journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, final RingBuffer ringBuffer) throws IOException { - final long startNanos = System.nanoTime(); + File mergeJournals(final List journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, final RingBuffer ringBuffer) throws IOException { + logger.debug("Merging {} to {}", journalFiles, mergedFile); + if ( this.closed ) { + logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile); + return null; + } + if (journalFiles.isEmpty()) { return null; } - if (mergedFile.exists()) { - throw new FileAlreadyExistsException("Cannot Merge " + journalFiles.size() + " Journal Files into Merged Provenance Log File " + mergedFile.getAbsolutePath() + " because the Merged File already exists"); - } + Collections.sort(journalFiles, new Comparator() { + @Override + public int compare(final File o1, final File o2) { + final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), "."); + final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), "."); - final File tempMergedFile = new File(mergedFile.getParentFile(), mergedFile.getName() + ".part"); + try { + final int journalIndex1 = Integer.parseInt(suffix1); + final int journalIndex2 = Integer.parseInt(suffix2); + return Integer.compare(journalIndex1, journalIndex2); + } catch (final 
NumberFormatException nfe) { + return o1.getName().compareTo(o2.getName()); + } + } + }); + + final String firstJournalFile = journalFiles.get(0).getName(); + final String firstFileSuffix = LuceneUtil.substringAfterLast(firstJournalFile, "."); + final boolean allPartialFiles = firstFileSuffix.equals("0"); + + // check if we have all of the "partial" files for the journal. + if (allPartialFiles) { + if ( mergedFile.exists() ) { + // we have all "partial" files and there is already a merged file. Delete the data from the index + // because the merge file may not be fully merged. We will re-merge. + logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist " + + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.", mergedFile); + + final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager); + try { + deleteAction.execute(mergedFile); + } catch (final Exception e) { + logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString()); + if ( logger.isDebugEnabled() ) { + logger.warn("", e); + } + } + + // Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on + // a different Storage Directory than the original, we need to ensure that we delete both the partially merged + // file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events. + if ( !mergedFile.delete() ) { + logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal " + + "file not being able to be displayed. This file should be deleted manually.", mergedFile); + } + + final File tocFile = TocUtil.getTocFile(mergedFile); + if ( tocFile.exists() && !tocFile.delete() ) { + logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. " + + "This can be corrected by manually deleting the {} file", tocFile, mergedFile, tocFile); + } + } + } else { + logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' " + + "but it did not; assuming that the files were already merged but only some finished deletion " + + "before restart. Deleting remaining partial journal files.", journalFiles); + + for ( final File file : journalFiles ) { + if ( !file.delete() && file.exists() ) { + logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file); + } + } + + return null; + } + + final long startNanos = System.nanoTime(); // Map each journal to a RecordReader final List readers = new ArrayList<>(); int records = 0; + final boolean isCompress = configuration.isCompressOnRollover(); + final File writerFile = isCompress ? new File(mergedFile.getParentFile(), mergedFile.getName() + ".gz") : mergedFile; + try { for (final File journalFile : journalFiles) { try { @@ -1203,32 +1258,50 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // loop over each entry in the map, persisting the records to the merged file in order, and populating the map // with the next entry from the journal file from which the previous record was written. 
- try (final RecordWriter writer = RecordWriters.newRecordWriter(tempMergedFile)) { + try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) { writer.writeHeader(); - while (!recordToReaderMap.isEmpty()) { - final Map.Entry entry = recordToReaderMap.entrySet().iterator().next(); - final StandardProvenanceEventRecord record = entry.getKey(); - final RecordReader reader = entry.getValue(); - - writer.writeRecord(record, record.getEventId()); - ringBuffer.add(record); - records++; - - // Remove this entry from the map - recordToReaderMap.remove(record); - - // Get the next entry from this reader and add it to the map - StandardProvenanceEventRecord nextRecord = null; - - try { - nextRecord = reader.nextRecord(); - } catch (final EOFException eof) { - } - - if (nextRecord != null) { - recordToReaderMap.put(nextRecord, reader); - } + final IndexingAction indexingAction = new IndexingAction(this, indexConfig); + + final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile); + final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory); + try { + long maxId = 0L; + + while (!recordToReaderMap.isEmpty()) { + final Map.Entry entry = recordToReaderMap.entrySet().iterator().next(); + final StandardProvenanceEventRecord record = entry.getKey(); + final RecordReader reader = entry.getValue(); + + writer.writeRecord(record, record.getEventId()); + final int blockIndex = writer.getTocWriter().getCurrentBlockIndex(); + + indexingAction.index(record, indexWriter, blockIndex); + maxId = record.getEventId(); + + ringBuffer.add(record); + records++; + + // Remove this entry from the map + recordToReaderMap.remove(record); + + // Get the next entry from this reader and add it to the map + StandardProvenanceEventRecord nextRecord = null; + + try { + nextRecord = reader.nextRecord(); + } catch (final EOFException eof) { + } + + if (nextRecord != null) { + recordToReaderMap.put(nextRecord, reader); + } + } + + indexWriter.commit(); + indexConfig.setMaxIdIndexed(maxId); + } finally { + indexManager.returnIndexWriter(indexingDirectory, indexWriter); } } } finally { @@ -1240,37 +1313,22 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } } - // Attempt to rename. Keep trying for a bit if we fail. This happens often if we have some external process - // that locks files, such as a virus scanner. - boolean renamed = false; - for (int i = 0; i < 10 && !renamed; i++) { - renamed = tempMergedFile.renameTo(mergedFile); - if (!renamed) { - try { - Thread.sleep(100L); - } catch (final InterruptedException ie) { - } - } - } - - if (!renamed) { - throw new IOException("Failed to merge journal files into single merged file " + mergedFile.getAbsolutePath() + " because " + tempMergedFile.getAbsolutePath() + " could not be renamed"); - } - // Success. Remove all of the journal files, as they're no longer needed, now that they've been merged. 
for (final File journalFile : journalFiles) { - if (!journalFile.delete()) { - if (journalFile.exists()) { - logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath()); - eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually"); - } else { - logger.warn("Failed to remove temporary journal file {} because it no longer exists", journalFile.getAbsolutePath()); - } + if (!journalFile.delete() && journalFile.exists()) { + logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath()); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually"); + } + + final File tocFile = TocUtil.getTocFile(journalFile); + if (!tocFile.delete() && tocFile.exists()) { + logger.warn("Failed to remove temporary journal TOC file {}; this file should be cleaned up manually", tocFile.getAbsolutePath()); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal TOC file " + tocFile.getAbsolutePath() + "; this file should be cleaned up manually"); } } if (records == 0) { - mergedFile.delete(); + writerFile.delete(); return null; } else { final long nanos = System.nanoTime() - startNanos; @@ -1278,7 +1336,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", journalFiles.size(), records, mergedFile, millis); } - return mergedFile; + return writerFile; } @Override @@ -1779,7 +1837,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository @Override public void run() { try { - final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir); + final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager); final StandardQueryResult queryResult = search.search(query, retrievalCount); submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount()); if (queryResult.isFinished()) { @@ -1787,7 +1845,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository query, indexDir, queryResult.getQueryTime(), queryResult.getTotalHitCount()); } } catch (final Throwable t) { - logger.error("Failed to query provenance repository due to {}", t.toString()); + logger.error("Failed to query Provenance Repository Index {} due to {}", indexDir, t.toString()); if (logger.isDebugEnabled()) { logger.error("", t); } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java index d47df4f4b1..3951591d2d 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java @@ -33,7 
+33,8 @@ public class RepositoryConfiguration { private long eventFileBytes = 1024L * 1024L * 5L; // 5 MB private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB private int journalCount = 16; - + private int compressionBlockBytes = 1024 * 1024; + private List searchableFields = new ArrayList<>(); private List searchableAttributes = new ArrayList<>(); private boolean compress = true; @@ -49,7 +50,16 @@ public class RepositoryConfiguration { return allowRollover; } - /** + + public int getCompressionBlockBytes() { + return compressionBlockBytes; + } + + public void setCompressionBlockBytes(int compressionBlockBytes) { + this.compressionBlockBytes = compressionBlockBytes; + } + + /** * Specifies where the repository will store data * * @return diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java index 5e4744bdbf..9bbf1952db 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java @@ -17,41 +17,173 @@ package org.apache.nifi.provenance; import java.io.DataInputStream; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.zip.GZIPInputStream; -import org.apache.nifi.stream.io.ByteCountingInputStream; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.provenance.serialization.RecordReader; +import org.apache.nifi.provenance.toc.TocReader; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.apache.nifi.stream.io.StreamUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class StandardRecordReader implements RecordReader { - - private final DataInputStream dis; - private final ByteCountingInputStream byteCountingIn; + private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class); + + private final ByteCountingInputStream rawInputStream; private final String filename; private final int serializationVersion; + private final boolean compressed; + private final TocReader tocReader; + private final int headerLength; + + private DataInputStream dis; + private ByteCountingInputStream byteCountingIn; - public StandardRecordReader(final InputStream in, final int serializationVersion, final String filename) { - if (serializationVersion < 1 || serializationVersion > 7) { - throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-6"); + public StandardRecordReader(final InputStream in, final String filename) throws IOException { + this(in, filename, null); + } + + public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader) throws IOException { + logger.trace("Creating RecordReader for 
{}", filename); + + rawInputStream = new ByteCountingInputStream(in); + + final InputStream limitedStream; + if ( tocReader == null ) { + limitedStream = rawInputStream; + } else { + final long offset1 = tocReader.getBlockOffset(1); + if ( offset1 < 0 ) { + limitedStream = rawInputStream; + } else { + limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed()); + } + } + + final InputStream readableStream; + if (filename.endsWith(".gz")) { + readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream)); + compressed = true; + } else { + readableStream = new BufferedInputStream(limitedStream); + compressed = false; + } + + byteCountingIn = new ByteCountingInputStream(readableStream); + dis = new DataInputStream(byteCountingIn); + + final String repoClassName = dis.readUTF(); + final int serializationVersion = dis.readInt(); + headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer. + + if (serializationVersion < 1 || serializationVersion > 8) { + throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-8"); } - byteCountingIn = new ByteCountingInputStream(in); - this.dis = new DataInputStream(byteCountingIn); this.serializationVersion = serializationVersion; this.filename = filename; + this.tocReader = tocReader; } + @Override + public void skipToBlock(final int blockIndex) throws IOException { + if ( tocReader == null ) { + throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log"); + } + + if ( blockIndex < 0 ) { + throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative"); + } + + if ( blockIndex == getBlockIndex() ) { + return; + } + + final long offset = tocReader.getBlockOffset(blockIndex); + if ( offset < 0 ) { + throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename); + } + + final long curOffset = rawInputStream.getBytesConsumed(); + + final long bytesToSkip = offset - curOffset; + if ( bytesToSkip >= 0 ) { + try { + StreamUtils.skip(rawInputStream, bytesToSkip); + logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip); + } catch (final IOException e) { + throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e); + } + + resetStreamForNextBlock(); + } + } + + private void resetStreamForNextBlock() throws IOException { + final InputStream limitedStream; + if ( tocReader == null ) { + limitedStream = rawInputStream; + } else { + final long offset = tocReader.getBlockOffset(1 + getBlockIndex()); + if ( offset < 0 ) { + limitedStream = rawInputStream; + } else { + limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed()); + } + } + + final InputStream readableStream; + if (compressed) { + readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream)); + } else { + readableStream = new BufferedInputStream(limitedStream); + } + + byteCountingIn = new ByteCountingInputStream(readableStream, rawInputStream.getBytesConsumed()); + dis = new DataInputStream(byteCountingIn); + } + + + @Override + public TocReader getTocReader() { + return tocReader; + } + + @Override + public boolean isBlockIndexAvailable() { + return 
tocReader != null; } + + @Override + public int getBlockIndex() { + if ( tocReader == null ) { + throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename); + } + + return tocReader.getBlockIndex(rawInputStream.getBytesConsumed()); + } + + @Override + public long getBytesConsumed() { + return byteCountingIn.getBytesConsumed(); + } + private StandardProvenanceEventRecord readPreVersion6Record() throws IOException { final long startOffset = byteCountingIn.getBytesConsumed(); - if (!isData(byteCountingIn)) { + if (!isData()) { return null; } @@ -137,7 +269,7 @@ public class StandardRecordReader implements RecordReader { final long startOffset = byteCountingIn.getBytesConsumed(); - if (!isData(byteCountingIn)) { + if (!isData()) { return null; } @@ -242,9 +374,17 @@ public class StandardRecordReader implements RecordReader { } private String readUUID(final DataInputStream in) throws IOException { - final long msb = in.readLong(); - final long lsb = in.readLong(); - return new UUID(msb, lsb).toString(); + if ( serializationVersion < 8 ) { + final long msb = in.readLong(); + final long lsb = in.readLong(); + return new UUID(msb, lsb).toString(); + } else { + // before version 8, we serialized UUIDs as two longs in order to + // write less data. However, in version 8 we changed to just writing + // out the string because it's extremely expensive to call UUID.fromString. + // In the end, since we generally compress, the savings is minimal anyway. + return in.readUTF(); + } } private String readNullableString(final DataInputStream in) throws IOException { @@ -272,16 +412,58 @@ public class StandardRecordReader implements RecordReader { return new String(strBytes, "UTF-8"); } - private boolean isData(final InputStream in) throws IOException { - in.mark(1); - final int nextByte = in.read(); - in.reset(); + private boolean isData() throws IOException { + byteCountingIn.mark(1); + int nextByte = byteCountingIn.read(); + byteCountingIn.reset(); + + if ( nextByte < 0 ) { + try { + resetStreamForNextBlock(); + } catch (final EOFException eof) { + return false; + } + + byteCountingIn.mark(1); + nextByte = byteCountingIn.read(); + byteCountingIn.reset(); + } + return (nextByte >= 0); } + + @Override + public long getMaxEventId() throws IOException { + if ( tocReader != null ) { + final long lastBlockOffset = tocReader.getLastBlockOffset(); + skipToBlock(tocReader.getBlockIndex(lastBlockOffset)); + } + + ProvenanceEventRecord record; + ProvenanceEventRecord lastRecord = null; + try { + while ((record = nextRecord()) != null) { + lastRecord = record; + } + } catch (final EOFException eof) { + // This can happen if we stop NiFi while the record is being written. + // This is OK; we just ignore this record. The session will not have been + // committed, so we can just process the FlowFile again. + } + + return (lastRecord == null) ? 
-1L : lastRecord.getEventId(); + } @Override public void close() throws IOException { + logger.trace("Closing Record Reader for {}", filename); + dis.close(); + rawInputStream.close(); + + if ( tocReader != null ) { + tocReader.close(); + } } @Override @@ -291,7 +473,10 @@ public class StandardRecordReader implements RecordReader { @Override public void skipTo(final long position) throws IOException { - final long currentPosition = byteCountingIn.getBytesConsumed(); + // we are subtracting headerLength from the number of bytes consumed because we used to + // consider the offset of the first record "0" - now we consider it whatever position it + // it really is in the stream. + final long currentPosition = byteCountingIn.getBytesConsumed() - headerLength; if (currentPosition == position) { return; } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java index df93084669..dbb2c48dc5 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java @@ -19,38 +19,54 @@ package org.apache.nifi.provenance; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.Collection; import java.util.Map; -import java.util.UUID; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.nifi.provenance.serialization.RecordWriter; +import org.apache.nifi.provenance.toc.TocWriter; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.ByteCountingOutputStream; import org.apache.nifi.stream.io.DataOutputStream; -import org.apache.nifi.provenance.serialization.RecordWriter; +import org.apache.nifi.stream.io.GZIPOutputStream; +import org.apache.nifi.stream.io.NonCloseableOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class StandardRecordWriter implements RecordWriter { - + private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class); + private final File file; - private final DataOutputStream out; - private final ByteCountingOutputStream byteCountingOut; private final FileOutputStream fos; + private final ByteCountingOutputStream rawOutStream; + private final TocWriter tocWriter; + private final boolean compressed; + private final int uncompressedBlockSize; + + private DataOutputStream out; + private ByteCountingOutputStream byteCountingOut; + private long lastBlockOffset = 0L; private int recordCount = 0; private final Lock lock = new ReentrantLock(); - public StandardRecordWriter(final File file) throws IOException { + + public StandardRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize) throws IOException { + logger.trace("Creating Record Writer for {}", file.getName()); + this.file = file; + this.compressed = compressed; this.fos = new FileOutputStream(file); - this.byteCountingOut = new ByteCountingOutputStream(new BufferedOutputStream(fos, 65536)); - this.out = new 
DataOutputStream(byteCountingOut); + rawOutStream = new ByteCountingOutputStream(fos); + this.uncompressedBlockSize = uncompressedBlockSize; + + this.tocWriter = writer; } static void writeUUID(final DataOutputStream out, final String uuid) throws IOException { - final UUID uuidObj = UUID.fromString(uuid); - out.writeLong(uuidObj.getMostSignificantBits()); - out.writeLong(uuidObj.getLeastSignificantBits()); + out.writeUTF(uuid); } static void writeUUIDs(final DataOutputStream out, final Collection list) throws IOException { @@ -69,18 +85,67 @@ public class StandardRecordWriter implements RecordWriter { return file; } - @Override + @Override public synchronized void writeHeader() throws IOException { + lastBlockOffset = rawOutStream.getBytesWritten(); + resetWriteStream(); + out.writeUTF(PersistentProvenanceRepository.class.getName()); out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION); out.flush(); } + + private void resetWriteStream() throws IOException { + if ( out != null ) { + out.flush(); + } + + final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten(); + + final OutputStream writableStream; + if ( compressed ) { + // because of the way that GZIPOutputStream works, we need to call close() on it in order for it + // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap + // the underlying OutputStream in a NonCloseableOutputStream + if ( out != null ) { + out.close(); + } + + if ( tocWriter != null ) { + tocWriter.addBlockOffset(rawOutStream.getBytesWritten()); + } + + writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536); + } else { + if ( tocWriter != null ) { + tocWriter.addBlockOffset(rawOutStream.getBytesWritten()); + } + + writableStream = new BufferedOutputStream(rawOutStream, 65536); + } + + this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset); + this.out = new DataOutputStream(byteCountingOut); + } + @Override public synchronized long writeRecord(final ProvenanceEventRecord record, long recordIdentifier) throws IOException { final ProvenanceEventType recordType = record.getEventType(); final long startBytes = byteCountingOut.getBytesWritten(); + // add a new block to the TOC if needed. + if ( tocWriter != null && (startBytes - lastBlockOffset >= uncompressedBlockSize) ) { + lastBlockOffset = startBytes; + + if ( compressed ) { + // because of the way that GZIPOutputStream works, we need to call close() on it in order for it + // to write its trailing bytes. 
But we don't want to close the underlying OutputStream, so we wrap + // the underlying OutputStream in a NonCloseableOutputStream + resetWriteStream(); + } + } + out.writeLong(recordIdentifier); out.writeUTF(record.getEventType().name()); out.writeLong(record.getEventTime()); @@ -196,13 +261,24 @@ public class StandardRecordWriter implements RecordWriter { @Override public synchronized void close() throws IOException { + logger.trace("Closing Record Writer for {}", file.getName()); + lock(); try { - out.flush(); - out.close(); + try { + out.flush(); + out.close(); + } finally { + rawOutStream.close(); + + if ( tocWriter != null ) { + tocWriter.close(); + } + } } finally { unlock(); } + } @Override @@ -232,6 +308,14 @@ public class StandardRecordWriter implements RecordWriter { @Override public void sync() throws IOException { - fos.getFD().sync(); + if ( tocWriter != null ) { + tocWriter.sync(); + } + fos.getFD().sync(); + } + + @Override + public TocWriter getTocWriter() { + return tocWriter; } } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java index 46084195cf..7db04aa647 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java @@ -16,25 +16,17 @@ */ package org.apache.nifi.provenance.lucene; -import java.io.EOFException; import java.io.File; import java.io.IOException; import java.util.List; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.Term; import org.apache.nifi.provenance.IndexConfiguration; import org.apache.nifi.provenance.PersistentProvenanceRepository; -import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.provenance.expiration.ExpirationAction; import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; - -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.standard.StandardAnalyzer; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.Term; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,10 +35,12 @@ public class DeleteIndexAction implements ExpirationAction { private static final Logger logger = LoggerFactory.getLogger(DeleteIndexAction.class); private final PersistentProvenanceRepository repository; private final IndexConfiguration indexConfiguration; + private final IndexManager indexManager; - public DeleteIndexAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfiguration) { + public DeleteIndexAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfiguration, final IndexManager indexManager) { this.repository = repo; this.indexConfiguration = indexConfiguration; + this.indexManager = indexManager; } @Override @@ -55,51 +49,38 @@ public class DeleteIndexAction implements ExpirationAction { long numDeleted = 
0; long maxEventId = -1L; try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) { - try { - StandardProvenanceEventRecord record; - while ((record = reader.nextRecord()) != null) { - numDeleted++; - - if (record.getEventId() > maxEventId) { - maxEventId = record.getEventId(); - } - } - } catch (final EOFException eof) { - // finished reading -- the last record was not completely written out, so it is discarded. - } - } catch (final EOFException eof) { - // no data in file. - return expiredFile; + maxEventId = reader.getMaxEventId(); + } catch (final IOException ioe) { + logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath()); } // remove the records from the index final List indexDirs = indexConfiguration.getIndexDirectories(expiredFile); for (final File indexingDirectory : indexDirs) { - try (final Directory directory = FSDirectory.open(indexingDirectory); - final Analyzer analyzer = new StandardAnalyzer()) { - IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer); - config.setWriteLockTimeout(300000L); + final Term term = new Term(FieldNames.STORAGE_FILENAME, LuceneUtil.substringBefore(expiredFile.getName(), ".")); - Term term = new Term(FieldNames.STORAGE_FILENAME, LuceneUtil.substringBefore(expiredFile.getName(), ".")); + boolean deleteDir = false; + final IndexWriter writer = indexManager.borrowIndexWriter(indexingDirectory); + try { + writer.deleteDocuments(term); + writer.commit(); + final int docsLeft = writer.numDocs(); + deleteDir = (docsLeft <= 0); + logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory); + } finally { + indexManager.returnIndexWriter(indexingDirectory, writer); + } - boolean deleteDir = false; - try (final IndexWriter indexWriter = new IndexWriter(directory, config)) { - indexWriter.deleteDocuments(term); - indexWriter.commit(); - final int docsLeft = indexWriter.numDocs(); - deleteDir = (docsLeft <= 0); - logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory); - } - - // we've confirmed that all documents have been removed. Delete the index directory. - if (deleteDir) { - indexConfiguration.removeIndexDirectory(indexingDirectory); - deleteDirectory(indexingDirectory); - logger.info("Removed empty index directory {}", indexingDirectory); - } + // we've confirmed that all documents have been removed. Delete the index directory. + if (deleteDir) { + indexManager.removeIndex(indexingDirectory); + indexConfiguration.removeIndexDirectory(indexingDirectory); + + deleteDirectory(indexingDirectory); + logger.info("Removed empty index directory {}", indexingDirectory); } } - + // Update the minimum index to 1 more than the max Event ID in this file. 
if (maxEventId > -1L) { indexConfiguration.setMinIdIndexed(maxEventId + 1L); diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java index 6446a35497..5a77f42b87 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java @@ -23,23 +23,30 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.SearchableFields; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; - +import org.apache.nifi.provenance.toc.TocReader; import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DocsReader { - + private final Logger logger = LoggerFactory.getLogger(DocsReader.class); + public DocsReader(final List storageDirectories) { } @@ -48,6 +55,7 @@ public class DocsReader { return Collections.emptySet(); } + final long start = System.nanoTime(); final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults); final List docs = new ArrayList<>(numDocs); @@ -60,63 +68,102 @@ public class DocsReader { } } + final long readDocuments = System.nanoTime() - start; + logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments)); return read(docs, allProvenanceLogFiles); } + + private long getByteOffset(final Document d, final RecordReader reader) { + final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX); + if ( blockField != null ) { + final int blockIndex = blockField.numericValue().intValue(); + final TocReader tocReader = reader.getTocReader(); + return tocReader.getBlockOffset(blockIndex); + } + + return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue(); + } + + + private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException { + IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX); + if ( blockField == null ) { + reader.skipTo(getByteOffset(d, reader)); + } else { + reader.skipToBlock(blockField.numericValue().intValue()); + } + + StandardProvenanceEventRecord record; + while ( (record = reader.nextRecord()) != null) { + IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName()); + if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) { + break; + } + } + + if ( record == null ) { + throw new IOException("Failed to find Provenance Event " + d); + } else { + return 
record; + } + } + + public Set read(final List docs, final Collection allProvenanceLogFiles) throws IOException { LuceneUtil.sortDocsForRetrieval(docs); RecordReader reader = null; String lastStorageFilename = null; - long lastByteOffset = 0L; final Set matchingRecords = new LinkedHashSet<>(); + final long start = System.nanoTime(); + int logFileCount = 0; + + final Set storageFilesToSkip = new HashSet<>(); + try { for (final Document d : docs) { final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue(); - final long byteOffset = d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue(); - + if ( storageFilesToSkip.contains(storageFilename) ) { + continue; + } + try { - if (reader != null && storageFilename.equals(lastStorageFilename) && byteOffset > lastByteOffset) { - // Still the same file and the offset is downstream. - try { - reader.skipTo(byteOffset); - final StandardProvenanceEventRecord record = reader.nextRecord(); - matchingRecords.add(record); - } catch (final IOException e) { - throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository"); - } - + if (reader != null && storageFilename.equals(lastStorageFilename)) { + matchingRecords.add(getRecord(d, reader)); } else { + logger.debug("Opening log file {}", storageFilename); + + logFileCount++; if (reader != null) { reader.close(); } List potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles); if (potentialFiles.isEmpty()) { - throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository"); + logger.warn("Could not find Provenance Log File with basename {} in the " + + "Provenance Repository; assuming file has expired and continuing without it", storageFilename); + storageFilesToSkip.add(storageFilename); + continue; } if (potentialFiles.size() > 1) { - throw new FileNotFoundException("Found multiple Provenance Log Files with basename " + storageFilename + " in the Provenance Repository"); + throw new FileNotFoundException("Found multiple Provenance Log Files with basename " + + storageFilename + " in the Provenance Repository"); } for (final File file : potentialFiles) { - reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles); - try { - reader.skip(byteOffset); - - final StandardProvenanceEventRecord record = reader.nextRecord(); - matchingRecords.add(record); + reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles); + matchingRecords.add(getRecord(d, reader)); } catch (final IOException e) { - throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e); + throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e); } } } } finally { lastStorageFilename = storageFilename; - lastByteOffset = byteOffset; } } } finally { @@ -125,6 +172,9 @@ public class DocsReader { } } + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(), logFileCount); + return matchingRecords; } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java index 6afc193d6a..90a73f463f 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java @@ -20,4 +20,5 @@ public class FieldNames { public static final String STORAGE_FILENAME = "storage-filename"; public static final String STORAGE_FILE_OFFSET = "storage-fileOffset"; + public static final String BLOCK_INDEX = "block-index"; } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java new file mode 100644 index 0000000000..394350476f --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.lucene; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IndexManager implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(IndexManager.class); + + private final Lock lock = new ReentrantLock(); + private final Map writerCounts = new HashMap<>(); + private final Map> activeSearchers = new HashMap<>(); + + + public void removeIndex(final File indexDirectory) { + final File absoluteFile = indexDirectory.getAbsoluteFile(); + logger.info("Removing index {}", indexDirectory); + + lock.lock(); + try { + final IndexWriterCount count = writerCounts.remove(absoluteFile); + if ( count != null ) { + try { + count.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + + for ( final List searcherList : activeSearchers.values() ) { + for ( final ActiveIndexSearcher searcher : searcherList ) { + try { + searcher.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Searcher {} for {} due to {}", + searcher.getSearcher(), absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + } + } finally { + lock.unlock(); + } + } + + public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException { + final File absoluteFile = indexingDirectory.getAbsoluteFile(); + logger.debug("Borrowing index writer for {}", indexingDirectory); + + lock.lock(); + try { + IndexWriterCount writerCount = writerCounts.remove(absoluteFile); + if ( writerCount == null ) { + final List closeables = new ArrayList<>(); + final Directory directory = FSDirectory.open(indexingDirectory); + closeables.add(directory); + + try { + final Analyzer analyzer = new StandardAnalyzer(); + closeables.add(analyzer); + + final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer); + config.setWriteLockTimeout(300000L); + + final IndexWriter indexWriter = new IndexWriter(directory, config); + writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1); + logger.debug("Providing new index writer for {}", indexingDirectory); + } catch (final IOException ioe) { + for ( final Closeable closeable : closeables ) { + try { + closeable.close(); + } catch (final IOException ioe2) { + ioe.addSuppressed(ioe2); + } + } + + throw ioe; + } + + writerCounts.put(absoluteFile, writerCount); + } else { + logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1); + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), + writerCount.getAnalyzer(), writerCount.getDirectory(), 
writerCount.getCount() + 1)); + } + + return writerCount.getWriter(); + } finally { + lock.unlock(); + } + } + + public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) { + final File absoluteFile = indexingDirectory.getAbsoluteFile(); + logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory); + + lock.lock(); + try { + IndexWriterCount count = writerCounts.remove(absoluteFile); + + try { + if ( count == null ) { + logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. " + + "This could potentially lead to a resource leak", writer, indexingDirectory); + writer.close(); + } else if ( count.getCount() <= 1 ) { + // we are finished with this writer. + logger.debug("Closing Index Writer for {}", indexingDirectory); + count.close(); + } else { + // decrement the count. + logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1); + writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1)); + } + } catch (final IOException ioe) { + logger.warn("Failed to close Index Writer {} due to {}", writer, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } finally { + lock.unlock(); + } + } + + + public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException { + final File absoluteFile = indexDir.getAbsoluteFile(); + logger.debug("Borrowing index searcher for {}", indexDir); + + lock.lock(); + try { + // check if we already have a reader cached. + List currentlyCached = activeSearchers.get(absoluteFile); + if ( currentlyCached == null ) { + currentlyCached = new ArrayList<>(); + activeSearchers.put(absoluteFile, currentlyCached); + } else { + // keep track of any searchers that have been closed so that we can remove them + // from our cache later. + final Set expired = new HashSet<>(); + + try { + for ( final ActiveIndexSearcher searcher : currentlyCached ) { + if ( searcher.isCache() ) { + final int refCount = searcher.getSearcher().getIndexReader().getRefCount(); + if ( refCount <= 0 ) { + // if refCount == 0, then the reader has been closed, so we need to discard the searcher + logger.debug("Reference count for cached Index Searcher for {} is currently {}; " + + "removing cached searcher", absoluteFile, refCount); + expired.add(searcher); + continue; + } + + logger.debug("Providing previously cached index searcher for {}", indexDir); + return searcher.getSearcher(); + } + } + } finally { + // if we have any expired index searchers, we need to close them and remove them + // from the cache so that we don't try to use them again later. + for ( final ActiveIndexSearcher searcher : expired ) { + try { + searcher.close(); + } catch (final Exception e) { + logger.debug("Failed to close 'expired' IndexSearcher {}", searcher); + } + + currentlyCached.remove(searcher); + } + } + } + + IndexWriterCount writerCount = writerCounts.remove(absoluteFile); + if ( writerCount == null ) { + final Directory directory = FSDirectory.open(absoluteFile); + logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir); + + try { + final DirectoryReader directoryReader = DirectoryReader.open(directory); + final IndexSearcher searcher = new IndexSearcher(directoryReader); + + // we want to cache the searcher that we create, since it's just a reader. 
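A reader-backed searcher like the one cached above can be shared across queries, whereas a searcher opened against a live IndexWriter is discarded on return so that later queries observe newly committed documents. A hedged caller-side sketch of the borrow/return cycle, using only the IndexManager methods shown in this diff (query construction and result handling are placeholders):

import java.io.File;
import java.io.IOException;

import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TopDocs;
import org.apache.nifi.provenance.lucene.IndexManager;

public class SearchSketch {
    public static TopDocs search(final IndexManager indexManager, final File indexDir,
            final Query luceneQuery, final int maxResults) throws IOException {
        final IndexSearcher searcher = indexManager.borrowIndexSearcher(indexDir);
        try {
            // The searcher may be a cached, reader-backed instance or a fresh
            // writer-backed one; the caller does not need to know which.
            return searcher.search(luceneQuery, maxResults);
        } finally {
            // Returning lets the manager either keep a cached searcher open or close a
            // writer-backed one and decrement the corresponding writer's usage count.
            indexManager.returnIndexSearcher(indexDir, searcher);
        }
    }
}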
+ final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true); + currentlyCached.add(cached); + + return cached.getSearcher(); + } catch (final IOException e) { + try { + directory.close(); + } catch (final IOException ioe) { + e.addSuppressed(ioe); + } + + throw e; + } + } else { + logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing " + + "counter to {}", indexDir, writerCount.getCount() + 1); + + // increment the writer count to ensure that it's kept open. + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), + writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); + + // create a new Index Searcher from the writer so that we don't have an issue with trying + // to read from a directory that's locked. If we get the "no segments* file found" with + // Lucene, this indicates that an IndexWriter already has the directory open. + final IndexWriter writer = writerCount.getWriter(); + final DirectoryReader directoryReader = DirectoryReader.open(writer, false); + final IndexSearcher searcher = new IndexSearcher(directoryReader); + + // we don't want to cache this searcher because it's based on a writer, so we want to get + // new values the next time that we search. + final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false); + + currentlyCached.add(activeSearcher); + return activeSearcher.getSearcher(); + } + } finally { + lock.unlock(); + } + } + + + public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) { + final File absoluteFile = indexDirectory.getAbsoluteFile(); + logger.debug("Returning index searcher for {} to IndexManager", indexDirectory); + + lock.lock(); + try { + // check if we already have a reader cached. + List currentlyCached = activeSearchers.get(absoluteFile); + if ( currentlyCached == null ) { + logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could " + + "result in a resource leak", indexDirectory); + return; + } + + final Iterator itr = currentlyCached.iterator(); + while (itr.hasNext()) { + final ActiveIndexSearcher activeSearcher = itr.next(); + if ( activeSearcher.getSearcher().equals(searcher) ) { + if ( activeSearcher.isCache() ) { + // the searcher is cached. Just leave it open. + logger.debug("Index searcher for {} is cached; leaving open", indexDirectory); + return; + } else { + // searcher is not cached. It was created from a writer, and we want + // the newest updates the next time that we get a searcher, so we will + // go ahead and close this one out. + itr.remove(); + + // decrement the writer count because we incremented it when creating the searcher + final IndexWriterCount writerCount = writerCounts.remove(absoluteFile); + if ( writerCount != null ) { + if ( writerCount.getCount() <= 1 ) { + try { + logger.debug("Index searcher for {} is not cached. Writer count is " + + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1); + + writerCount.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } else { + logger.debug("Index searcher for {} is not cached. 
Writer count is decremented " + + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1); + + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), + writerCount.getAnalyzer(), writerCount.getDirectory(), + writerCount.getCount() - 1)); + } + } + + try { + logger.debug("Closing Index Searcher for {}", indexDirectory); + activeSearcher.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + } + } + } finally { + lock.unlock(); + } + } + + @Override + public void close() throws IOException { + logger.debug("Closing Index Manager"); + + lock.lock(); + try { + IOException ioe = null; + + for ( final IndexWriterCount count : writerCounts.values() ) { + try { + count.close(); + } catch (final IOException e) { + if ( ioe == null ) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + } + + for (final List searcherList : activeSearchers.values()) { + for (final ActiveIndexSearcher searcher : searcherList) { + try { + searcher.close(); + } catch (final IOException e) { + if ( ioe == null ) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + } + } + + if ( ioe != null ) { + throw ioe; + } + } finally { + lock.unlock(); + } + } + + + private static void close(final Closeable... closeables) throws IOException { + IOException ioe = null; + for ( final Closeable closeable : closeables ) { + if ( closeable == null ) { + continue; + } + + try { + closeable.close(); + } catch (final IOException e) { + if ( ioe == null ) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + } + + if ( ioe != null ) { + throw ioe; + } + } + + + private static class ActiveIndexSearcher implements Closeable { + private final IndexSearcher searcher; + private final DirectoryReader directoryReader; + private final Directory directory; + private final boolean cache; + + public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader, + Directory directory, final boolean cache) { + this.searcher = searcher; + this.directoryReader = directoryReader; + this.directory = directory; + this.cache = cache; + } + + public boolean isCache() { + return cache; + } + + public IndexSearcher getSearcher() { + return searcher; + } + + @Override + public void close() throws IOException { + IndexManager.close(directoryReader, directory); + } + } + + + private static class IndexWriterCount implements Closeable { + private final IndexWriter writer; + private final Analyzer analyzer; + private final Directory directory; + private final int count; + + public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) { + this.writer = writer; + this.analyzer = analyzer; + this.directory = directory; + this.count = count; + } + + public Analyzer getAnalyzer() { + return analyzer; + } + + public Directory getDirectory() { + return directory; + } + + public IndexWriter getWriter() { + return writer; + } + + public int getCount() { + return count; + } + + @Override + public void close() throws IOException { + IndexManager.close(writer, analyzer, directory); + } + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java index e2854c3a2c..dcb6e081b1 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java @@ -17,31 +17,33 @@ package org.apache.nifi.provenance.lucene; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collections; import java.util.Date; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.nifi.provenance.PersistentProvenanceRepository; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.provenance.StandardQueryResult; - -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexNotFoundException; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.TopDocs; -import org.apache.lucene.store.FSDirectory; +import org.apache.nifi.provenance.PersistentProvenanceRepository; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.StandardQueryResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IndexSearch { - + private final Logger logger = LoggerFactory.getLogger(IndexSearch.class); private final PersistentProvenanceRepository repository; private final File indexDirectory; + private final IndexManager indexManager; - public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory) { + public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory, final IndexManager indexManager) { this.repository = repo; this.indexDirectory = indexDirectory; + this.indexManager = indexManager; } public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount) throws IOException { @@ -55,30 +57,57 @@ public class IndexSearch { final StandardQueryResult sqr = new StandardQueryResult(provenanceQuery, 1); final Set matchingRecords; - try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory))) { - final IndexSearcher searcher = new IndexSearcher(directoryReader); + if (provenanceQuery.getEndDate() == null) { + provenanceQuery.setEndDate(new Date()); + } + final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery); - if (provenanceQuery.getEndDate() == null) { - provenanceQuery.setEndDate(new Date()); - } - final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery); - - TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults()); + final long start = System.nanoTime(); + IndexSearcher searcher = null; + try { + searcher = indexManager.borrowIndexSearcher(indexDirectory); + final long searchStartNanos = System.nanoTime(); + final long openSearcherNanos = searchStartNanos - start; + + final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults()); + final long finishSearch = System.nanoTime(); + final long searchNanos = finishSearch - searchStartNanos; + + logger.debug("Searching {} took {} millis; opening searcher took {} millis", this, + 
TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos)); + if (topDocs.totalHits == 0) { sqr.update(Collections.emptyList(), 0); return sqr; } final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories()); - matchingRecords = docsReader.read(topDocs, directoryReader, repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults()); - + matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults()); + + final long readRecordsNanos = System.nanoTime() - finishSearch; + logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this); + sqr.update(matchingRecords, topDocs.totalHits); return sqr; - } catch (final IndexNotFoundException e) { - // nothing has been indexed yet. + } catch (final FileNotFoundException e) { + // nothing has been indexed yet, or the data has already aged off + logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e); + if ( logger.isDebugEnabled() ) { + logger.warn("", e); + } + sqr.update(Collections.emptyList(), 0); return sqr; + } finally { + if ( searcher != null ) { + indexManager.returnIndexSearcher(indexDirectory, searcher); + } } } + + @Override + public String toString() { + return "IndexSearcher[" + indexDirectory + "]"; + } } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java index 214267aa4d..5e879133be 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java @@ -24,6 +24,17 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.IntField; +import org.apache.lucene.document.LongField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.provenance.IndexConfiguration; import org.apache.nifi.provenance.PersistentProvenanceRepository; @@ -34,17 +45,6 @@ import org.apache.nifi.provenance.rollover.RolloverAction; import org.apache.nifi.provenance.search.SearchableField; import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; - -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.standard.StandardAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field.Store; -import org.apache.lucene.document.LongField; -import org.apache.lucene.document.StringField; -import 
org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,15 +72,93 @@ public class IndexingAction implements RolloverAction { doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), store)); } + + public void index(final StandardProvenanceEventRecord record, final IndexWriter indexWriter, final Integer blockIndex) throws IOException { + final Map attributes = record.getAttributes(); + + final Document doc = new Document(); + addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid(), Store.NO); + addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO); + addField(doc, SearchableFields.ComponentID, record.getComponentId(), Store.NO); + addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri(), Store.NO); + addField(doc, SearchableFields.EventType, record.getEventType().name(), Store.NO); + addField(doc, SearchableFields.Relationship, record.getRelationship(), Store.NO); + addField(doc, SearchableFields.Details, record.getDetails(), Store.NO); + addField(doc, SearchableFields.ContentClaimSection, record.getContentClaimSection(), Store.NO); + addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer(), Store.NO); + addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier(), Store.NO); + addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier(), Store.NO); + + if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) { + addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO); + } + + for (final SearchableField searchableField : attributeSearchableFields) { + addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO); + } + + final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), "."); + + // Index the fields that we always index (unless there's nothing else to index at all) + if (!doc.getFields().isEmpty()) { + doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO)); + doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO)); + doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO)); + doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES)); + + if ( blockIndex == null ) { + doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES)); + } else { + doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES)); + doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES)); + } + + for (final String lineageIdentifier : record.getLineageIdentifiers()) { + addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO); + } + + // If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs. 
+ if (record.getEventType() == ProvenanceEventType.FORK || record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == ProvenanceEventType.REPLAY) { + for (final String uuid : record.getChildUuids()) { + if (!uuid.equals(record.getFlowFileUuid())) { + addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO); + } + } + } else if (record.getEventType() == ProvenanceEventType.JOIN) { + for (final String uuid : record.getParentUuids()) { + if (!uuid.equals(record.getFlowFileUuid())) { + addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO); + } + } + } else if (record.getEventType() == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) { + // If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID + // that the Source System uses to refer to the data. + final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier(); + final String sourceFlowFileUUID; + final int lastColon = sourceIdentifier.lastIndexOf(":"); + if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) { + sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1); + } else { + sourceFlowFileUUID = null; + } + + if (sourceFlowFileUUID != null) { + addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, Store.NO); + } + } + + indexWriter.addDocument(doc); + } + } + @Override - @SuppressWarnings("deprecation") public File execute(final File fileRolledOver) throws IOException { final File indexingDirectory = indexConfiguration.getWritableIndexDirectory(fileRolledOver); int indexCount = 0; long maxId = -1L; try (final Directory directory = FSDirectory.open(indexingDirectory); - final Analyzer analyzer = new StandardAnalyzer(LuceneUtil.LUCENE_VERSION)) { + final Analyzer analyzer = new StandardAnalyzer()) { final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer); config.setWriteLockTimeout(300000L); @@ -89,6 +167,13 @@ public class IndexingAction implements RolloverAction { final RecordReader reader = RecordReaders.newRecordReader(fileRolledOver, repository.getAllLogFiles())) { StandardProvenanceEventRecord record; while (true) { + final Integer blockIndex; + if ( reader.isBlockIndexAvailable() ) { + blockIndex = reader.getBlockIndex(); + } else { + blockIndex = null; + } + try { record = reader.nextRecord(); } catch (final EOFException eof) { @@ -104,76 +189,8 @@ public class IndexingAction implements RolloverAction { maxId = record.getEventId(); - final Map attributes = record.getAttributes(); - - final Document doc = new Document(); - addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid(), Store.NO); - addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO); - addField(doc, SearchableFields.ComponentID, record.getComponentId(), Store.NO); - addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri(), Store.NO); - addField(doc, SearchableFields.EventType, record.getEventType().name(), Store.NO); - addField(doc, SearchableFields.Relationship, record.getRelationship(), Store.NO); - addField(doc, SearchableFields.Details, record.getDetails(), Store.NO); - addField(doc, SearchableFields.ContentClaimSection, record.getContentClaimSection(), Store.NO); - addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer(), Store.NO); - addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier(), Store.NO); - addField(doc, 
SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier(), Store.NO); - - if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) { - addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO); - } - - for (final SearchableField searchableField : attributeSearchableFields) { - addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO); - } - - final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), "."); - - // Index the fields that we always index (unless there's nothing else to index at all) - if (!doc.getFields().isEmpty()) { - doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO)); - doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO)); - doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO)); - doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES)); - doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES)); - - for (final String lineageIdentifier : record.getLineageIdentifiers()) { - addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO); - } - - // If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs. - if (record.getEventType() == ProvenanceEventType.FORK || record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == ProvenanceEventType.REPLAY) { - for (final String uuid : record.getChildUuids()) { - if (!uuid.equals(record.getFlowFileUuid())) { - addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO); - } - } - } else if (record.getEventType() == ProvenanceEventType.JOIN) { - for (final String uuid : record.getParentUuids()) { - if (!uuid.equals(record.getFlowFileUuid())) { - addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO); - } - } - } else if (record.getEventType() == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) { - // If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID - // that the Source System uses to refer to the data. 
- final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier(); - final String sourceFlowFileUUID; - final int lastColon = sourceIdentifier.lastIndexOf(":"); - if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) { - sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1); - } else { - sourceFlowFileUUID = null; - } - - if (sourceFlowFileUUID != null) { - addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, Store.NO); - } - } - - indexWriter.addDocument(doc); - indexCount++; - } + index(record, indexWriter, blockIndex); + indexCount++; } indexWriter.commit(); diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java index a7076d5a10..59dc10b752 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java @@ -27,8 +27,8 @@ import java.util.List; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.provenance.SearchableFields; import org.apache.nifi.provenance.search.SearchTerm; - import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanClause.Occur; @@ -78,7 +78,16 @@ final String searchString = baseName + "."; for (final Path path : allProvenanceLogs) { if (path.toFile().getName().startsWith(searchString)) { - matchingFiles.add(path.toFile()); + final File file = path.toFile(); + if ( file.exists() ) { + matchingFiles.add(file); + } else { + final File dir = file.getParentFile(); + final File gzFile = new File(dir, file.getName() + ".gz"); + if ( gzFile.exists() ) { + matchingFiles.add(gzFile); + } + } } } @@ -132,6 +141,19 @@ return filenameComp; } + final IndexableField fileOffset1 = o1.getField(FieldNames.BLOCK_INDEX); + final IndexableField fileOffset2 = o2.getField(FieldNames.BLOCK_INDEX); + if ( fileOffset1 != null && fileOffset2 != null ) { + final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue()); + if ( blockIndexResult != 0 ) { + return blockIndexResult; + } + + final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue(); + final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue(); + return Long.compare(eventId1, eventId2); + } + final long offset1 = o1.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue(); final long offset2 = o2.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue(); return Long.compare(offset1, offset2); diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java index
862bc2b809..8bdc88a430 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java @@ -20,12 +20,79 @@ import java.io.Closeable; import java.io.IOException; import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.provenance.toc.TocReader; public interface RecordReader extends Closeable { + /** + * Returns the next record in the reader, or null if there is no more data available. + * @return + * @throws IOException + */ StandardProvenanceEventRecord nextRecord() throws IOException; + /** + * Skips the specified number of bytes + * @param bytesToSkip + * @throws IOException + */ void skip(long bytesToSkip) throws IOException; + /** + * Skips to the specified byte offset in the underlying stream. + * @param position + * @throws IOException if the underlying stream throws IOException, or if the reader has already + * passed the specified byte offset + */ void skipTo(long position) throws IOException; + + /** + * Skips to the specified compression block + * + * @param blockIndex + * @throws IOException if the underlying stream throws IOException, or if the reader has already + * read passed the specified compression block index + * @throws IllegalStateException if the RecordReader does not have a TableOfContents associated with it + */ + void skipToBlock(int blockIndex) throws IOException; + + /** + * Returns the block index that the Reader is currently reading from. + * Note that the block index is incremented at the beginning of the {@link #nextRecord()} + * method. This means that this method will return the block from which the previous record was read, + * if calling {@link #nextRecord()} continually, not the block from which the next record will be read. + * @return + */ + int getBlockIndex(); + + /** + * Returns true if the compression block index is available. It will be available + * if and only if the reader is created with a TableOfContents + * + * @return + */ + boolean isBlockIndexAvailable(); + + /** + * Returns the {@link TocReader} that is used to keep track of compression blocks, if one exists, + * null otherwise + * @return + */ + TocReader getTocReader(); + + /** + * Returns the number of bytes that have been consumed from the stream (read or skipped). + * @return + */ + long getBytesConsumed(); + + /** + * Returns the ID of the last event in this record reader, or -1 if the reader has no records or + * has already read through all records. Note: This method will consume the stream until the end, + * so no more records will be available on this reader after calling this method. 
+ * + * @return + * @throws IOException + */ + long getMaxEventId() throws IOException; } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java index 8f0699529c..dff281c5bc 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.provenance.serialization; -import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -24,82 +23,90 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Path; import java.util.Collection; -import java.util.zip.GZIPInputStream; -import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.provenance.StandardRecordReader; import org.apache.nifi.provenance.lucene.LuceneUtil; +import org.apache.nifi.provenance.toc.StandardTocReader; +import org.apache.nifi.provenance.toc.TocReader; +import org.apache.nifi.provenance.toc.TocUtil; public class RecordReaders { public static RecordReader newRecordReader(File file, final Collection provenanceLogFiles) throws IOException { final File originalFile = file; - - if (!file.exists()) { - if (provenanceLogFiles == null) { - throw new FileNotFoundException(file.toString()); - } - - final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + "."; - for (final Path path : provenanceLogFiles) { - if (path.toFile().getName().startsWith(baseName)) { - file = path.toFile(); - break; - } - } - } - InputStream fis = null; - if ( file.exists() ) { - try { - fis = new FileInputStream(file); - } catch (final FileNotFoundException fnfe) { - fis = null; - } - } - - openStream: while ( fis == null ) { - final File dir = file.getParentFile(); - final String baseName = LuceneUtil.substringBefore(file.getName(), "."); - - // depending on which rollover actions have occurred, we could have 3 possibilities for the - // filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz" - // because most often we are compressing on rollover and most often we have already finished - // compressing by the time that we are querying the data. - for ( final String extension : new String[] {".indexed.prov.gz", ".indexed.prov", ".prov"} ) { - file = new File(dir, baseName + extension); - if ( file.exists() ) { - try { - fis = new FileInputStream(file); - break openStream; - } catch (final FileNotFoundException fnfe) { - // file was modified by a RolloverAction after we verified that it exists but before we could - // create an InputStream for it. Start over. 
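Together with the Table of Contents files added below, the reader interface above gives random access into a compressed journal: a Lucene hit carries either a raw byte offset or a compression-block index plus event ID, and the reader seeks to the block and scans forward to the exact event. A sketch of that lookup, mirroring DocsReader.getRecord from this patch (the journal file, block index, offset, and event ID arguments are placeholders, not part of the patch):

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;

import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;

public class BlockLookupSketch {
    // Illustrative only: resolves one indexed event against its provenance journal.
    public static StandardProvenanceEventRecord find(final File journalFile,
            final Collection<Path> allProvenanceLogFiles, final Integer blockIndex,
            final long byteOffset, final long eventId) throws IOException {
        try (final RecordReader reader = RecordReaders.newRecordReader(journalFile, allProvenanceLogFiles)) {
            if (blockIndex != null && reader.isBlockIndexAvailable()) {
                reader.skipToBlock(blockIndex);   // jump to the start of the compression block
            } else {
                reader.skipTo(byteOffset);        // fall back to the raw byte offset
            }

            StandardProvenanceEventRecord record;
            while ((record = reader.nextRecord()) != null) {
                if (record.getEventId() == eventId) {
                    return record;                // scan forward within the block to the event
                }
            }
        }
        return null;
    }
}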
- fis = null; - continue openStream; - } - } - } - - break; - } - if ( fis == null ) { - throw new FileNotFoundException("Unable to locate file " + originalFile); + try { + if (!file.exists()) { + if (provenanceLogFiles != null) { + final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + "."; + for (final Path path : provenanceLogFiles) { + if (path.toFile().getName().startsWith(baseName)) { + file = path.toFile(); + break; + } + } + } + } + + if ( file.exists() ) { + try { + fis = new FileInputStream(file); + } catch (final FileNotFoundException fnfe) { + fis = null; + } + } + + String filename = file.getName(); + openStream: while ( fis == null ) { + final File dir = file.getParentFile(); + final String baseName = LuceneUtil.substringBefore(file.getName(), "."); + + // depending on which rollover actions have occurred, we could have 3 possibilities for the + // filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz" + // because most often we are compressing on rollover and most often we have already finished + // compressing by the time that we are querying the data. + for ( final String extension : new String[] {".prov.gz", ".prov"} ) { + file = new File(dir, baseName + extension); + if ( file.exists() ) { + try { + fis = new FileInputStream(file); + filename = baseName + extension; + break openStream; + } catch (final FileNotFoundException fnfe) { + // file was modified by a RolloverAction after we verified that it exists but before we could + // create an InputStream for it. Start over. + fis = null; + continue openStream; + } + } + } + + break; + } + + if ( fis == null ) { + throw new FileNotFoundException("Unable to locate file " + originalFile); + } + + final File tocFile = TocUtil.getTocFile(file); + if ( tocFile.exists() ) { + final TocReader tocReader = new StandardTocReader(tocFile); + return new StandardRecordReader(fis, filename, tocReader); + } else { + return new StandardRecordReader(fis, filename); + } + } catch (final IOException ioe) { + if ( fis != null ) { + try { + fis.close(); + } catch (final IOException inner) { + ioe.addSuppressed(inner); + } + } + + throw ioe; } - final InputStream readableStream; - if (file.getName().endsWith(".gz")) { - readableStream = new BufferedInputStream(new GZIPInputStream(fis)); - } else { - readableStream = new BufferedInputStream(fis); - } - - final DataInputStream dis = new DataInputStream(readableStream); - @SuppressWarnings("unused") - final String repoClassName = dis.readUTF(); - final int serializationVersion = dis.readInt(); - - return new StandardRecordReader(dis, serializationVersion, file.getName()); } } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java index de98ab9cc5..58f4dc281e 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import org.apache.nifi.provenance.ProvenanceEventRecord; +import 
org.apache.nifi.provenance.toc.TocWriter; public interface RecordWriter extends Closeable { @@ -82,4 +83,9 @@ public interface RecordWriter extends Closeable { */ void sync() throws IOException; + /** + * Returns the TOC Writer that is being used to write the Table of Contents for this journal + * @return + */ + TocWriter getTocWriter(); } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java index 15349de18a..47b7c7e5d7 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java @@ -20,11 +20,20 @@ import java.io.File; import java.io.IOException; import org.apache.nifi.provenance.StandardRecordWriter; +import org.apache.nifi.provenance.toc.StandardTocWriter; +import org.apache.nifi.provenance.toc.TocUtil; +import org.apache.nifi.provenance.toc.TocWriter; public class RecordWriters { + private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 1 MB - public static RecordWriter newRecordWriter(final File file) throws IOException { - return new StandardRecordWriter(file); + public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException { + return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE); + } + + public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException { + final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null; + return new StandardRecordWriter(file, tocWriter, compressed, compressionBlockBytes); } } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java new file mode 100644 index 0000000000..8944cec298 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.toc; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; + +/** + * Standard implementation of TocReader. + * + * Expects .toc file to be in the following format; + * + * byte 0: version + * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed + * byte 2-9: long: offset of block 0 + * byte 10-17: long: offset of block 1 + * ... + * byte (N*8+2)-(N*8+9): long: offset of block N + */ +public class StandardTocReader implements TocReader { + private final boolean compressed; + private final long[] offsets; + + public StandardTocReader(final File file) throws IOException { + try (final FileInputStream fis = new FileInputStream(file); + final DataInputStream dis = new DataInputStream(fis)) { + + final int version = dis.read(); + if ( version < 0 ) { + throw new EOFException(); + } + + final int compressionFlag = dis.read(); + if ( compressionFlag < 0 ) { + throw new EOFException(); + } + + if ( compressionFlag == 0 ) { + compressed = false; + } else if ( compressionFlag == 1 ) { + compressed = true; + } else { + throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag); + } + + final int numBlocks = (int) ((file.length() - 2) / 8); + offsets = new long[numBlocks]; + + for (int i=0; i < numBlocks; i++) { + offsets[i] = dis.readLong(); + } + } + } + + @Override + public boolean isCompressed() { + return compressed; + } + + @Override + public long getBlockOffset(final int blockIndex) { + if ( blockIndex >= offsets.length ) { + return -1L; + } + return offsets[blockIndex]; + } + + @Override + public long getLastBlockOffset() { + if ( offsets.length == 0 ) { + return 0L; + } + return offsets[offsets.length - 1]; + } + + @Override + public void close() throws IOException { + } + + @Override + public int getBlockIndex(final long blockOffset) { + for (int i=0; i < offsets.length; i++) { + if ( offsets[i] > blockOffset ) { + return i-1; + } + } + + return offsets.length - 1; + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java new file mode 100644 index 0000000000..488f225242 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.toc; + +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Standard implementation of {@link TocWriter}. + * + * Format of .toc file: + * byte 0: version + * byte 1: compressed: 0 -> not compressed, 1 -> compressed + * byte 2-9: long: offset of block 0 + * byte 10-17: long: offset of block 1 + * ... + * byte (N*8+2)-(N*8+9): long: offset of block N + */ +public class StandardTocWriter implements TocWriter { + private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class); + + public static final byte VERSION = 1; + + private final File file; + private final FileOutputStream fos; + private final boolean alwaysSync; + private int index = -1; + + /** + * Creates a StandardTocWriter that writes to the given file. + * @param file the file to write to + * @param compressionFlag whether or not the journal is compressed + * @throws FileNotFoundException + */ + public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException { + final File tocDir = file.getParentFile(); + if ( !tocDir.exists() ) { + Files.createDirectories(tocDir.toPath()); + } + + this.file = file; + fos = new FileOutputStream(file); + this.alwaysSync = alwaysSync; + + final byte[] header = new byte[2]; + header[0] = VERSION; + header[1] = (byte) (compressionFlag ? 
1 : 0); + fos.write(header); + fos.flush(); + + if ( alwaysSync ) { + sync(); + } + } + + @Override + public void addBlockOffset(final long offset) throws IOException { + final BufferedOutputStream bos = new BufferedOutputStream(fos); + final DataOutputStream dos = new DataOutputStream(bos); + dos.writeLong(offset); + dos.flush(); + index++; + logger.debug("Adding block {} at offset {}", index, offset); + + if ( alwaysSync ) { + sync(); + } + } + + @Override + public void sync() throws IOException { + fos.getFD().sync(); + } + + @Override + public int getCurrentBlockIndex() { + return index; + } + + @Override + public void close() throws IOException { + if (alwaysSync) { + fos.getFD().sync(); + } + + fos.close(); + } + + @Override + public File getFile() { + return file; + } + + @Override + public String toString() { + return "TOC Writer for " + file; + } +} diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java new file mode 100644 index 0000000000..7c197be9d5 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.toc; + +import java.io.Closeable; + +/** + *
+ * Reads a Table of Contents (.toc file) for a corresponding Journal File. We use a Table of Contents + * to map a Block Index to the byte offset in the Journal file where that Block begins. Persisting a Block Index + * with each event allows the Journal to be compressed after the fact: a large batch of events can be compressed + * at once for good compression, an event can be located in an uncompressed Journal by consulting the Table of + * Contents, and an event can still be located after compression because the TOC is simply rewritten while the + * data is compressed. + *
+ */ +public interface TocReader extends Closeable { + + /** + * Indicates whether or not the corresponding Journal file is compressed + * @return true if the corresponding Journal file is compressed, false otherwise + */ + boolean isCompressed(); + + /** + * Returns the byte offset into the Journal File for the Block with the given index. + * @param blockIndex the index of the Block + * @return the byte offset at which the Block begins, or -1 if no Block exists with the given index + */ + long getBlockOffset(int blockIndex); + + /** + * Returns the byte offset into the Journal File of the last Block in the Table of Contents + * @return the byte offset of the last Block + */ + long getLastBlockOffset(); + + /** + * Returns the index of the Block that contains the given byte offset + * @param blockOffset a byte offset into the Journal File + * @return the index of the Block that contains the given offset + */ + int getBlockIndex(long blockOffset); +} diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java new file mode 100644 index 0000000000..c30ac98830 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.toc; + +import java.io.File; + +import org.apache.nifi.provenance.lucene.LuceneUtil; + +public class TocUtil { + + /** + * Returns the file that should be used as the Table of Contents for the given Journal File + * @param journalFile the Journal File of interest + * @return the .toc File for the given Journal File + */ + public static File getTocFile(final File journalFile) { + final File tocDir = new File(journalFile.getParentFile(), "toc"); + final String basename = LuceneUtil.substringBefore(journalFile.getName(), "."); + final File tocFile = new File(tocDir, basename + ".toc"); + return tocFile; + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java new file mode 100644 index 0000000000..c6780531c5 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.toc; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; + +/** + * Writes a .toc file + */ +public interface TocWriter extends Closeable { + + /** + * Adds the given block offset as the next Block Offset in the Table of Contents + * @param offset + * @throws IOException + */ + void addBlockOffset(long offset) throws IOException; + + /** + * Returns the index of the current Block + * @return + */ + int getCurrentBlockIndex(); + + /** + * Returns the file that is currently being written to + * @return + */ + File getFile(); + + /** + * Synchronizes the data with the underlying storage device + * @throws IOException + */ + void sync() throws IOException; +} diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 5be208bd4e..5541ab56ad 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.provenance; +import static org.apache.nifi.provenance.TestUtil.createFlowFile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -25,14 +26,14 @@ import java.io.FileFilter; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.core.SimpleAnalyzer; @@ -45,7 +46,6 @@ import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.FSDirectory; import org.apache.nifi.events.EventReporter; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.provenance.lineage.EventNode; import org.apache.nifi.provenance.lineage.Lineage; import org.apache.nifi.provenance.lineage.LineageEdge; @@ -59,8 +59,10 @@ import org.apache.nifi.provenance.search.SearchableField; import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; import org.apache.nifi.reporting.Severity; +import org.apache.nifi.util.file.FileUtils; import org.junit.After; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; 
import org.junit.Test; @@ -72,87 +74,64 @@ public class TestPersistentProvenanceRepository { public TestName name = new TestName(); private PersistentProvenanceRepository repo; + private RepositoryConfiguration config; public static final int DEFAULT_ROLLOVER_MILLIS = 2000; private RepositoryConfiguration createConfiguration() { - final RepositoryConfiguration config = new RepositoryConfiguration(); + config = new RepositoryConfiguration(); config.addStorageDirectory(new File("target/storage/" + UUID.randomUUID().toString())); - config.setCompressOnRollover(false); + config.setCompressOnRollover(true); config.setMaxEventFileLife(2000L, TimeUnit.SECONDS); + config.setCompressionBlockBytes(100); return config; } + @BeforeClass + public static void setLogLevel() { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG"); + } + @Before public void printTestName() { System.out.println("\n\n\n*********************** " + name.getMethodName() + " *****************************"); } @After - public void closeRepo() { + public void closeRepo() throws IOException { if (repo != null) { try { repo.close(); } catch (final IOException ioe) { } } + + // Delete all of the storage files. We do this in order to clean up the tons of files that + // we create but also to ensure that we have closed all of the file handles. If we leave any + // streams open, for instance, this will throw an IOException, causing our unit test to fail. + for ( final File storageDir : config.getStorageDirectories() ) { + int i; + for (i=0; i < 3; i++) { + try { + FileUtils.deleteFile(storageDir, true); + break; + } catch (final IOException ioe) { + // if there is a virus scanner, etc. running in the background we may not be able to + // delete the file. Wait a sec and try again. + if ( i == 2 ) { + throw ioe; + } else { + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + } + } + } + } + } } - private FlowFile createFlowFile(final long id, final long fileSize, final Map attributes) { - final Map attrCopy = new HashMap<>(attributes); - - return new FlowFile() { - @Override - public long getId() { - return id; - } - - @Override - public long getEntryDate() { - return System.currentTimeMillis(); - } - - @Override - public Set getLineageIdentifiers() { - return new HashSet(); - } - - @Override - public long getLineageStartDate() { - return System.currentTimeMillis(); - } - - @Override - public Long getLastQueueDate() { - return System.currentTimeMillis(); - } - - @Override - public boolean isPenalized() { - return false; - } - - @Override - public String getAttribute(final String s) { - return attrCopy.get(s); - } - - @Override - public long getSize() { - return fileSize; - } - - @Override - public Map getAttributes() { - return attrCopy; - } - - @Override - public int compareTo(final FlowFile o) { - return 0; - } - }; - } + private EventReporter getEventReporter() { return new EventReporter() { @@ -261,6 +240,8 @@ public class TestPersistentProvenanceRepository { repo.registerEvent(record); } + Thread.sleep(1000L); + repo.close(); Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.) 
@@ -417,10 +398,10 @@ public class TestPersistentProvenanceRepository { @Test public void testIndexAndCompressOnRolloverAndSubsequentSearch() throws IOException, InterruptedException, ParseException { final RepositoryConfiguration config = createConfiguration(); - config.setMaxRecordLife(3, TimeUnit.SECONDS); - config.setMaxStorageCapacity(1024L * 1024L); + config.setMaxRecordLife(30, TimeUnit.SECONDS); + config.setMaxStorageCapacity(1024L * 1024L * 10); config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); - config.setMaxEventFileCapacity(1024L * 1024L); + config.setMaxEventFileCapacity(1024L * 1024L * 10); config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); @@ -923,12 +904,16 @@ public class TestPersistentProvenanceRepository { final PersistentProvenanceRepository secondRepo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); secondRepo.initialize(getEventReporter()); - final ProvenanceEventRecord event11 = builder.build(); - secondRepo.registerEvent(event11); - secondRepo.waitForRollover(); - final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L); - assertNotNull(event11Retrieved); - assertEquals(10, event11Retrieved.getEventId()); + try { + final ProvenanceEventRecord event11 = builder.build(); + secondRepo.registerEvent(event11); + secondRepo.waitForRollover(); + final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L); + assertNotNull(event11Retrieved); + assertEquals(10, event11Retrieved.getEventId()); + } finally { + secondRepo.close(); + } } @Test @@ -998,6 +983,73 @@ public class TestPersistentProvenanceRepository { storageDirFiles = config.getStorageDirectories().get(0).listFiles(indexFileFilter); assertEquals(0, storageDirFiles.length); } + + + @Test + public void testBackPressure() throws IOException, InterruptedException { + final RepositoryConfiguration config = createConfiguration(); + config.setMaxEventFileCapacity(1L); // force rollover on each record. + config.setJournalCount(1); + + final AtomicInteger journalCountRef = new AtomicInteger(0); + + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { + @Override + protected int getJournalCount() { + return journalCountRef.get(); + } + }; + repo.initialize(getEventReporter()); + + final Map attributes = new HashMap<>(); + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + attributes.put("uuid", UUID.randomUUID().toString()); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + + // ensure that we can register the events. + for (int i = 0; i < 10; i++) { + builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); + attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i); + repo.registerEvent(builder.build()); + } + + // set number of journals to 6 so that we will block. 
+ journalCountRef.set(6); + + final AtomicLong threadNanos = new AtomicLong(0L); + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + final long start = System.nanoTime(); + builder.fromFlowFile(createFlowFile(13, 3000L, attributes)); + attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13); + repo.registerEvent(builder.build()); + threadNanos.set(System.nanoTime() - start); + } + }); + t.start(); + + Thread.sleep(1500L); + + journalCountRef.set(1); + t.join(); + + final int threadMillis = (int) TimeUnit.NANOSECONDS.toMillis(threadNanos.get()); + assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact + + builder.fromFlowFile(createFlowFile(15, 3000L, attributes)); + attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15); + repo.registerEvent(builder.build()); + } + + + // TODO: test EOF on merge + // TODO: Test journal with no records @Test public void testTextualQuery() throws InterruptedException, IOException, ParseException { diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java new file mode 100644 index 0000000000..6f85b94f15 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance; + +import static org.apache.nifi.provenance.TestUtil.createFlowFile; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.nifi.provenance.toc.StandardTocReader; +import org.apache.nifi.provenance.toc.StandardTocWriter; +import org.apache.nifi.provenance.toc.TocReader; +import org.apache.nifi.provenance.toc.TocUtil; +import org.apache.nifi.provenance.toc.TocWriter; +import org.apache.nifi.util.file.FileUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestStandardRecordReaderWriter { + @BeforeClass + public static void setLogLevel() { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG"); + } + + private ProvenanceEventRecord createEvent() { + final Map attributes = new HashMap<>(); + attributes.put("filename", "1.txt"); + attributes.put("uuid", UUID.randomUUID().toString()); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + final ProvenanceEventRecord record = builder.build(); + + return record; + } + + @Test + public void testSimpleWriteWithToc() throws IOException { + final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite"); + final File tocFile = TocUtil.getTocFile(journalFile); + final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); + final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 1024 * 1024); + + writer.writeHeader(); + writer.writeRecord(createEvent(), 1L); + writer.close(); + + final TocReader tocReader = new StandardTocReader(tocFile); + + try (final FileInputStream fis = new FileInputStream(journalFile); + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + assertEquals(0, reader.getBlockIndex()); + reader.skipToBlock(0); + StandardProvenanceEventRecord recovered = reader.nextRecord(); + assertNotNull(recovered); + + assertEquals("nifi://unit-test", recovered.getTransitUri()); + assertNull(reader.nextRecord()); + } + + FileUtils.deleteFile(journalFile.getParentFile(), true); + } + + + @Test + public void testSingleRecordCompressed() throws IOException { + final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz"); + final File tocFile = TocUtil.getTocFile(journalFile); + final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); + final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100); + + writer.writeHeader(); + writer.writeRecord(createEvent(), 1L); + writer.close(); + + final TocReader tocReader = new StandardTocReader(tocFile); + + try (final FileInputStream fis = new FileInputStream(journalFile); + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + assertEquals(0, reader.getBlockIndex()); + reader.skipToBlock(0); + StandardProvenanceEventRecord recovered = 
reader.nextRecord(); + assertNotNull(recovered); + + assertEquals("nifi://unit-test", recovered.getTransitUri()); + assertNull(reader.nextRecord()); + } + + FileUtils.deleteFile(journalFile.getParentFile(), true); + } + + + @Test + public void testMultipleRecordsSameBlockCompressed() throws IOException { + final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz"); + final File tocFile = TocUtil.getTocFile(journalFile); + final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); + // new record each 1 MB of uncompressed data + final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024); + + writer.writeHeader(); + for (int i=0; i < 10; i++) { + writer.writeRecord(createEvent(), i); + } + writer.close(); + + final TocReader tocReader = new StandardTocReader(tocFile); + + try (final FileInputStream fis = new FileInputStream(journalFile); + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + for (int i=0; i < 10; i++) { + assertEquals(0, reader.getBlockIndex()); + + // call skipToBlock half the time to ensure that we can; avoid calling it + // the other half of the time to ensure that it's okay. + if (i <= 5) { + reader.skipToBlock(0); + } + + StandardProvenanceEventRecord recovered = reader.nextRecord(); + assertNotNull(recovered); + assertEquals("nifi://unit-test", recovered.getTransitUri()); + } + + assertNull(reader.nextRecord()); + } + + FileUtils.deleteFile(journalFile.getParentFile(), true); + } + + + @Test + public void testMultipleRecordsMultipleBlocksCompressed() throws IOException { + final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz"); + final File tocFile = TocUtil.getTocFile(journalFile); + final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); + // new block each 10 bytes + final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100); + + writer.writeHeader(); + for (int i=0; i < 10; i++) { + writer.writeRecord(createEvent(), i); + } + writer.close(); + + final TocReader tocReader = new StandardTocReader(tocFile); + + try (final FileInputStream fis = new FileInputStream(journalFile); + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + for (int i=0; i < 10; i++) { + StandardProvenanceEventRecord recovered = reader.nextRecord(); + System.out.println(recovered); + assertNotNull(recovered); + assertEquals((long) i, recovered.getEventId()); + assertEquals("nifi://unit-test", recovered.getTransitUri()); + } + + assertNull(reader.nextRecord()); + } + + FileUtils.deleteFile(journalFile.getParentFile(), true); + } +} diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java new file mode 100644 index 0000000000..7459fe8a8c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.flowfile.FlowFile; + +public class TestUtil { + public static FlowFile createFlowFile(final long id, final long fileSize, final Map attributes) { + final Map attrCopy = new HashMap<>(attributes); + + return new FlowFile() { + @Override + public long getId() { + return id; + } + + @Override + public long getEntryDate() { + return System.currentTimeMillis(); + } + + @Override + public Set getLineageIdentifiers() { + return new HashSet(); + } + + @Override + public long getLineageStartDate() { + return System.currentTimeMillis(); + } + + @Override + public Long getLastQueueDate() { + return System.currentTimeMillis(); + } + + @Override + public boolean isPenalized() { + return false; + } + + @Override + public String getAttribute(final String s) { + return attrCopy.get(s); + } + + @Override + public long getSize() { + return fileSize; + } + + @Override + public Map getAttributes() { + return attrCopy; + } + + @Override + public int compareTo(final FlowFile o) { + return 0; + } + }; + } +} diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java new file mode 100644 index 0000000000..30326e7264 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.toc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.UUID; + +import org.junit.Test; + +public class TestStandardTocReader { + + @Test + public void testDetectsCompression() throws IOException { + final File file = new File("target/" + UUID.randomUUID().toString()); + try (final OutputStream out = new FileOutputStream(file)) { + out.write(0); + out.write(0); + } + + try { + try(final StandardTocReader reader = new StandardTocReader(file)) { + assertFalse(reader.isCompressed()); + } + } finally { + file.delete(); + } + + + try (final OutputStream out = new FileOutputStream(file)) { + out.write(0); + out.write(1); + } + + try { + try(final StandardTocReader reader = new StandardTocReader(file)) { + assertTrue(reader.isCompressed()); + } + } finally { + file.delete(); + } + } + + + @Test + public void testGetBlockIndex() throws IOException { + final File file = new File("target/" + UUID.randomUUID().toString()); + try (final OutputStream out = new FileOutputStream(file); + final DataOutputStream dos = new DataOutputStream(out)) { + out.write(0); + out.write(0); + + for (int i=0; i < 1024; i++) { + dos.writeLong(i * 1024L); + } + } + + try { + try(final StandardTocReader reader = new StandardTocReader(file)) { + assertFalse(reader.isCompressed()); + + for (int i=0; i < 1024; i++) { + assertEquals(i * 1024, reader.getBlockOffset(i)); + } + } + } finally { + file.delete(); + } + } +} diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java new file mode 100644 index 0000000000..70f55a2c6c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.toc; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +import org.apache.nifi.util.file.FileUtils; +import org.junit.Test; + +public class TestStandardTocWriter { + @Test + public void testOverwriteEmptyFile() throws IOException { + final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc"); + try { + assertTrue( tocFile.createNewFile() ); + + try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) { + } + } finally { + FileUtils.deleteFile(tocFile, false); + } + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index c3c05df34e..be0fc67585 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -176,6 +176,7 @@ apache-rat-plugin + src/test/resources/localhost.cer src/test/resources/hello.txt src/test/resources/CharacterSetConversionSamples/Converted.txt src/test/resources/CharacterSetConversionSamples/Original.txt @@ -231,6 +232,8 @@ src/test/resources/TestTransformXml/tokens.xml src/test/resources/TestUnpackContent/folder/cal.txt src/test/resources/TestUnpackContent/folder/date.txt + src/test/resources/TestUnpackContent/data.flowfilev2 + src/test/resources/TestUnpackContent/data.flowfilev3 src/test/resources/TestXml/xml-bundle-1 src/test/resources/CompressedData/SampleFile.txt.bz2 src/test/resources/CompressedData/SampleFile.txt.gz
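Taken together, the pieces in this patch form a simple round trip that TestStandardRecordReaderWriter exercises: a StandardRecordWriter writes events to a journal while its StandardTocWriter records the byte offset of each compression block in the .toc file, and a reader later uses a StandardTocReader to jump directly to the block that contains a known offset instead of decompressing the whole journal. The sketch below restates that flow outside the test harness. It uses only constructors and methods that appear in this patch (StandardRecordWriter, StandardRecordReader, StandardTocWriter, StandardTocReader, TocUtil, TestUtil.createFlowFile); the class name, file locations, and the 100-byte block size are illustrative assumptions, not part of the change.

package org.apache.nifi.provenance;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.apache.nifi.provenance.toc.StandardTocReader;
import org.apache.nifi.provenance.toc.StandardTocWriter;
import org.apache.nifi.provenance.toc.TocReader;
import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.provenance.toc.TocWriter;

public class TocRoundTripExample {

    public static void main(final String[] args) throws Exception {
        // Hypothetical journal location; the ".gz" suffix mirrors the compressed-journal tests above.
        final File journalFile = new File("target/storage/" + UUID.randomUUID() + "/example.prov.gz");
        final File tocFile = TocUtil.getTocFile(journalFile); // resolves to <journal dir>/toc/example.toc

        // Write a compressed journal, starting a new compression block roughly every 100 bytes of
        // uncompressed data so that several blocks (and therefore several TOC offsets) are created.
        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
        final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
        writer.writeHeader();
        for (int i = 0; i < 10; i++) {
            writer.writeRecord(createEvent(), i);
        }
        writer.close();

        // Re-open the journal. The TOC maps Block Indexes to byte offsets, so the reader can skip to
        // the block containing a known offset rather than reading the journal from the beginning.
        final TocReader tocReader = new StandardTocReader(tocFile);
        try (final FileInputStream fis = new FileInputStream(journalFile);
             final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {

            final int lastBlock = tocReader.getBlockIndex(tocReader.getLastBlockOffset());
            reader.skipToBlock(lastBlock);

            StandardProvenanceEventRecord record;
            while ((record = reader.nextRecord()) != null) {
                System.out.println(record.getEventId() + " -> " + record.getTransitUri());
            }
        }
    }

    // Builds a minimal RECEIVE event, mirroring the builder usage in the tests above.
    private static ProvenanceEventRecord createEvent() {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("filename", "1.txt");
        attributes.put("uuid", UUID.randomUUID().toString());

        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
        builder.setEventTime(System.currentTimeMillis());
        builder.setEventType(ProvenanceEventType.RECEIVE);
        builder.setTransitUri("nifi://unit-test");
        builder.fromFlowFile(TestUtil.createFlowFile(3L, 3000L, attributes));
        builder.setComponentId("1234");
        builder.setComponentType("dummy processor");
        return builder.build();
    }
}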