From 19f7db69863f26fae8eb98b08300a31dcf7cb8cc Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Wed, 19 Aug 2015 12:18:00 -0400
Subject: [PATCH] NIFI-793: Added multi-threading to the indexing in the
 Persistent Provenance Repository

---
 nifi-assembly/pom.xml                              |   1 +
 .../org/apache/nifi/util/NiFiProperties.java       |   1 +
 .../main/asciidoc/administration-guide.adoc        |   6 +-
 .../src/main/resources/conf/nifi.properties        |   1 +
 .../PersistentProvenanceRepository.java            | 109 +++++++++++++++---
 .../provenance/RepositoryConfiguration.java        |  34 ++++--
 6 files changed, 127 insertions(+), 25 deletions(-)

diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 1f1522787d..a712b86638 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -285,6 +285,7 @@ language governing permissions and limitations under the License. -->
         <nifi.provenance.repository.rollover.time>30 secs</nifi.provenance.repository.rollover.time>
         <nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size>
         <nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads>
+        <nifi.provenance.repository.index.threads>1</nifi.provenance.repository.index.threads>
         <nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover>
         <nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, Filename, ProcessorID, Relationship</nifi.provenance.repository.indexed.fields>
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index e25f5d6129..520e0ba48b 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -99,6 +99,7 @@ public class NiFiProperties extends Properties {
     public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time";
     public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size";
     public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads";
+    public static final String PROVENANCE_INDEX_THREAD_POOL_SIZE = "nifi.provenance.repository.index.threads";
     public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover";
     public static final String PROVENANCE_INDEXED_FIELDS = "nifi.provenance.repository.indexed.fields";
     public static final String PROVENANCE_INDEXED_ATTRIBUTES = "nifi.provenance.repository.indexed.attributes";
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 7724713587..4dfddf9270 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -513,7 +513,11 @@ Providing three total locations, including _nifi.provenance.repository.director
 |nifi.provenance.repository.max.storage.size|The maximum amount of data provenance information to store at a time. The default is 1 GB.
 |nifi.provenance.repository.rollover.time|The amount of time to wait before rolling over the latest data provenance information so that it is available in the User Interface. The default value is 5 mins.
 |nifi.provenance.repository.rollover.size|The amount of information to roll over at a time. The default value is 100 MB.
-|nifi.provenance.repository.query.threads|The number of threads to use for Provenance Repository queries. The default value is 2.
+|nifi.provenance.repository.query.threads|The number of threads to use for Provenance Repository queries. The default value is 2.
+|nifi.provenance.repository.index.threads|The number of threads to use for indexing Provenance events so that they are searchable. The default value is 1.
+ For flows that operate on a very high number of FlowFiles, the indexing of Provenance events could become a bottleneck. If this is the case, a bulletin will appear, indicating that
+ "The rate of the dataflow is exceeding the provenance recording rate. Slowing down flow to accommodate." If this happens, increasing the value of this property
+ may increase the rate at which the Provenance Repository is able to process these records, resulting in better overall throughput.
 |nifi.provenance.repository.compress.on.rollover|Indicates whether to compress the provenance information when rolling it over. The default value is _true_.
 |nifi.provenance.repository.always.sync|If set to _true_, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very expensive and can significantly reduce NiFi performance. However, if it is _false_, there could be the potential for data loss if either there is a sudden power loss or the operating system crashes. The default value is _false_.
 |nifi.provenance.repository.journal.count|The number of journal files that should be used to serialize Provenance Event data. Increasing this value will allow more tasks to simultaneously update the repository but will result in more expensive merging of the journal files later. This value should ideally be equal to the number of threads that are expected to update the repository simultaneously, but 16 tends to work well in must environments. The default value is 16.
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 4043076774..63e5391f14 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -71,6 +71,7 @@ nifi.provenance.repository.max.storage.size=${nifi.provenance.repository.max.sto
 nifi.provenance.repository.rollover.time=${nifi.provenance.repository.rollover.time}
 nifi.provenance.repository.rollover.size=${nifi.provenance.repository.rollover.size}
 nifi.provenance.repository.query.threads=${nifi.provenance.repository.query.threads}
+nifi.provenance.repository.index.threads=${nifi.provenance.repository.index.threads}
 nifi.provenance.repository.compress.on.rollover=${nifi.provenance.repository.compress.on.rollover}
 nifi.provenance.repository.always.sync=${nifi.provenance.repository.always.sync}
 nifi.provenance.repository.journal.count=${nifi.provenance.repository.journal.count}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index a1063f0f0e..4657686baa 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -45,6 +46,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -95,6 +97,7 @@ import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.RingBuffer;
 import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -279,6 +282,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         final String rolloverSize = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
         final String shardSize = properties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
         final int queryThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2);
+        final int indexThreads = properties.getIntegerProperty(
+            NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 1);
         final int journalCount = properties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
 
         final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS);
@@ -327,6 +332,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
         config.setMaxStorageCapacity(maxStorageBytes);
         config.setQueryThreadPoolSize(queryThreads);
+        config.setIndexThreadPoolSize(indexThreads);
         config.setJournalCount(journalCount);
         config.setMaxAttributeChars(maxAttrChars);
 
@@ -801,7 +807,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
      *
      * @throws IOException if unable to purge old events due to an I/O problem
      */
-    void purgeOldEvents() throws IOException {
+    synchronized void purgeOldEvents() throws IOException {
         while (!recoveryFinished.get()) {
             try {
                 Thread.sleep(100L);
@@ -1030,6 +1036,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }
 
             if (fileRolledOver == null) {
+                logger.debug("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
                 return;
             }
             final File file = fileRolledOver;
@@ -1091,19 +1098,31 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 // that is no longer the case.
                 if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
                     logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
-                        + "Slowing down flow to accomodate. Currently, there are {} journal files ({} bytes) and "
+                        + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and "
                         + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
                     eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
-                        + "exceeding the provenance recording rate. Slowing down flow to accomodate");
+                        + "exceeding the provenance recording rate. Slowing down flow to accommodate");
 
                     while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
-                        try {
-                            Thread.sleep(1000L);
-                        } catch (final InterruptedException ie) {
+                        if (repoSize > sizeThreshold) {
+                            logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events");
+                            purgeOldEvents();
+
+                            journalFileCount = getJournalCount();
+                            repoSize = getSize(getLogFiles(), 0L);
+                            continue;
+                        } else {
+                            // if we are constrained by the number of journal files rather than the size of the repo,
+                            // then we will just sleep a bit because another thread is already actively merging the journals,
+                            // due to the runnable that we scheduled above
+                            try {
+                                Thread.sleep(100L);
+                            } catch (final InterruptedException ie) {
+                            }
                         }
 
                         logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
-                            + "to accomodate. Currently, there are {} journal files ({} bytes) and "
+                            + "to accommodate. Currently, there are {} journal files ({} bytes) and "
                             + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
 
                         journalFileCount = getJournalCount();
@@ -1219,6 +1238,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         }
 
         if (journalFiles.isEmpty()) {
+            logger.debug("Couldn't merge journals: Journal Files is empty; won't merge journals");
             return null;
         }
 
@@ -1380,11 +1400,51 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
             final IndexingAction indexingAction = new IndexingAction(this);
 
             final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile, earliestTimestamp);
+            long maxId = 0L;
+
+            final BlockingQueue<Tuple<StandardProvenanceEventRecord, Integer>> eventQueue = new LinkedBlockingQueue<>(100);
+            final AtomicBoolean finishedAdding = new AtomicBoolean(false);
+            final List<Future<?>> futures = new ArrayList<>();
+
             final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
             try {
-                long maxId = 0L;
+                final ExecutorService exec = Executors.newFixedThreadPool(configuration.getIndexThreadPoolSize(), new ThreadFactory() {
+                    @Override
+                    public Thread newThread(final Runnable r) {
+                        final Thread t = Executors.defaultThreadFactory().newThread(r);
+                        t.setName("Index Provenance Events");
+                        return t;
+                    }
+                });
 
                 try {
+                    for (int i = 0; i < configuration.getIndexThreadPoolSize(); i++) {
+                        final Callable<Object> callable = new Callable<Object>() {
+                            @Override
+                            public Object call() throws IOException {
+                                while (!eventQueue.isEmpty() || !finishedAdding.get()) {
+                                    final Tuple<StandardProvenanceEventRecord, Integer> tuple;
+                                    try {
+                                        tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
+                                    } catch (final InterruptedException ie) {
+                                        continue;
+                                    }
+
+                                    if (tuple == null) {
+                                        continue;
+                                    }
+
+                                    indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
+                                }
+
+                                return null;
+                            }
+                        };
+
+                        final Future<?> future = exec.submit(callable);
+                        futures.add(future);
+                    }
+
                     while (!recordToReaderMap.isEmpty()) {
                         final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
                         final StandardProvenanceEventRecord record = entry.getKey();
@@ -1393,7 +1453,13 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
                         writer.writeRecord(record, record.getEventId());
                         final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
-                        indexingAction.index(record, indexWriter, blockIndex);
+                        boolean accepted = false;
+                        while (!accepted) {
+                            try {
+                                accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
+                            } catch (final InterruptedException ie) {
+                            }
+                        }
 
                         maxId = record.getEventId();
+
                         latestRecords.add(truncateAttributes(record));
@@ -1414,16 +1480,30 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                             recordToReaderMap.put(nextRecord, reader);
                         }
                     }
-
-                    indexWriter.commit();
-                } catch (final Throwable t) {
-                    indexWriter.rollback();
-                    throw t;
+                } finally {
+                    finishedAdding.set(true);
+                    exec.shutdown();
                 }
 
-                indexConfig.setMaxIdIndexed(maxId);
+                for (final Future<?> future : futures) {
+                    try {
+                        future.get();
+                    } catch (final ExecutionException ee) {
+                        final Throwable t = ee.getCause();
+                        if (t instanceof RuntimeException) {
+                            throw (RuntimeException) t;
+                        }
+
+                        throw new RuntimeException(t);
+                    } catch (final InterruptedException e) {
+                        throw new RuntimeException("Thread interrupted");
+                    }
+                }
             } finally {
                 indexManager.returnIndexWriter(indexingDirectory, indexWriter);
             }
+
+            indexConfig.setMaxIdIndexed(maxId);
         }
 
         // record should now be available in the repository. We can copy the values from latestRecords to ringBuffer.
@@ -1468,6 +1548,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
         if (records == 0) {
             writerFile.delete();
+            logger.debug("Couldn't merge journals: No Records to merge");
             return null;
         } else {
             final long nanos = System.nanoTime() - startNanos;
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index 381d778595..e63133afd3 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -16,14 +16,14 @@
  */
 package org.apache.nifi.provenance;
 
-import org.apache.nifi.provenance.search.SearchableField;
-
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.provenance.search.SearchableField;
+
 public class RepositoryConfiguration {
 
     private final List<File> storageDirectories = new ArrayList<>();
@@ -40,7 +40,8 @@ public class RepositoryConfiguration {
     private List<SearchableField> searchableAttributes = new ArrayList<>();
     private boolean compress = true;
     private boolean alwaysSync = false;
-    private int queryThreadPoolSize = 1;
+    private int queryThreadPoolSize = 2;
+    private int indexThreadPoolSize = 1;
     private boolean allowRollover = true;
 
     public void setAllowRollover(final boolean allow) {
@@ -203,6 +204,20 @@ public class RepositoryConfiguration {
         this.queryThreadPoolSize = queryThreadPoolSize;
     }
 
+    /**
+     * @return the number of threads to use to index provenance events
+     */
+    public int getIndexThreadPoolSize() {
+        return indexThreadPoolSize;
+    }
+
+    public void setIndexThreadPoolSize(final int indexThreadPoolSize) {
+        if (indexThreadPoolSize < 1) {
+            throw new IllegalArgumentException();
+        }
+        this.indexThreadPoolSize = indexThreadPoolSize;
+    }
+
     /**
      * <p>
      * Specifies the desired size of each Provenance Event index shard, in
@@ -213,22 +228,21 @@ public class RepositoryConfiguration {
      * <li>
      * A very large index requires a significant amount of Java heap space to
      * search. As the size of the shard increases, the required Java heap space
-     * also increases.
-     * </li>
+     * also increases. </li>
      * <li>
      * By having multiple shards, we have the ability to use multiple concurrent
      * threads to search the individual shards, resulting in far less latency
-     * when performing a search across millions or billions of records.
-     * </li>
+     * when performing a search across millions or billions of records. </li>
      * <li>
      * We keep track of which time ranges each index shard spans. As a result,
      * we are able to determine which shards need to be searched if a search
      * provides a date range. This can greatly increase the speed of a search
-     * and reduce resource utilization.
-     * </li>
+     * and reduce resource utilization. </li>
      *
-     * @param bytes the number of bytes to write to an index before beginning a new shard
+     * @param bytes
+     *            the number of bytes to write to an index before beginning a
+     *            new shard
      */
     public void setDesiredIndexSize(final long bytes) {
         this.desiredIndexBytes = bytes;
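
The change above amounts to a bounded producer/consumer hand-off: the journal-merge thread writes each event and offers a (record, block index) pair onto a bounded queue, while a small pool of worker threads drains the queue and performs the Lucene indexing. The stand-alone sketch below, which is not part of the patch, illustrates that pattern under simplified assumptions: Long event IDs and a println stand in for the repository's StandardProvenanceEventRecord/Tuple values and for IndexingAction.index(), and the thread count stands in for nifi.provenance.repository.index.threads.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BoundedQueueIndexingSketch {

    public static void main(final String[] args) throws Exception {
        final int indexThreads = 2;  // stand-in for the index thread pool size
        final BlockingQueue<Long> eventQueue = new LinkedBlockingQueue<>(100);  // bounded, as in the patch
        final AtomicBoolean finishedAdding = new AtomicBoolean(false);
        final List<Future<?>> futures = new ArrayList<>();

        final ExecutorService exec = Executors.newFixedThreadPool(indexThreads);
        try {
            // Consumers: poll with a short timeout so they notice when the producer is done.
            for (int i = 0; i < indexThreads; i++) {
                futures.add(exec.submit(() -> {
                    while (!eventQueue.isEmpty() || !finishedAdding.get()) {
                        final Long eventId = eventQueue.poll(10, TimeUnit.MILLISECONDS);
                        if (eventId == null) {
                            continue;
                        }
                        // In the real repository, this is where the event is indexed.
                        System.out.println(Thread.currentThread().getName() + " indexed event " + eventId);
                    }
                    return null;
                }));
            }

            // Producer: offer() with a timeout; a full queue applies back-pressure to the writer.
            for (long id = 0; id < 1_000; id++) {
                while (!eventQueue.offer(id, 10, TimeUnit.MILLISECONDS)) {
                    // queue full; retry until a consumer makes room
                }
            }
        } finally {
            finishedAdding.set(true);
            exec.shutdown();
        }

        // Propagate any failure from the indexing workers, mirroring the patch's error handling.
        for (final Future<?> future : futures) {
            try {
                future.get();
            } catch (final ExecutionException ee) {
                throw new RuntimeException(ee.getCause());
            }
        }
    }
}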
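For operators, the new setting sits next to the existing query thread count in nifi.properties. A possible override for a high-volume instance might look like the following; the value 2 is only an example, and the patch's default remains 1:

nifi.provenance.repository.query.threads=2
nifi.provenance.repository.index.threads=2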