mirror of https://github.com/apache/nifi.git
NIFI-793: Added multi-threading to the indexing in the Persistent Provenance Repository
This commit is contained in:
parent
f171756a88
commit
19f7db6986
|
@ -285,6 +285,7 @@ language governing permissions and limitations under the License. -->
|
|||
<nifi.provenance.repository.rollover.time>30 secs</nifi.provenance.repository.rollover.time>
|
||||
<nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size>
|
||||
<nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads>
|
||||
<nifi.provenance.repository.index.threads>1</nifi.provenance.repository.index.threads>
|
||||
<nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover>
|
||||
<nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, Filename, ProcessorID, Relationship</nifi.provenance.repository.indexed.fields>
|
||||
<nifi.provenance.repository.indexed.attributes />
|
||||
|
|
|
@ -99,6 +99,7 @@ public class NiFiProperties extends Properties {
|
|||
public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time";
|
||||
public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size";
|
||||
public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads";
|
||||
public static final String PROVENANCE_INDEX_THREAD_POOL_SIZE = "nifi.provenance.repository.index.threads";
|
||||
public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover";
|
||||
public static final String PROVENANCE_INDEXED_FIELDS = "nifi.provenance.repository.indexed.fields";
|
||||
public static final String PROVENANCE_INDEXED_ATTRIBUTES = "nifi.provenance.repository.indexed.attributes";
|
||||
|
|
|
@ -513,7 +513,11 @@ Providing three total locations, including _nifi.provenance.repository.director
|
|||
|nifi.provenance.repository.max.storage.size|The maximum amount of data provenance information to store at a time. The default is 1 GB.
|
||||
|nifi.provenance.repository.rollover.time|The amount of time to wait before rolling over the latest data provenance information so that it is available in the User Interface. The default value is 5 mins.
|
||||
|nifi.provenance.repository.rollover.size|The amount of information to roll over at a time. The default value is 100 MB.
|
||||
|nifi.provenance.repository.query.threads|The number of threads to use for Provenance Repository queries. The default value is 2.
|
||||
|nifi.provenance.repository.query.threads|The number of threads to use for Provenance Repository queries. The default value is 2.
|
||||
|nifi.provenance.repository.index.threads|The number of threads to use for indexing Provenance events so that they are searchable. The default value is 1.
|
||||
For flows that operate on a very high number of FlowFiles, the indexing of Provenance events could become a bottleneck. If this is the case, a bulletin will appear, indicating that
|
||||
"The rate of the dataflow is exceeding the provenance recording rate. Slowing down flow to accommodate." If this happens, increasing the value of this property
|
||||
may increase the rate at which the Provenance Repository is able to process these records, resulting in better overall throughput.
|
||||
|nifi.provenance.repository.compress.on.rollover|Indicates whether to compress the provenance information when rolling it over. The default value is _true_.
|
||||
|nifi.provenance.repository.always.sync|If set to _true_, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very expensive and can significantly reduce NiFi performance. However, if it is _false_, there could be the potential for data loss if either there is a sudden power loss or the operating system crashes. The default value is _false_.
|
||||
|nifi.provenance.repository.journal.count|The number of journal files that should be used to serialize Provenance Event data. Increasing this value will allow more tasks to simultaneously update the repository but will result in more expensive merging of the journal files later. This value should ideally be equal to the number of threads that are expected to update the repository simultaneously, but 16 tends to work well in must environments. The default value is 16.
|
||||
|
|
|
@ -71,6 +71,7 @@ nifi.provenance.repository.max.storage.size=${nifi.provenance.repository.max.sto
|
|||
nifi.provenance.repository.rollover.time=${nifi.provenance.repository.rollover.time}
|
||||
nifi.provenance.repository.rollover.size=${nifi.provenance.repository.rollover.size}
|
||||
nifi.provenance.repository.query.threads=${nifi.provenance.repository.query.threads}
|
||||
nifi.provenance.repository.index.threads=${nifi.provenance.repository.index.threads}
|
||||
nifi.provenance.repository.compress.on.rollover=${nifi.provenance.repository.compress.on.rollover}
|
||||
nifi.provenance.repository.always.sync=${nifi.provenance.repository.always.sync}
|
||||
nifi.provenance.repository.journal.count=${nifi.provenance.repository.journal.count}
|
||||
|
|
|
@ -38,6 +38,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -45,6 +46,7 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -95,6 +97,7 @@ import org.apache.nifi.util.NiFiProperties;
|
|||
import org.apache.nifi.util.RingBuffer;
|
||||
import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
|
||||
import org.apache.nifi.util.StopWatch;
|
||||
import org.apache.nifi.util.Tuple;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -279,6 +282,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
final String rolloverSize = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
|
||||
final String shardSize = properties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
|
||||
final int queryThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2);
|
||||
final int indexThreads = properties.getIntegerProperty(
|
||||
NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 1);
|
||||
final int journalCount = properties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
|
||||
|
||||
final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS);
|
||||
|
@ -327,6 +332,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
|
||||
config.setMaxStorageCapacity(maxStorageBytes);
|
||||
config.setQueryThreadPoolSize(queryThreads);
|
||||
config.setIndexThreadPoolSize(indexThreads);
|
||||
config.setJournalCount(journalCount);
|
||||
config.setMaxAttributeChars(maxAttrChars);
|
||||
|
||||
|
@ -801,7 +807,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
*
|
||||
* @throws IOException if unable to purge old events due to an I/O problem
|
||||
*/
|
||||
void purgeOldEvents() throws IOException {
|
||||
synchronized void purgeOldEvents() throws IOException {
|
||||
while (!recoveryFinished.get()) {
|
||||
try {
|
||||
Thread.sleep(100L);
|
||||
|
@ -1030,6 +1036,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
}
|
||||
|
||||
if (fileRolledOver == null) {
|
||||
logger.debug("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
|
||||
return;
|
||||
}
|
||||
final File file = fileRolledOver;
|
||||
|
@ -1091,19 +1098,31 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
// that is no longer the case.
|
||||
if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
|
||||
logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
|
||||
+ "Slowing down flow to accomodate. Currently, there are {} journal files ({} bytes) and "
|
||||
+ "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and "
|
||||
+ "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
|
||||
eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
|
||||
+ "exceeding the provenance recording rate. Slowing down flow to accomodate");
|
||||
+ "exceeding the provenance recording rate. Slowing down flow to accommodate");
|
||||
|
||||
while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
|
||||
try {
|
||||
Thread.sleep(1000L);
|
||||
} catch (final InterruptedException ie) {
|
||||
if (repoSize > sizeThreshold) {
|
||||
logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events");
|
||||
purgeOldEvents();
|
||||
|
||||
journalFileCount = getJournalCount();
|
||||
repoSize = getSize(getLogFiles(), 0L);
|
||||
continue;
|
||||
} else {
|
||||
// if we are constrained by the number of journal files rather than the size of the repo,
|
||||
// then we will just sleep a bit because another thread is already actively merging the journals,
|
||||
// due to the runnable that we scheduled above
|
||||
try {
|
||||
Thread.sleep(100L);
|
||||
} catch (final InterruptedException ie) {
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
|
||||
+ "to accomodate. Currently, there are {} journal files ({} bytes) and "
|
||||
+ "to accommodate. Currently, there are {} journal files ({} bytes) and "
|
||||
+ "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
|
||||
|
||||
journalFileCount = getJournalCount();
|
||||
|
@ -1219,6 +1238,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
}
|
||||
|
||||
if (journalFiles.isEmpty()) {
|
||||
logger.debug("Couldn't merge journals: Journal Files is empty; won't merge journals");
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -1380,11 +1400,51 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
final IndexingAction indexingAction = new IndexingAction(this);
|
||||
|
||||
final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile, earliestTimestamp);
|
||||
long maxId = 0L;
|
||||
|
||||
final BlockingQueue<Tuple<StandardProvenanceEventRecord, Integer>> eventQueue = new LinkedBlockingQueue<>(100);
|
||||
final AtomicBoolean finishedAdding = new AtomicBoolean(false);
|
||||
final List<Future<?>> futures = new ArrayList<>();
|
||||
|
||||
final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
|
||||
try {
|
||||
long maxId = 0L;
|
||||
final ExecutorService exec = Executors.newFixedThreadPool(configuration.getIndexThreadPoolSize(), new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(final Runnable r) {
|
||||
final Thread t = Executors.defaultThreadFactory().newThread(r);
|
||||
t.setName("Index Provenance Events");
|
||||
return t;
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
for (int i = 0; i < configuration.getIndexThreadPoolSize(); i++) {
|
||||
final Callable<Object> callable = new Callable<Object>() {
|
||||
@Override
|
||||
public Object call() throws IOException {
|
||||
while (!eventQueue.isEmpty() || !finishedAdding.get()) {
|
||||
final Tuple<StandardProvenanceEventRecord, Integer> tuple;
|
||||
try {
|
||||
tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
|
||||
} catch (final InterruptedException ie) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tuple == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
final Future<?> future = exec.submit(callable);
|
||||
futures.add(future);
|
||||
}
|
||||
|
||||
while (!recordToReaderMap.isEmpty()) {
|
||||
final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
|
||||
final StandardProvenanceEventRecord record = entry.getKey();
|
||||
|
@ -1393,7 +1453,13 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
writer.writeRecord(record, record.getEventId());
|
||||
final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
|
||||
|
||||
indexingAction.index(record, indexWriter, blockIndex);
|
||||
boolean accepted = false;
|
||||
while (!accepted) {
|
||||
try {
|
||||
accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
|
||||
} catch (final InterruptedException ie) {
|
||||
}
|
||||
}
|
||||
maxId = record.getEventId();
|
||||
|
||||
latestRecords.add(truncateAttributes(record));
|
||||
|
@ -1414,16 +1480,30 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
recordToReaderMap.put(nextRecord, reader);
|
||||
}
|
||||
}
|
||||
indexWriter.commit();
|
||||
} catch (final Throwable t) {
|
||||
indexWriter.rollback();
|
||||
throw t;
|
||||
} finally {
|
||||
finishedAdding.set(true);
|
||||
exec.shutdown();
|
||||
}
|
||||
|
||||
indexConfig.setMaxIdIndexed(maxId);
|
||||
for (final Future<?> future : futures) {
|
||||
try {
|
||||
future.get();
|
||||
} catch (final ExecutionException ee) {
|
||||
final Throwable t = ee.getCause();
|
||||
if (t instanceof RuntimeException) {
|
||||
throw (RuntimeException) t;
|
||||
}
|
||||
|
||||
throw new RuntimeException(t);
|
||||
} catch (final InterruptedException e) {
|
||||
throw new RuntimeException("Thread interrupted");
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
indexManager.returnIndexWriter(indexingDirectory, indexWriter);
|
||||
}
|
||||
|
||||
indexConfig.setMaxIdIndexed(maxId);
|
||||
}
|
||||
|
||||
// record should now be available in the repository. We can copy the values from latestRecords to ringBuffer.
|
||||
|
@ -1468,6 +1548,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
|
||||
if (records == 0) {
|
||||
writerFile.delete();
|
||||
logger.debug("Couldn't merge journals: No Records to merge");
|
||||
return null;
|
||||
} else {
|
||||
final long nanos = System.nanoTime() - startNanos;
|
||||
|
|
|
@ -16,14 +16,14 @@
|
|||
*/
|
||||
package org.apache.nifi.provenance;
|
||||
|
||||
import org.apache.nifi.provenance.search.SearchableField;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.provenance.search.SearchableField;
|
||||
|
||||
public class RepositoryConfiguration {
|
||||
|
||||
private final List<File> storageDirectories = new ArrayList<>();
|
||||
|
@ -40,7 +40,8 @@ public class RepositoryConfiguration {
|
|||
private List<SearchableField> searchableAttributes = new ArrayList<>();
|
||||
private boolean compress = true;
|
||||
private boolean alwaysSync = false;
|
||||
private int queryThreadPoolSize = 1;
|
||||
private int queryThreadPoolSize = 2;
|
||||
private int indexThreadPoolSize = 1;
|
||||
private boolean allowRollover = true;
|
||||
|
||||
public void setAllowRollover(final boolean allow) {
|
||||
|
@ -203,6 +204,20 @@ public class RepositoryConfiguration {
|
|||
this.queryThreadPoolSize = queryThreadPoolSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of threads to use to index provenance events
|
||||
*/
|
||||
public int getIndexThreadPoolSize() {
|
||||
return indexThreadPoolSize;
|
||||
}
|
||||
|
||||
public void setIndexThreadPoolSize(final int indexThreadPoolSize) {
|
||||
if (indexThreadPoolSize < 1) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
this.indexThreadPoolSize = indexThreadPoolSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Specifies the desired size of each Provenance Event index shard, in
|
||||
|
@ -213,22 +228,21 @@ public class RepositoryConfiguration {
|
|||
* <li>
|
||||
* A very large index requires a significant amount of Java heap space to
|
||||
* search. As the size of the shard increases, the required Java heap space
|
||||
* also increases.
|
||||
* </li>
|
||||
* also increases.</li>
|
||||
* <li>
|
||||
* By having multiple shards, we have the ability to use multiple concurrent
|
||||
* threads to search the individual shards, resulting in far less latency
|
||||
* when performing a search across millions or billions of records.
|
||||
* </li>
|
||||
* when performing a search across millions or billions of records.</li>
|
||||
* <li>
|
||||
* We keep track of which time ranges each index shard spans. As a result,
|
||||
* we are able to determine which shards need to be searched if a search
|
||||
* provides a date range. This can greatly increase the speed of a search
|
||||
* and reduce resource utilization.
|
||||
* </li>
|
||||
* and reduce resource utilization.</li>
|
||||
* </ol>
|
||||
*
|
||||
* @param bytes the number of bytes to write to an index before beginning a new shard
|
||||
* @param bytes
|
||||
* the number of bytes to write to an index before beginning a
|
||||
* new shard
|
||||
*/
|
||||
public void setDesiredIndexSize(final long bytes) {
|
||||
this.desiredIndexBytes = bytes;
|
||||
|
|
Loading…
Reference in New Issue