diff --git a/nifi-commons/nifi-rocksdb-utils/pom.xml b/nifi-commons/nifi-rocksdb-utils/pom.xml
new file mode 100644
index 0000000000..02ece68879
--- /dev/null
+++ b/nifi-commons/nifi-rocksdb-utils/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>1.10.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-rocksdb-utils</artifactId>
+    <version>1.10.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.rocksdb</groupId>
+            <artifactId>rocksdbjni</artifactId>
+            <version>6.0.1</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-commons/nifi-rocksdb-utils/src/main/java/org/apache/nifi/rocksdb/RocksDBMetronome.java b/nifi-commons/nifi-rocksdb-utils/src/main/java/org/apache/nifi/rocksdb/RocksDBMetronome.java
new file mode 100644
index 0000000000..81d4adb0ab
--- /dev/null
+++ b/nifi-commons/nifi-rocksdb-utils/src/main/java/org/apache/nifi/rocksdb/RocksDBMetronome.java
@@ -0,0 +1,891 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.rocksdb;
+
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.util.StringUtils;
+import org.rocksdb.AccessHint;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+/**
+ * A RocksDB helper class that, rather than forcing each record to disk as it arrives,
+ * persists all waiting records at a regular interval (via a ScheduledExecutorService),
+ * the way a metronome ticks.
+ */
+public class RocksDBMetronome implements Closeable {
+
+ private static final Logger logger = LoggerFactory.getLogger(RocksDBMetronome.class);
+
+ static final String CONFIGURATION_FAMILY = "configuration.column.family";
+ static final String DEFAULT_FAMILY = "default";
+
+ private final AtomicLong lastSyncWarningNanos = new AtomicLong(0L);
+ private final int parallelThreads;
+ private final int maxWriteBufferNumber;
+ private final int minWriteBufferNumberToMerge;
+ private final long writeBufferSize;
+ private final long maxTotalWalSize;
+ private final long delayedWriteRate;
+ private final int level0SlowdownWritesTrigger;
+ private final int level0StopWritesTrigger;
+ private final int maxBackgroundFlushes;
+ private final int maxBackgroundCompactions;
+ private final int statDumpSeconds;
+ private final long syncMillis;
+ private final long syncWarningNanos;
+ private final Path storagePath;
+ private final boolean adviseRandomOnOpen;
+ private final boolean createIfMissing;
+ private final boolean createMissingColumnFamilies;
+ private final boolean useFsync;
+
+ private final Set<byte[]> columnFamilyNames;
+ private final Map<String, ColumnFamilyHandle> columnFamilyHandles;
+
+ private final boolean periodicSyncEnabled;
+ private final ScheduledExecutorService syncExecutor;
+ private final ReentrantLock syncLock = new ReentrantLock();
+ private final Condition syncCondition = syncLock.newCondition();
+ private final AtomicInteger syncCounter = new AtomicInteger(0);
+
+ private volatile RocksDB rocksDB = null;
+ private final ReentrantReadWriteLock dbReadWriteLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock dbReadLock = dbReadWriteLock.readLock();
+ private final ReentrantReadWriteLock.WriteLock dbWriteLock = dbReadWriteLock.writeLock();
+ private volatile boolean closed = false;
+
+ private ColumnFamilyHandle configurationColumnFamilyHandle;
+ private ColumnFamilyHandle defaultColumnFamilyHandle;
+ private WriteOptions forceSyncWriteOptions;
+ private WriteOptions noSyncWriteOptions;
+
+ private RocksDBMetronome(Builder builder) {
+ statDumpSeconds = builder.statDumpSeconds;
+ parallelThreads = builder.parallelThreads;
+ maxWriteBufferNumber = builder.maxWriteBufferNumber;
+ minWriteBufferNumberToMerge = builder.minWriteBufferNumberToMerge;
+ writeBufferSize = builder.writeBufferSize;
+ maxTotalWalSize = builder.getMaxTotalWalSize();
+ delayedWriteRate = builder.delayedWriteRate;
+ level0SlowdownWritesTrigger = builder.level0SlowdownWritesTrigger;
+ level0StopWritesTrigger = builder.level0StopWritesTrigger;
+ maxBackgroundFlushes = builder.maxBackgroundFlushes;
+ maxBackgroundCompactions = builder.maxBackgroundCompactions;
+ syncMillis = builder.syncMillis;
+ syncWarningNanos = builder.syncWarningNanos;
+ storagePath = builder.storagePath;
+ adviseRandomOnOpen = builder.adviseRandomOnOpen;
+ createIfMissing = builder.createIfMissing;
+ createMissingColumnFamilies = builder.createMissingColumnFamilies;
+ useFsync = builder.useFsync;
+ columnFamilyNames = builder.columnFamilyNames;
+ columnFamilyHandles = new HashMap<>(columnFamilyNames.size());
+
+ periodicSyncEnabled = builder.periodicSyncEnabled;
+ syncExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread thread = Executors.defaultThreadFactory().newThread(r);
+ thread.setDaemon(true);
+ return thread;
+ });
+ }
+
+ /**
+ * Initialize the metronome
+ *
+ * @throws IOException if there is an issue with the underlying database
+ */
+ public void initialize() throws IOException {
+
+ final String rocksSharedLibDir = System.getenv("ROCKSDB_SHAREDLIB_DIR");
+ final String javaTmpDir = System.getProperty("java.io.tmpdir");
+ final String libDir = !StringUtils.isBlank(rocksSharedLibDir) ? rocksSharedLibDir : javaTmpDir;
+ try {
+ Files.createDirectories(Paths.get(libDir));
+ } catch (IOException e) {
+ throw new IOException("Unable to load the RocksDB shared library into directory: " + libDir, e);
+ }
+
+ // delete any previous librocksdbjni.so files
+ final File[] rocksSos = Paths.get(libDir).toFile().listFiles((dir, name) -> name.startsWith("librocksdbjni") && name.endsWith(".so"));
+ if (rocksSos != null) {
+ for (File rocksSo : rocksSos) {
+ if (!rocksSo.delete()) {
+ logger.warn("Could not delete existing librocksdbjni*.so file {}", rocksSo);
+ }
+ }
+ }
+
+ try {
+ RocksDB.loadLibrary();
+ } catch (Throwable t) {
+ if (System.getProperty("os.name").startsWith("Windows")) {
+ logger.error("The RocksDBMetronome will only work on Windows if you have Visual C++ runtime libraries for Visual Studio 2015 installed. " +
+ "If the DLLs required to support RocksDB cannot be found, then NiFi will not start!");
+ }
+ throw t;
+ }
+
+ Files.createDirectories(storagePath);
+
+ forceSyncWriteOptions = new WriteOptions()
+ .setDisableWAL(false)
+ .setSync(true);
+
+ noSyncWriteOptions = new WriteOptions()
+ .setDisableWAL(false)
+ .setSync(false);
+
+
+ dbWriteLock.lock();
+ try (final DBOptions dbOptions = new DBOptions()
+ .setAccessHintOnCompactionStart(AccessHint.SEQUENTIAL)
+ .setAdviseRandomOnOpen(adviseRandomOnOpen)
+ .setAllowMmapWrites(false) // required to be false for RocksDB.syncWal() to work
+ .setCreateIfMissing(createIfMissing)
+ .setCreateMissingColumnFamilies(createMissingColumnFamilies)
+ .setDelayedWriteRate(delayedWriteRate)
+ .setIncreaseParallelism(parallelThreads)
+ .setLogger(getRocksLogger())
+ .setMaxBackgroundCompactions(maxBackgroundCompactions)
+ .setMaxBackgroundFlushes(maxBackgroundFlushes)
+ .setMaxTotalWalSize(maxTotalWalSize)
+ .setStatsDumpPeriodSec(statDumpSeconds)
+ .setUseFsync(useFsync)
+ ;
+
+ final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()
+ .setCompressionType(CompressionType.LZ4_COMPRESSION)
+ .setLevel0SlowdownWritesTrigger(level0SlowdownWritesTrigger)
+ .setLevel0StopWritesTrigger(level0StopWritesTrigger)
+ .setMaxWriteBufferNumber(maxWriteBufferNumber)
+ .setMinWriteBufferNumberToMerge(minWriteBufferNumberToMerge)
+ .setWriteBufferSize(writeBufferSize)
+ ) {
+
+ // create family descriptors
+ List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(columnFamilyNames.size());
+ for (byte[] name : columnFamilyNames) {
+ familyDescriptors.add(new ColumnFamilyDescriptor(name, cfOptions));
+ }
+
+ List<ColumnFamilyHandle> columnFamilyList = new ArrayList<>(columnFamilyNames.size());
+
+ rocksDB = RocksDB.open(dbOptions, storagePath.toString(), familyDescriptors, columnFamilyList);
+
+ // create the map of names to handles
+ columnFamilyHandles.put(DEFAULT_FAMILY, rocksDB.getDefaultColumnFamily());
+ for (ColumnFamilyHandle cf : columnFamilyList) {
+ columnFamilyHandles.put(new String(cf.getName(), StandardCharsets.UTF_8), cf);
+ }
+
+ // set specific special handles
+ defaultColumnFamilyHandle = rocksDB.getDefaultColumnFamily();
+ configurationColumnFamilyHandle = columnFamilyHandles.get(CONFIGURATION_FAMILY);
+
+ } catch (RocksDBException e) {
+ throw new IOException(e);
+ } finally {
+ dbWriteLock.unlock();
+ }
+
+ if (periodicSyncEnabled) {
+ syncExecutor.scheduleWithFixedDelay(this::doSync, syncMillis, syncMillis, TimeUnit.MILLISECONDS);
+ }
+
+ logger.info("Initialized RocksDB Repository at {}", storagePath);
+ }
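To make the lifecycle above concrete, here is a minimal usage sketch of the API this file introduces; the storage path and the extra column-family name are illustrative placeholders, not part of this patch.

```java
import org.apache.nifi.rocksdb.RocksDBMetronome;

import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;

public class MetronomeDemo {
    public static void main(String[] args) throws Exception {
        try (RocksDBMetronome db = new RocksDBMetronome.Builder()
                .setStoragePath(Paths.get("target/metronome-demo")) // placeholder path
                .addColumnFamily("index")                           // optional extra column family
                .setSyncMillis(10)                                  // "tick" every 10 ms
                .build()) {
            db.initialize(); // loads the native library, opens the DB, starts the periodic sync
            db.put("greeting".getBytes(StandardCharsets.UTF_8),
                   "hello".getBytes(StandardCharsets.UTF_8)); // buffered write; synced on the next tick
        }
    }
}
```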
+
+
+ /**
+ * This method checks the state of the database to ensure it is available for use.
+ *
+ * NOTE: This *must* be called holding the dbReadLock
+ *
+ * @throws IllegalStateException if the database is closed or not yet initialized
+ */
+ private void checkDbState() throws IllegalStateException {
+ if (rocksDB == null) {
+ if (closed) {
+ throw new IllegalStateException("RocksDBMetronome is closed");
+ }
+ throw new IllegalStateException("RocksDBMetronome has not been initialized");
+ }
+
+ }
+
+ /**
+ * Return an iterator over the specified column family. The iterator is initially invalid (caller must call one of the Seek methods on the iterator before using it).
+ *
+ * Caller should close the iterator when it is no longer needed. The returned iterator should be closed before this db is closed.
+ *
+ * @param columnFamilyHandle specifies the column family for the iterator
+ * @return an iterator over the specified column family
+ */
+ public RocksIterator getIterator(final ColumnFamilyHandle columnFamilyHandle) {
+ dbReadLock.lock();
+ try {
+ checkDbState();
+ return rocksDB.newIterator(columnFamilyHandle);
+ } finally {
+ dbReadLock.unlock();
+ }
+ }
+
+ /**
+ * Get the value for the provided key in the specified column family
+ *
+ * @param columnFamilyHandle the column family from which to get the value
+ * @param key the key of the value to retrieve
+ * @return the value for the specified key
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public byte[] get(final ColumnFamilyHandle columnFamilyHandle, final byte[] key) throws RocksDBException {
+ dbReadLock.lock();
+ try {
+ checkDbState();
+ return rocksDB.get(columnFamilyHandle, key);
+ } finally {
+ dbReadLock.unlock();
+ }
+ }
+
+ /**
+ * Put the key / value pair into the database in the specified column family
+ *
+ * @param columnFamilyHandle the column family into which to put the value
+ * @param writeOptions specification of options for write operations
+ * @param key the key to be inserted
+ * @param value the value to be associated with the specified key
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public void put(final ColumnFamilyHandle columnFamilyHandle, WriteOptions writeOptions, final byte[] key, final byte[] value) throws RocksDBException {
+ dbReadLock.lock();
+ try {
+ checkDbState();
+ rocksDB.put(columnFamilyHandle, writeOptions, key, value);
+ } finally {
+ dbReadLock.unlock();
+ }
+ }
+
+ /**
+ * Delete the key / value pair from the specified column family
+ *
+ * @param columnFamilyHandle the column family from which to delete the value
+ * @param key the key to be deleted
+ * @param writeOptions specification of options for write operations
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final WriteOptions writeOptions) throws RocksDBException {
+ dbReadLock.lock();
+ try {
+ checkDbState();
+ rocksDB.delete(columnFamilyHandle, writeOptions, key);
+ } finally {
+ dbReadLock.unlock();
+ }
+ }
+
+ /**
+ * Flushes the WAL and syncs to disk
+ *
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public void forceSync() throws RocksDBException {
+ dbReadLock.lock();
+ try {
+ checkDbState();
+ rocksDB.syncWal();
+ } finally {
+ dbReadLock.unlock();
+ }
+ }
+
+ /**
+ * Get the handle for the specified column family
+ *
+ * @param familyName the name of the column family
+ * @return the handle
+ */
+ public ColumnFamilyHandle getColumnFamilyHandle(String familyName) {
+ return columnFamilyHandles.get(familyName);
+ }
+
+ /**
+ * Put the key / value pair into the configuration column family and sync the wal
+ *
+ * @param key the key to be inserted
+ * @param value the value to be associated with the specified key
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public void putConfiguration(final byte[] key, final byte[] value) throws RocksDBException {
+ put(configurationColumnFamilyHandle, forceSyncWriteOptions, key, value);
+ }
+
+ /**
+ * Put the key / value pair into the database in the specified column family without syncing the wal
+ *
+ * @param columnFamilyHandle the column family into which to put the value
+ * @param key the key to be inserted
+ * @param value the value to be associated with the specified key
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value) throws RocksDBException {
+ put(columnFamilyHandle, noSyncWriteOptions, key, value);
+ }
+
+ /**
+ * Put the key / value pair into the database in the specified column family, optionally syncing the wal
+ *
+ * @param columnFamilyHandle the column family into which to put the value
+ * @param key the key to be inserted
+ * @param value the value to be associated with the specified key
+ * @param forceSync if true, sync the wal
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value, final boolean forceSync) throws RocksDBException {
+ put(columnFamilyHandle, getWriteOptions(forceSync), key, value);
+ }
+
+ /**
+ * Put the key / value pair into the database in the default column family, optionally syncing the wal
+ *
+ * @param key the key to be inserted
+ * @param value the value to be associated with the specified key
+ * @param forceSync if true, sync the wal
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public void put(final byte[] key, final byte[] value, final boolean forceSync) throws RocksDBException {
+ put(defaultColumnFamilyHandle, getWriteOptions(forceSync), key, value);
+ }
+
+ /**
+ * Put the key / value pair into the database in the default column family, without syncing the wal
+ *
+ * @param key the key to be inserted
+ * @param value the value to be associated with the specified key
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public void put(final byte[] key, final byte[] value) throws RocksDBException {
+ put(defaultColumnFamilyHandle, noSyncWriteOptions, key, value);
+ }
+
+ /**
+ * Get the value for the provided key in the default column family
+ *
+ * @param key the key of the value to retrieve
+ * @return the value for the specified key
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public byte[] get(final byte[] key) throws RocksDBException {
+ return get(defaultColumnFamilyHandle, key);
+ }
+
+ /**
+ * Get the value for the provided key in the configuration column family
+ *
+ * @param key the key of the value to retrieve
+ * @return the value for the specified key
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public byte[] getConfiguration(final byte[] key) throws RocksDBException {
+ return get(configurationColumnFamilyHandle, key);
+ }
+
+
+ /**
+ * Delete the key / value pair from the default column family without syncing the wal
+ *
+ * @param key the key to be deleted
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public void delete(byte[] key) throws RocksDBException {
+ delete(defaultColumnFamilyHandle, key, noSyncWriteOptions);
+ }
+
+ /**
+ * Delete the key / value pair from the default column family, optionally syncing the wal
+ *
+ * @param key the key to be deleted
+ * @param forceSync if true, sync the wal
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public void delete(byte[] key, final boolean forceSync) throws RocksDBException {
+ delete(defaultColumnFamilyHandle, key, getWriteOptions(forceSync));
+ }
+
+ /**
+ * Delete the key / value pair from the specified column family without syncing the wal
+ *
+ * @param columnFamilyHandle the column family from which to delete the value
+ * @param key the key to be deleted
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public void delete(final ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
+ delete(columnFamilyHandle, key, noSyncWriteOptions);
+ }
+
+ /**
+ * Delete the key / value pair from the specified column family, optionally syncing the wal
+ *
+ * @param columnFamilyHandle the column family from which to delete the value
+ * @param key the key to be deleted
+ * @param forceSync if true, sync the wal
+ * @throws RocksDBException thrown if there is an error in the underlying library.
+ */
+ public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final boolean forceSync) throws RocksDBException {
+ delete(columnFamilyHandle, key, getWriteOptions(forceSync));
+ }
+
+ private WriteOptions getWriteOptions(boolean forceSync) {
+ return forceSync ? forceSyncWriteOptions : noSyncWriteOptions;
+ }
+
+ /**
+ * Return an iterator over the default column family. The iterator is initially invalid (caller must call one of the Seek methods on the iterator before using it).
+ *
+ * Caller should close the iterator when it is no longer needed. The returned iterator should be closed before this db is closed.
+ *
+ * @return an iterator over the default column family
+ */
+ public RocksIterator getIterator() {
+ return getIterator(defaultColumnFamilyHandle);
+ }
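A short sketch of the iteration contract described in the Javadoc above (assuming an initialized `db`): seek before use, and close the iterator before the database itself is closed. `RocksIterator` is `AutoCloseable`, so try-with-resources handles that cleanly.

```java
try (RocksIterator it = db.getIterator()) {
    for (it.seekToFirst(); it.isValid(); it.next()) { // the iterator starts invalid until a seek
        byte[] key = it.key();
        byte[] value = it.value();
        // process the key/value pair here
    }
}
```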
+
+ /**
+ * Get the current value of the sync counter. This can be used with waitForSync() to verify that operations have been synced to disk
+ *
+ * @return the current value of the sync counter
+ */
+ public int getSyncCounterValue() {
+ return syncCounter.get();
+ }
+
+ /**
+ * Close the Metronome and the RocksDB Objects
+ *
+ * @throws IOException if there are issues in closing any of the underlying RocksDB objects
+ */
+ @Override
+ public void close() throws IOException {
+
+ logger.info("Closing RocksDBMetronome");
+
+ dbWriteLock.lock();
+ try {
+ logger.info("Shutting down RocksDBMetronome sync executor");
+ syncExecutor.shutdownNow();
+
+ try {
+ logger.info("Pausing RocksDB background work");
+ rocksDB.pauseBackgroundWork();
+ } catch (RocksDBException e) {
+ logger.warn("Unable to pause background work before close.", e);
+ }
+
+ final AtomicReference<Exception> exceptionReference = new AtomicReference<>();
+
+ logger.info("Closing RocksDB configurations");
+
+ safeClose(forceSyncWriteOptions, exceptionReference);
+ safeClose(noSyncWriteOptions, exceptionReference);
+
+ // close the column family handles first, then the db last
+ for (ColumnFamilyHandle cfh : columnFamilyHandles.values()) {
+ safeClose(cfh, exceptionReference);
+ }
+
+ logger.info("Closing RocksDB database");
+ safeClose(rocksDB, exceptionReference);
+ rocksDB = null;
+ closed = true;
+
+ if (exceptionReference.get() != null) {
+ throw new IOException(exceptionReference.get());
+ }
+ } finally {
+ dbWriteLock.unlock();
+ }
+ }
+
+ /**
+ * @param autoCloseable An {@link AutoCloseable} to be closed
+ * @param exceptionReference A reference to contain any encountered {@link Exception}
+ */
+ private void safeClose(final AutoCloseable autoCloseable, final AtomicReference<Exception> exceptionReference) {
+ if (autoCloseable != null) {
+ try {
+ autoCloseable.close();
+ } catch (Exception e) {
+ exceptionReference.set(e);
+ }
+ }
+ }
+
+ /**
+ * @return The capacity of the store
+ * @throws IOException if encountered
+ */
+ public long getStorageCapacity() throws IOException {
+ return Files.getFileStore(storagePath).getTotalSpace();
+ }
+
+ /**
+ * @return The usable space of the store
+ * @throws IOException if encountered
+ */
+ public long getUsableStorageSpace() throws IOException {
+ return Files.getFileStore(storagePath).getUsableSpace();
+ }
+
+ /**
+ * This method is scheduled by the syncExecutor. It runs at the specified interval, forcing the RocksDB WAL to disk.
+ *
+ * If the sync is successful, it notifies threads that had been waiting for their records to be persisted using
+ * syncCondition and the syncCounter, which is incremented to indicate success
+ */
+ void doSync() {
+ syncLock.lock();
+ try {
+ // if we're interrupted, return
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ forceSync();
+ syncCounter.incrementAndGet(); // it's OK if it rolls over... we're just going to check that the value changed
+ syncCondition.signalAll();
+ } catch (final IllegalArgumentException e) {
+ logger.error("Unable to sync, likely because the repository is out of space.", e);
+ } catch (final Throwable t) {
+ logger.error("Unable to sync", t);
+ } finally {
+ syncLock.unlock();
+ }
+ }
+
+ /**
+ * This method blocks until the next time the WAL is forced to disk, ensuring that all records written before this point have been persisted.
+ */
+ public void waitForSync() throws InterruptedException {
+ final int counterValue = syncCounter.get();
+ waitForSync(counterValue);
+ }
+
+ /**
+ * This method blocks until the WAL has been forced to disk, ensuring that all records written before the point specified by the counterValue have been persisted.
+ *
+ * @param counterValue The value of the counter at the time of a write we must persist
+ * @throws InterruptedException if the thread is interrupted
+ */
+ public void waitForSync(final int counterValue) throws InterruptedException {
+ if (counterValue != syncCounter.get()) {
+ return; // if the counter has already changed, we don't need to wait (or grab the lock) because the records we're concerned with have already been persisted
+ }
+ long waitTimeRemaining = syncWarningNanos;
+ syncLock.lock();
+ try {
+ while (counterValue == syncCounter.get()) { // wait until the counter changes (indicating sync occurred)
+ waitTimeRemaining = syncCondition.awaitNanos(waitTimeRemaining);
+ if (waitTimeRemaining <= 0L) { // this means the wait timed out
+
+ // only log a warning every syncWarningNanos... don't spam the logs
+ final long now = System.nanoTime();
+ final long lastWarning = lastSyncWarningNanos.get();
+ if (now - lastWarning > syncWarningNanos
+ && lastSyncWarningNanos.compareAndSet(lastWarning, now)) {
+ logger.warn("Failed to sync within {} seconds... system configuration may need to be adjusted", TimeUnit.NANOSECONDS.toSeconds(syncWarningNanos));
+ }
+
+ // reset waiting time
+ waitTimeRemaining = syncWarningNanos;
+ }
+ }
+ } finally {
+ syncLock.unlock();
+ }
+ }
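Taken together, `getSyncCounterValue()` and `waitForSync(int)` support a cheap durable-write pattern (a sketch, assuming an initialized `db` and placeholder `key`/`value` arrays): write without forcing a sync, capture the counter after the write, then block only until a subsequent metronome tick has synced the WAL.

```java
db.put(key, value);                           // buffered write; WAL not yet synced
final int counter = db.getSyncCounterValue(); // capture the counter after the write
// ... optionally do other work ...
db.waitForSync(counter);                      // returns immediately if the counter already advanced,
                                              // otherwise blocks until the next tick completes
```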
+
+ /**
+ * Returns a representation of a long as a byte array
+ *
+ * @param value a long to convert
+ * @return a byte[] representation
+ */
+ public static byte[] getBytes(long value) {
+ byte[] bytes = new byte[8];
+ writeLong(value, bytes);
+ return bytes;
+ }
+
+ /**
+ * Writes a representation of a long to the specified byte array
+ *
+ * @param l a long to convert
+ * @param bytes an array to store the byte representation
+ */
+ public static void writeLong(long l, byte[] bytes) {
+ bytes[0] = (byte) (l >>> 56);
+ bytes[1] = (byte) (l >>> 48);
+ bytes[2] = (byte) (l >>> 40);
+ bytes[3] = (byte) (l >>> 32);
+ bytes[4] = (byte) (l >>> 24);
+ bytes[5] = (byte) (l >>> 16);
+ bytes[6] = (byte) (l >>> 8);
+ bytes[7] = (byte) (l);
+ }
+
+ /**
+ * Creates a long from its byte array representation
+ * @param bytes to convert to a long
+ * @return a long
+ * @throws IOException if the given byte array is of the wrong size
+ */
+ public static long readLong(final byte[] bytes) throws IOException {
+ if (bytes.length != 8) {
+ throw new IOException("wrong number of bytes to convert to long (must be 8)");
+ }
+ return (((long) (bytes[0]) << 56) +
+ ((long) (bytes[1] & 255) << 48) +
+ ((long) (bytes[2] & 255) << 40) +
+ ((long) (bytes[3] & 255) << 32) +
+ ((long) (bytes[4] & 255) << 24) +
+ ((long) (bytes[5] & 255) << 16) +
+ ((long) (bytes[6] & 255) << 8) +
+ ((long) (bytes[7] & 255)));
+ }
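These helpers implement the standard big-endian encoding of a `long`, so they agree with `ByteBuffer`'s default byte order. A quick equivalence check one could run (a hypothetical snippet, e.g. in jshell with assertions enabled; not part of this patch):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

long x = 0x0102030405060708L;
byte[] viaMetronome = RocksDBMetronome.getBytes(x);
byte[] viaByteBuffer = ByteBuffer.allocate(8).putLong(x).array(); // big-endian by default
assert Arrays.equals(viaMetronome, viaByteBuffer);
assert RocksDBMetronome.readLong(viaMetronome) == x; // throws IOException if length != 8
```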
+
+ /**
+ * @return the storage path of the db
+ */
+ public Path getStoragePath() {
+ return storagePath;
+ }
+
+ /**
+ * @return A RocksDB logger capturing all logging output from RocksDB
+ */
+ private org.rocksdb.Logger getRocksLogger() {
+ try (Options options = new Options()
+ // make RocksDB give us everything, and we'll decide what we want to log in our wrapper
+ .setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)) {
+ return new LogWrapper(options);
+ }
+ }
+
+ /**
+ * An Extension of org.rocksdb.Logger that wraps the slf4j Logger
+ */
+ private class LogWrapper extends org.rocksdb.Logger {
+
+ LogWrapper(Options options) {
+ super(options);
+ }
+
+ @Override
+ protected void log(final InfoLogLevel infoLogLevel, final String logMsg) {
+ switch (infoLogLevel) {
+ case ERROR_LEVEL:
+ case FATAL_LEVEL:
+ logger.error(logMsg);
+ break;
+ case WARN_LEVEL:
+ logger.warn(logMsg);
+ break;
+ case DEBUG_LEVEL:
+ logger.debug(logMsg);
+ break;
+ case INFO_LEVEL:
+ case HEADER_LEVEL:
+ default:
+ logger.info(logMsg);
+ break;
+ }
+ }
+ }
+
+ public static class Builder {
+
+ int parallelThreads = 8;
+ int maxWriteBufferNumber = 4;
+ int minWriteBufferNumberToMerge = 1;
+ long writeBufferSize = (long) DataUnit.MB.toB(256);
+ long delayedWriteRate = (long) DataUnit.MB.toB(16);
+ int level0SlowdownWritesTrigger = 20;
+ int level0StopWritesTrigger = 40;
+ int maxBackgroundFlushes = 1;
+ int maxBackgroundCompactions = 1;
+ int statDumpSeconds = 600;
+ long syncMillis = 10;
+ long syncWarningNanos = TimeUnit.SECONDS.toNanos(30);
+ Path storagePath;
+ boolean adviseRandomOnOpen = false;
+ boolean createIfMissing = true;
+ boolean createMissingColumnFamilies = true;
+ boolean useFsync = true;
+ boolean periodicSyncEnabled = true;
+ final Set<byte[]> columnFamilyNames = new HashSet<>();
+
+ public RocksDBMetronome build() {
+ if (storagePath == null) {
+ throw new IllegalStateException("Cannot create RocksDBMetronome because storagePath is not set");
+ }
+
+ // add default column families
+ columnFamilyNames.add(RocksDB.DEFAULT_COLUMN_FAMILY);
+ columnFamilyNames.add(CONFIGURATION_FAMILY.getBytes(StandardCharsets.UTF_8));
+
+ return new RocksDBMetronome(this);
+ }
+
+ public Builder addColumnFamily(String name) {
+ this.columnFamilyNames.add(name.getBytes(StandardCharsets.UTF_8));
+ return this;
+ }
+
+ public Builder setStoragePath(Path storagePath) {
+ this.storagePath = storagePath;
+ return this;
+ }
+
+ public Builder setParallelThreads(int parallelThreads) {
+ this.parallelThreads = parallelThreads;
+ return this;
+ }
+
+ public Builder setMaxWriteBufferNumber(int maxWriteBufferNumber) {
+ this.maxWriteBufferNumber = maxWriteBufferNumber;
+ return this;
+ }
+
+ public Builder setMinWriteBufferNumberToMerge(int minWriteBufferNumberToMerge) {
+ this.minWriteBufferNumberToMerge = minWriteBufferNumberToMerge;
+ return this;
+ }
+
+ public Builder setWriteBufferSize(long writeBufferSize) {
+ this.writeBufferSize = writeBufferSize;
+ return this;
+ }
+
+ public Builder setDelayedWriteRate(long delayedWriteRate) {
+ this.delayedWriteRate = delayedWriteRate;
+ return this;
+ }
+
+ public Builder setLevel0SlowdownWritesTrigger(int level0SlowdownWritesTrigger) {
+ this.level0SlowdownWritesTrigger = level0SlowdownWritesTrigger;
+ return this;
+ }
+
+ public Builder setLevel0StopWritesTrigger(int level0StopWritesTrigger) {
+ this.level0StopWritesTrigger = level0StopWritesTrigger;
+ return this;
+ }
+
+ public Builder setMaxBackgroundFlushes(int maxBackgroundFlushes) {
+ this.maxBackgroundFlushes = maxBackgroundFlushes;
+ return this;
+ }
+
+ public Builder setMaxBackgroundCompactions(int maxBackgroundCompactions) {
+ this.maxBackgroundCompactions = maxBackgroundCompactions;
+ return this;
+ }
+
+ public Builder setStatDumpSeconds(int statDumpSeconds) {
+ this.statDumpSeconds = statDumpSeconds;
+ return this;
+ }
+
+ public Builder setSyncMillis(long syncMillis) {
+ this.syncMillis = syncMillis;
+ return this;
+ }
+
+ public Builder setSyncWarningNanos(long syncWarningNanos) {
+ this.syncWarningNanos = syncWarningNanos;
+ return this;
+ }
+
+
+ public Builder setAdviseRandomOnOpen(boolean adviseRandomOnOpen) {
+ this.adviseRandomOnOpen = adviseRandomOnOpen;
+ return this;
+ }
+
+ public Builder setCreateMissingColumnFamilies(boolean createMissingColumnFamilies) {
+ this.createMissingColumnFamilies = createMissingColumnFamilies;
+ return this;
+ }
+
+ public Builder setCreateIfMissing(boolean createIfMissing) {
+ this.createIfMissing = createIfMissing;
+ return this;
+ }
+
+ public Builder setUseFsync(boolean useFsync) {
+ this.useFsync = useFsync;
+ return this;
+ }
+
+ public Builder setPeriodicSyncEnabled(boolean periodicSyncEnabled) {
+ this.periodicSyncEnabled = periodicSyncEnabled;
+ return this;
+ }
+
+ long getMaxTotalWalSize() {
+ return writeBufferSize * maxWriteBufferNumber;
+ }
+ }
+}
diff --git a/nifi-commons/nifi-rocksdb-utils/src/test/java/org/apache/nifi/rocksdb/TestRocksDBMetronome.java b/nifi-commons/nifi-rocksdb-utils/src/test/java/org/apache/nifi/rocksdb/TestRocksDBMetronome.java
new file mode 100644
index 0000000000..7b5e68bc80
--- /dev/null
+++ b/nifi-commons/nifi-rocksdb-utils/src/test/java/org/apache/nifi/rocksdb/TestRocksDBMetronome.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.rocksdb;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksIterator;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestRocksDBMetronome {
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static final byte[] KEY = "key".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] VALUE = "value".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] KEY_2 = "key 2".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] VALUE_2 = "value 2".getBytes(StandardCharsets.UTF_8);
+
+ private ExecutorService executor;
+
+ @Before
+ public void before() {
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @After
+ public void after() {
+ executor.shutdownNow();
+ }
+
+ @Test
+ public void testReadWriteLong() throws Exception {
+ Random random = new Random();
+ byte[] key = new byte[8];
+ for (long i = 0; i < 10; i++) {
+ {
+ RocksDBMetronome.writeLong(i, key);
+ assertEquals(i, RocksDBMetronome.readLong(key));
+ }
+ {
+ long testValue = Long.MIN_VALUE + i;
+ RocksDBMetronome.writeLong(testValue, key);
+ assertEquals(testValue, RocksDBMetronome.readLong(key));
+ }
+ {
+ long testValue = Long.MAX_VALUE - i;
+ RocksDBMetronome.writeLong(testValue, key);
+ assertEquals(testValue, RocksDBMetronome.readLong(key));
+ }
+ {
+ long testValue = random.nextLong();
+ RocksDBMetronome.writeLong(testValue, key);
+ assertEquals(testValue, RocksDBMetronome.readLong(key));
+ }
+ }
+ }
+
+ @Test
+ public void testPutGetDelete() throws Exception {
+
+ try (RocksDBMetronome db = new RocksDBMetronome.Builder()
+ .setStoragePath(temporaryFolder.newFolder().toPath())
+ .build()) {
+ db.initialize();
+
+ assertNull(db.get(KEY));
+
+ // test default (no sync)
+ db.put(KEY, VALUE);
+ assertArrayEquals(VALUE, db.get(KEY));
+ db.delete(KEY);
+ assertNull(db.get(KEY));
+
+ // test with "force sync"
+ db.put(KEY, VALUE, true);
+ assertArrayEquals(VALUE, db.get(KEY));
+ db.delete(KEY, true);
+ assertNull(db.get(KEY));
+ }
+ }
+
+ @Test
+ public void testPutGetConfiguration() throws Exception {
+
+ try (RocksDBMetronome db = new RocksDBMetronome.Builder()
+ .setStoragePath(temporaryFolder.newFolder().toPath())
+ .build()) {
+ db.initialize();
+
+ assertNull(db.getConfiguration(KEY));
+ db.putConfiguration(KEY, VALUE);
+ assertArrayEquals(VALUE, db.getConfiguration(KEY));
+ db.delete(db.getColumnFamilyHandle(RocksDBMetronome.CONFIGURATION_FAMILY), KEY);
+ assertNull(db.getConfiguration(KEY));
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testPutBeforeInit() throws Exception {
+
+ try (RocksDBMetronome db = new RocksDBMetronome.Builder()
+ .setStoragePath(temporaryFolder.newFolder().toPath())
+ .build()) {
+ db.put(KEY, VALUE);
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testPutClosed() throws Exception {
+
+ try (RocksDBMetronome db = new RocksDBMetronome.Builder()
+ .setStoragePath(temporaryFolder.newFolder().toPath())
+ .build()) {
+ db.initialize();
+
+ db.close();
+ db.put(KEY_2, VALUE_2);
+ }
+ }
+
+ @Test
+ public void testColumnFamilies() throws Exception {
+
+ String secondFamilyName = "second family";
+ try (RocksDBMetronome db = new RocksDBMetronome.Builder()
+ .setStoragePath(temporaryFolder.newFolder().toPath())
+ .addColumnFamily(secondFamilyName)
+ .build()) {
+ db.initialize();
+ ColumnFamilyHandle secondFamily = db.getColumnFamilyHandle(secondFamilyName);
+
+ // assert nothing present
+ assertNull(db.get(KEY));
+ assertNull(db.get(KEY_2));
+
+ assertNull(db.get(secondFamily, KEY));
+ assertNull(db.get(secondFamily, KEY_2));
+
+ // add values
+ db.put(KEY, VALUE);
+ db.put(secondFamily, KEY_2, VALUE_2);
+
+ // assert values present in correct family
+ assertArrayEquals(VALUE, db.get(KEY));
+ assertNull(db.get(KEY_2));
+
+ assertArrayEquals(VALUE_2, db.get(secondFamily, KEY_2));
+ assertNull(db.get(secondFamily, KEY));
+
+ // delete from the "wrong" family
+ db.delete(KEY_2);
+ db.delete(secondFamily, KEY);
+
+ // assert values *still* present in correct family
+ assertArrayEquals(VALUE, db.get(KEY));
+ assertNull(db.get(KEY_2));
+
+ assertArrayEquals(VALUE_2, db.get(secondFamily, KEY_2));
+ assertNull(db.get(secondFamily, KEY));
+
+ // delete from the "right" family
+ db.delete(KEY);
+ db.delete(secondFamily, KEY_2);
+
+ // assert values removed
+ assertNull(db.get(KEY));
+ assertNull(db.get(KEY_2));
+
+ assertNull(db.get(secondFamily, KEY));
+ assertNull(db.get(secondFamily, KEY_2));
+ }
+ }
+
+ @Test
+ public void testIterator() throws Exception {
+
+ try (RocksDBMetronome db = new RocksDBMetronome.Builder()
+ .setStoragePath(temporaryFolder.newFolder().toPath())
+ .build()) {
+ db.initialize();
+
+ db.put(KEY, VALUE);
+ db.put(KEY_2, VALUE_2);
+
+ RocksIterator iterator = db.getIterator();
+ iterator.seekToFirst();
+
+ Map<String, byte[]> recovered = new HashMap<>();
+
+ while (iterator.isValid()) {
+ recovered.put(new String(iterator.key(), StandardCharsets.UTF_8), iterator.value());
+ iterator.next();
+ }
+
+ assertEquals(2, recovered.size());
+ assertArrayEquals(VALUE, recovered.get(new String(KEY, StandardCharsets.UTF_8)));
+ assertArrayEquals(VALUE_2, recovered.get(new String(KEY_2, StandardCharsets.UTF_8)));
+ }
+ }
+
+ @Test
+ public void testCounterIncrement() throws Exception {
+ try (RocksDBMetronome db = new RocksDBMetronome.Builder()
+ .setStoragePath(temporaryFolder.newFolder().toPath())
+ .setSyncMillis(Long.MAX_VALUE) // effectively disable the auto-sync
+ .build()) {
+ db.initialize();
+
+ // get initial counter value
+ int counterValue = db.getSyncCounterValue();
+
+ // do the sync (which would normally happen via the db's internal executor)
+ db.doSync();
+
+ // assert counter value incremented
+ assertEquals(counterValue + 1, db.getSyncCounterValue());
+ }
+ }
+
+ @Test(timeout = 10_000)
+ public void testWaitForSync() throws Exception {
+ try (RocksDBMetronome db = new RocksDBMetronome.Builder()
+ .setStoragePath(temporaryFolder.newFolder().toPath())
+ .setSyncMillis(Long.MAX_VALUE) // effectively disable the auto-sync
+ .build()) {
+ db.initialize();
+
+ Future<Boolean> future = executor.submit(() -> {
+ db.waitForSync();
+ return true;
+ });
+
+ // the future should still be blocked waiting for sync to happen
+ assertFalse(future.isDone());
+
+ // give the future time to wake up and complete
+ while (!future.isDone()) {
+ // TESTING NOTE: this is inside a loop to address a minor *testing* race condition where our first doSync() could happen before the future runs,
+ // meaning waitForSync() would be left waiting on another doSync() that never comes...
+
+ // do the sync (which would normally happen via the db's internal executor)
+ db.doSync();
+ Thread.sleep(25);
+ }
+
+ // the future should no longer be blocked
+ assertTrue(future.isDone());
+ }
+ }
+
+ @Test(timeout = 10_000)
+ public void testWaitForSyncWithValue() throws Exception {
+ try (RocksDBMetronome db = new RocksDBMetronome.Builder()
+ .setStoragePath(temporaryFolder.newFolder().toPath())
+ .setSyncMillis(Long.MAX_VALUE) // effectively disable the auto-sync
+ .build()) {
+ db.initialize();
+
+ int syncCounterValue = db.getSyncCounterValue();
+
+ // "wait" for one before current counter value... should not block
+ db.waitForSync(syncCounterValue - 1);
+
+ // wait for current value... should block (because auto-sync isn't happening)
+ assertBlocks(db, syncCounterValue);
+
+ // do the sync (which would normally happen via the db's internal executor)
+ db.doSync();
+
+ // "wait" for initial value... should now not block
+ db.waitForSync(syncCounterValue);
+
+ // wait for current value again... should block (because auto-sync isn't happening)
+ assertBlocks(db, db.getSyncCounterValue());
+ }
+ }
+
+ private void assertBlocks(RocksDBMetronome db, int counterValue) throws InterruptedException, java.util.concurrent.ExecutionException {
+ Future<Boolean> future = getWaitForSyncFuture(db, counterValue);
+
+ try {
+ future.get(1, TimeUnit.SECONDS);
+ fail();
+ } catch (TimeoutException expected) {
+ assertFalse(future.isDone());
+ }
+ future.cancel(true);
+ }
+
+ private Future<Boolean> getWaitForSyncFuture(RocksDBMetronome db, int counterValue) {
+ return executor.submit(() -> {
+ db.waitForSync(counterValue);
+ return true;
+ });
+ }
+}
\ No newline at end of file
diff --git a/nifi-commons/nifi-rocksdb-utils/src/test/resources/log4j.properties b/nifi-commons/nifi-rocksdb-utils/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..29dd873ef8
--- /dev/null
+++ b/nifi-commons/nifi-rocksdb-utils/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO,console
+log4j.category.org.apache.nifi=DEBUG
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
\ No newline at end of file
diff --git a/nifi-commons/pom.xml b/nifi-commons/pom.xml
index fec6996a64..d7e8ec3d1a 100644
--- a/nifi-commons/pom.xml
+++ b/nifi-commons/pom.xml
@@ -12,7 +12,9 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.nifi</groupId>
@@ -24,22 +26,23 @@
     <packaging>pom</packaging>
     <modules>
         <module>nifi-data-provenance-utils</module>
-        <module>nifi-flowfile-packager</module>
         <module>nifi-expression-language</module>
-        <module>nifi-logging-utils</module>
-        <module>nifi-properties</module>
-        <module>nifi-security-utils</module>
-        <module>nifi-socket-utils</module>
-        <module>nifi-utils</module>
-        <module>nifi-json-utils</module>
-        <module>nifi-web-utils</module>
-        <module>nifi-write-ahead-log</module>
-        <module>nifi-site-to-site-client</module>
+        <module>nifi-flowfile-packager</module>
         <module>nifi-hl7-query-language</module>
-        <module>nifi-schema-utils</module>
-        <module>nifi-record</module>
-        <module>nifi-record-path</module>
+        <module>nifi-json-utils</module>
+        <module>nifi-logging-utils</module>
         <module>nifi-metrics</module>
         <module>nifi-parameter</module>
+        <module>nifi-properties</module>
+        <module>nifi-record</module>
+        <module>nifi-record-path</module>
+        <module>nifi-rocksdb-utils</module>
+        <module>nifi-schema-utils</module>
+        <module>nifi-security-utils</module>
+        <module>nifi-site-to-site-client</module>
+        <module>nifi-socket-utils</module>
+        <module>nifi-utils</module>
+        <module>nifi-web-utils</module>
+        <module>nifi-write-ahead-log</module>
     </modules>
 </project>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 4ec4352b9c..514d791818 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -143,6 +143,11 @@
             <artifactId>nifi-write-ahead-log</artifactId>
             <version>1.10.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-rocksdb-utils</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-flowfile-repo-serialization</artifactId>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java
new file mode 100644
index 0000000000..4f3dd45de5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java
@@ -0,0 +1,1256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.rocksdb.RocksDBMetronome;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+/**
+ * <p>
+ * Implements FlowFile Repository using RocksDB as the backing store.
+ * </p>
+ */
+public class RocksDBFlowFileRepository implements FlowFileRepository {
+
+ private static final Logger logger = LoggerFactory.getLogger(RocksDBFlowFileRepository.class);
+
+ private static final String FLOWFILE_PROPERTY_PREFIX = "nifi.flowfile.repository.";
+ private static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = FLOWFILE_PROPERTY_PREFIX + "directory";
+
+ private static final byte[] SWAP_LOCATION_SUFFIX_KEY = "swap.location.sufixes".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] SERIALIZATION_ENCODING_KEY = "serial.encoding".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] SERIALIZATION_HEADER_KEY = "serial.header".getBytes(StandardCharsets.UTF_8);
+ static final byte[] REPOSITORY_VERSION_KEY = "repository.version".getBytes(StandardCharsets.UTF_8);
+ static final byte[] VERSION_ONE_BYTES = "1.0".getBytes(StandardCharsets.UTF_8);
+ private static final IllegalStateException NO_NEW_FLOWFILES = new IllegalStateException("Repository is not currently accepting new FlowFiles");
+ private static final Runtime runtime = Runtime.getRuntime();
+ private static final NumberFormat percentFormat = NumberFormat.getPercentInstance();
+
+ /**
+ * Each property is defined by its name in the file and its default value
+ */
+ enum RocksDbProperty {
+ //FlowFileRepo Configuration Parameters
+ SYNC_WARNING_PERIOD("rocksdb.sync.warning.period", "30 seconds"),
+ CLAIM_CLEANUP_PERIOD("rocksdb.claim.cleanup.period", "30 seconds"),
+ DESERIALIZATION_THREADS("rocksdb.deserialization.threads", "16"),
+ DESERIALIZATION_BUFFER_SIZE("rocksdb.deserialization.buffer.size", "1000"),
+ SYNC_PERIOD("rocksdb.sync.period", "10 milliseconds"),
+ ACCEPT_DATA_LOSS("rocksdb.accept.data.loss", "false"),
+ ENABLE_STALL_STOP("rocksdb.enable.stall.stop", "false"),
+ STALL_PERIOD("rocksdb.stall.period", "100 milliseconds"),
+ STALL_FLOWFILE_COUNT("rocksdb.stall.flowfile.count", "800000"),
+ STALL_HEAP_USAGE_PERCENT("rocksdb.stall.heap.usage.percent", "95%"),
+ STOP_FLOWFILE_COUNT("rocksdb.stop.flowfile.count", "1100000"),
+ STOP_HEAP_USAGE_PERCENT("rocksdb.stop.heap.usage.percent", "99.9%"),
+ REMOVE_ORPHANED_FLOWFILES("rocksdb.remove.orphaned.flowfiles.on.startup", "false"),
+ ENABLE_RECOVERY_MODE("rocksdb.enable.recovery.mode", "false"),
+ RECOVERY_MODE_FLOWFILE_LIMIT("rocksdb.recovery.mode.flowfile.count", "5000"),
+
+ //RocksDB Configuration Parameters
+ DB_PARALLEL_THREADS("rocksdb.parallel.threads", "8"),
+ MAX_WRITE_BUFFER_NUMBER("rocksdb.max.write.buffer.number", "4"),
+ WRITE_BUFFER_SIZE("rocksdb.write.buffer.size", "256 MB"),
+ LEVEL_O_SLOWDOWN_WRITES_TRIGGER("rocksdb.level.0.slowdown.writes.trigger", "20"),
+ LEVEL_O_STOP_WRITES_TRIGGER("rocksdb.level.0.stop.writes.trigger", "40"),
+ DELAYED_WRITE_RATE("rocksdb.delayed.write.bytes.per.second", "16 MB"),
+ MAX_BACKGROUND_FLUSHES("rocksdb.max.background.flushes", "1"),
+ MAX_BACKGROUND_COMPACTIONS("rocksdb.max.background.compactions", "1"),
+ MIN_WRITE_BUFFER_NUMBER_TO_MERGE("rocksdb.min.write.buffer.number.to.merge", "1"),
+ STAT_DUMP_PERIOD("rocksdb.stat.dump.period", "600 sec"),
+ ;
+
+ final String propertyName;
+ final String defaultValue;
+
+ RocksDbProperty(String propertyName, String defaultValue) {
+ this.propertyName = FLOWFILE_PROPERTY_PREFIX + propertyName;
+ this.defaultValue = defaultValue;
+ }
+
+ /**
+ * @param niFiProperties The Properties file
+ * @param timeUnit The desired time unit
+ * @return The property Value in the desired units
+ */
+ long getTimeValue(NiFiProperties niFiProperties, TimeUnit timeUnit) {
+ String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue);
+ long timeValue = 0L;
+ try {
+ timeValue = Math.round(FormatUtils.getPreciseTimeDuration(propertyValue, timeUnit));
+ } catch (IllegalArgumentException e) {
+ this.generateIllegalArgumentException(propertyValue, e);
+ }
+ return timeValue;
+ }
+
+ /**
+ * @param niFiProperties The Properties file
+ * @return The property value as a boolean
+ */
+ boolean getBooleanValue(NiFiProperties niFiProperties) {
+ String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue);
+ return Boolean.parseBoolean(propertyValue);
+ }
+
+ /**
+ * @param niFiProperties The Properties file
+ * @return The property value as an int
+ */
+ int getIntValue(NiFiProperties niFiProperties) {
+ String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue);
+ int returnValue = 0;
+ try {
+ returnValue = Integer.parseInt(propertyValue);
+ } catch (NumberFormatException e) {
+ this.generateIllegalArgumentException(propertyValue, e);
+ }
+ return returnValue;
+ }
+
+ /**
+ * @param niFiProperties The Properties file
+ * @return The property value as a number of bytes
+ */
+ long getByteCountValue(NiFiProperties niFiProperties) {
+ long returnValue = 0L;
+ String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue);
+ try {
+ double writeBufferDouble = DataUnit.parseDataSize(propertyValue, DataUnit.B);
+ returnValue = (long) (writeBufferDouble < Long.MAX_VALUE ? writeBufferDouble : Long.MAX_VALUE);
+ } catch (IllegalArgumentException e) {
+ this.generateIllegalArgumentException(propertyValue, e);
+ }
+ return returnValue;
+ }
+
+ /**
+ * @param niFiProperties The Properties file
+ * @return The property value as a percent
+ */
+ double getPercentValue(NiFiProperties niFiProperties) {
+ String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue).replace('%', ' ');
+ double returnValue = 0.0D;
+ try {
+ returnValue = Double.parseDouble(propertyValue) / 100D;
+ if (returnValue > 1.0D) {
+ this.generateIllegalArgumentException(propertyValue, null);
+ }
+ } catch (NumberFormatException e) {
+ this.generateIllegalArgumentException(propertyValue, e);
+ }
+ return returnValue;
+ }
+
+ /**
+ * @param niFiProperties The Properties file
+ * @return The property value as a long
+ */
+ long getLongValue(NiFiProperties niFiProperties) {
+ String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue);
+ long returnValue = 0L;
+ try {
+ returnValue = Long.parseLong(propertyValue);
+ } catch (NumberFormatException e) {
+ this.generateIllegalArgumentException(propertyValue, e);
+ }
+ return returnValue;
+ }
+
+ void generateIllegalArgumentException(String badValue, Throwable t) {
+ throw new IllegalArgumentException("The NiFi Property: [" + this.propertyName + "] with value: [" + badValue + "] is not valid", t);
+ }
+
+ }
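For context, these keys land in nifi.properties with the `nifi.flowfile.repository.` prefix applied above. A sketch of a configuration using the defaults follows; the `implementation` line is the standard NiFi property for selecting a FlowFile repository and is an assumption here, since it is not part of this hunk.

```properties
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.RocksDBFlowFileRepository
nifi.flowfile.repository.rocksdb.sync.period=10 milliseconds
nifi.flowfile.repository.rocksdb.accept.data.loss=false
nifi.flowfile.repository.rocksdb.stall.flowfile.count=800000
nifi.flowfile.repository.rocksdb.write.buffer.size=256 MB
```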
+
+
+ private final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L);
+ private final int deserializationThreads;
+ private final int deserializationBufferSize;
+ private final long claimCleanupMillis;
+ private final ScheduledExecutorService housekeepingExecutor;
+ private final AtomicReference<Collection<ResourceClaim>> claimsAwaitingDestruction = new AtomicReference<>(new ArrayList<>());
+ private final RocksDBMetronome db;
+ private ResourceClaimManager claimManager;
+ private RepositoryRecordSerdeFactory serdeFactory;
+ private SerDe<RepositoryRecord> serializer;
+ private String serializationEncodingName;
+ private byte[] serializationHeader;
+
+ private final boolean acceptDataLoss;
+ private final boolean enableStallStop;
+ private final boolean removeOrphanedFlowFiles;
+ private final boolean enableRecoveryMode;
+ private final long recoveryModeFlowFileLimit;
+ private final AtomicReference<SerDe<RepositoryRecord>> recordDeserializer = new AtomicReference<>();
+ private final List<byte[]> recordsToRestore = Collections.synchronizedList(new LinkedList<>());
+
+ private final ReentrantLock stallStopLock = new ReentrantLock();
+ private final AtomicLong inMemoryFlowFiles = new AtomicLong(0L);
+ volatile boolean stallNewFlowFiles = false;
+ volatile boolean stopNewFlowFiles = false;
+ private final long stallMillis;
+ private final long stallCount;
+ private final long stopCount;
+ private final double stallPercentage;
+ private final double stopPercentage;
+
+ private final Set<String> swapLocationSuffixes = new HashSet<>(); // guarded by synchronizing on object itself
+
+ /**
+ * default no args constructor for service loading only.
+ */
+ public RocksDBFlowFileRepository() {
+ deserializationThreads = 0;
+ deserializationBufferSize = 0;
+ claimCleanupMillis = 0;
+ housekeepingExecutor = null;
+ db = null;
+ acceptDataLoss = false;
+ enableStallStop = false;
+ removeOrphanedFlowFiles = false;
+ stallMillis = 0;
+ stallCount = 0;
+ stopCount = 0;
+ stallPercentage = 0;
+ stopPercentage = 0;
+ enableRecoveryMode = false;
+ recoveryModeFlowFileLimit = 0;
+ }
+
+ public RocksDBFlowFileRepository(final NiFiProperties niFiProperties) {
+ deserializationThreads = RocksDbProperty.DESERIALIZATION_THREADS.getIntValue(niFiProperties);
+ deserializationBufferSize = RocksDbProperty.DESERIALIZATION_BUFFER_SIZE.getIntValue(niFiProperties);
+
+ claimCleanupMillis = RocksDbProperty.CLAIM_CLEANUP_PERIOD.getTimeValue(niFiProperties, TimeUnit.MILLISECONDS);
+ housekeepingExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread thread = Executors.defaultThreadFactory().newThread(r);
+ thread.setDaemon(true);
+ return thread;
+ });
+
+ acceptDataLoss = RocksDbProperty.ACCEPT_DATA_LOSS.getBooleanValue(niFiProperties);
+ enableStallStop = RocksDbProperty.ENABLE_STALL_STOP.getBooleanValue(niFiProperties);
+
+ removeOrphanedFlowFiles = RocksDbProperty.REMOVE_ORPHANED_FLOWFILES.getBooleanValue(niFiProperties);
+ if (removeOrphanedFlowFiles) {
+ logger.warn("The property \"{}\" is currently set to \"true\". " +
+ "This can potentially lead to data loss, and should only be set if you are absolutely certain it is necessary. " +
+ "Even then, it should be removed as soon as possible.",
+ RocksDbProperty.REMOVE_ORPHANED_FLOWFILES.propertyName);
+ }
+ stallMillis = RocksDbProperty.STALL_PERIOD.getTimeValue(niFiProperties, TimeUnit.MILLISECONDS);
+ stallCount = RocksDbProperty.STALL_FLOWFILE_COUNT.getLongValue(niFiProperties);
+ stopCount = RocksDbProperty.STOP_FLOWFILE_COUNT.getLongValue(niFiProperties);
+ stallPercentage = RocksDbProperty.STALL_HEAP_USAGE_PERCENT.getPercentValue(niFiProperties);
+ stopPercentage = RocksDbProperty.STOP_HEAP_USAGE_PERCENT.getPercentValue(niFiProperties);
+
+ enableRecoveryMode = RocksDbProperty.ENABLE_RECOVERY_MODE.getBooleanValue(niFiProperties);
+ recoveryModeFlowFileLimit = RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.getLongValue(niFiProperties);
+ if (enableRecoveryMode) {
+ logger.warn("The property \"{}\" is currently set to \"true\" and \"{}\" is set to \"{}\". " +
+ "This means that only {} FlowFiles will be loaded in to memory from the FlowFile repo at a time, " +
+ "allowing for recovery of a system encountering OutOfMemory errors (or similar). " +
+ "This setting should be reset to \"false\" as soon as recovery is complete.",
+ RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, recoveryModeFlowFileLimit, recoveryModeFlowFileLimit);
+ }
+ db = new RocksDBMetronome.Builder()
+ .setStatDumpSeconds((int) (Math.min(RocksDbProperty.STAT_DUMP_PERIOD.getTimeValue(niFiProperties, TimeUnit.SECONDS), Integer.MAX_VALUE)))
+ .setParallelThreads(RocksDbProperty.DB_PARALLEL_THREADS.getIntValue(niFiProperties))
+ .setMaxWriteBufferNumber(RocksDbProperty.MAX_WRITE_BUFFER_NUMBER.getIntValue(niFiProperties))
+ .setMinWriteBufferNumberToMerge(RocksDbProperty.MIN_WRITE_BUFFER_NUMBER_TO_MERGE.getIntValue(niFiProperties))
+ .setWriteBufferSize(RocksDbProperty.WRITE_BUFFER_SIZE.getByteCountValue(niFiProperties))
+ .setDelayedWriteRate(RocksDbProperty.DELAYED_WRITE_RATE.getByteCountValue(niFiProperties))
+ .setLevel0SlowdownWritesTrigger(RocksDbProperty.LEVEL_O_SLOWDOWN_WRITES_TRIGGER.getIntValue(niFiProperties))
+ .setLevel0StopWritesTrigger(RocksDbProperty.LEVEL_O_STOP_WRITES_TRIGGER.getIntValue(niFiProperties))
+ .setMaxBackgroundFlushes(RocksDbProperty.MAX_BACKGROUND_FLUSHES.getIntValue(niFiProperties))
+ .setMaxBackgroundCompactions(RocksDbProperty.MAX_BACKGROUND_COMPACTIONS.getIntValue(niFiProperties))
+ .setSyncMillis(RocksDbProperty.SYNC_PERIOD.getTimeValue(niFiProperties, TimeUnit.MILLISECONDS))
+ .setSyncWarningNanos(RocksDbProperty.SYNC_WARNING_PERIOD.getTimeValue(niFiProperties, TimeUnit.NANOSECONDS))
+ .setStoragePath(getFlowFileRepoPath(niFiProperties))
+ .setAdviseRandomOnOpen(false)
+ .setCreateMissingColumnFamilies(true)
+ .setCreateIfMissing(true)
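+ // periodic syncs are only needed when data loss is unacceptable; otherwise they are skipped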
+ .setPeriodicSyncEnabled(!acceptDataLoss)
+ .build();
+ }
+
+ /**
+ * @param niFiProperties the NiFi properties
+ * @return the path of the FlowFile repository, or null if no repository directory is configured
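+ * <p>
+ * For example (illustrative), a nifi.properties entry such as
+ * {@code nifi.flowfile.repository.directory=./flowfile_repository}
+ * resolves to that directory; only the first matching property is used.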
+ */
+ static Path getFlowFileRepoPath(NiFiProperties niFiProperties) {
+ for (final String propertyName : niFiProperties.getPropertyKeys()) {
+ if (propertyName.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) {
+ final String dirName = niFiProperties.getProperty(propertyName);
+ return Paths.get(dirName);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * @return the name of the file store backing the repository, or null if it cannot be determined
+ */
+ @Override
+ public String getFileStoreName() {
+ try {
+ return Files.getFileStore(db.getStoragePath()).name();
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public void initialize(final ResourceClaimManager claimManager) throws IOException {
+ this.db.initialize();
+ this.claimManager = claimManager;
+ this.serdeFactory = new StandardRepositoryRecordSerdeFactory(claimManager);
+
+ try {
+ byte[] versionBytes = db.getConfiguration(REPOSITORY_VERSION_KEY);
+ if (versionBytes == null) {
+ db.putConfiguration(REPOSITORY_VERSION_KEY, VERSION_ONE_BYTES);
+ } else if (!Arrays.equals(versionBytes, VERSION_ONE_BYTES)) {
+ throw new IllegalStateException("Unknown repository version: " + new String(versionBytes, StandardCharsets.UTF_8));
+ }
+
+ byte[] serializationEncodingBytes = db.getConfiguration(SERIALIZATION_ENCODING_KEY);
+
+ if (serializationEncodingBytes == null) {
+ serializer = serdeFactory.createSerDe(null);
+ serializationEncodingName = serializer.getClass().getName();
+ db.putConfiguration(SERIALIZATION_ENCODING_KEY, serializationEncodingName.getBytes(StandardCharsets.UTF_8));
+ } else {
+ serializationEncodingName = new String(serializationEncodingBytes, StandardCharsets.UTF_8);
+ serializer = serdeFactory.createSerDe(serializationEncodingName);
+ }
+
+ serializationHeader = db.getConfiguration(SERIALIZATION_HEADER_KEY);
+
+ if (serializationHeader == null) {
+ try (
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)
+ ) {
+ serializer.writeHeader(dataOutputStream);
+ serializationHeader = byteArrayOutputStream.toByteArray();
+ db.putConfiguration(SERIALIZATION_HEADER_KEY, serializationHeader);
+ }
+ }
+
+
+ byte[] swapLocationSuffixBytes = db.getConfiguration(SWAP_LOCATION_SUFFIX_KEY);
+ if (swapLocationSuffixBytes != null) {
+ try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(swapLocationSuffixBytes);
+ ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
+ Object o = objectInputStream.readObject();
+ if (o instanceof Collection) {
+ ((Collection<?>) o).forEach(obj -> swapLocationSuffixes.add(obj.toString()));
+ }
+ }
+ }
+ } catch (RocksDBException | ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+
+ housekeepingExecutor.scheduleWithFixedDelay(this::doHousekeeping, 0, claimCleanupMillis, TimeUnit.MILLISECONDS);
+
+ logger.info("Initialized FlowFile Repository at {}", db.getStoragePath());
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (housekeepingExecutor != null) {
+ housekeepingExecutor.shutdownNow();
+ }
+
+ if (db != null) {
+ db.close();
+ }
+ }
+
+ /**
+ * This method is scheduled by the housekeepingExecutor at the specified interval.
+ * Catches Throwable so as not to suppress future executions of the ScheduledExecutorService
+ */
+ private void doHousekeeping() {
+ try {
+ doClaimCleanup();
+ updateStallStop();
+ doRecovery();
+ } catch (Throwable t) {
+ // catching Throwable so as not to suppress subsequent executions
+ logger.error("Encountered problem during housekeeping", t);
+ }
+ }
+
+ /**
+ * Marks as destructible any claims that were held by records which have now been deleted.
+ */
+ private void doClaimCleanup() {
+ Collection<ResourceClaim> claimsToDestroy;
+ synchronized (claimsAwaitingDestruction) {
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ claimsToDestroy = claimsAwaitingDestruction.getAndSet(new ArrayList<>());
+ }
+ if (claimsToDestroy != null) {
+ Collection<ResourceClaim> uniqueClaimsToDestroy = new HashSet<>(claimsToDestroy);
+
+ try {
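+ // before marking claims destructible, ensure the deletes that released them are durable:
+ // wait for the periodic sync, or force one when periodic sync is disabled (acceptDataLoss)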
+ if (!acceptDataLoss) {
+ db.waitForSync();
+ } else {
+ db.forceSync();
+ }
+ } catch (InterruptedException | RocksDBException e) {
+ synchronized (claimsAwaitingDestruction) {
+ // if there was an exception, put back the claims we were attempting to destroy
+ claimsAwaitingDestruction.get().addAll(uniqueClaimsToDestroy);
+ return;
+ }
+ }
+ for (final ResourceClaim claim : uniqueClaimsToDestroy) {
+ claimManager.markDestructable(claim);
+ }
+ }
+ }
+
+ /**
+ * Updates the stalled and stopped status of the repository
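+ * <p>
+ * Illustrative example, assuming a stall count of 800,000 and a stop count of 1,100,000
+ * (the actual thresholds come from nifi.properties): 900,000 in-memory FlowFiles stalls
+ * intake of new FlowFiles, while 1,200,000 stops it entirely.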
+ */
+ void updateStallStop() {
+ // if stall/stop logic is not enabled, return
+ if (!enableStallStop) return;
+
+ if (stallStopLock.tryLock()) {
+ try {
+ final long inMemoryFlowFiles = getInMemoryFlowFiles();
+
+ if (inMemoryFlowFiles >= stopCount) {
+ stopNewFlowFiles = true;
+ stallNewFlowFiles = true;
+ logger.warn("Halting new FlowFiles because maximum FlowFile count ({}) has been exceeded. Current count: {}",
+ new Object[]{stopCount, inMemoryFlowFiles});
+ return;
+ }
+
+ // calculate usage percentage
+ final double freeMemory = runtime.freeMemory();
+ final double maxMemory = runtime.maxMemory();
+ final double usedPercentage = 1.0d - (freeMemory / maxMemory);
+
+ if (usedPercentage >= stopPercentage) {
+ stopNewFlowFiles = true;
+ stallNewFlowFiles = true;
+ logger.warn("Halting new FlowFiles because maximum heap usage percentage ({}) has been exceeded. Current usage: {}",
+ new Object[]{percentFormat.format(stopPercentage), percentFormat.format(usedPercentage)});
+ return;
+ }
+
+ if (inMemoryFlowFiles >= stallCount) {
+ stopNewFlowFiles = false;
+ stallNewFlowFiles = true;
+ logger.warn("Stalling new FlowFiles because FlowFile count stall threshold ({}) has been exceeded. Current count: {}",
+ new Object[]{stallCount, inMemoryFlowFiles});
+ return;
+ }
+
+ if (usedPercentage >= stallPercentage) {
+ stopNewFlowFiles = false;
+ stallNewFlowFiles = true;
+ logger.warn("Stalling new FlowFiles because heap usage percentage threshold ({}) has been exceeded. Current count: {}",
+ new Object[]{percentFormat.format(stallPercentage), percentFormat.format(usedPercentage)});
+ return;
+ }
+
+ if (stopNewFlowFiles || stallNewFlowFiles) {
+ logger.info("Resuming acceptance of new FlowFiles");
+ stopNewFlowFiles = false;
+ stallNewFlowFiles = false;
+ }
+ } finally {
+ stallStopLock.unlock();
+ }
+ }
+ }
+
+ /**
+ * If in recovery mode, restores additional FlowFiles while under the configured limit
+ */
+ synchronized void doRecovery() {
+
+ // if we are not in recovery mode, return
+ if (!enableRecoveryMode) return;
+
+ SerDe<RepositoryRecord> deserializer = recordDeserializer.get();
+ if (deserializer == null) {
+ return; // initial load hasn't completed
+ }
+
+ if (recordsToRestore.isEmpty()) {
+ logger.warn("Recovery has been completed. " +
+ "The property \"{}\" is currently set to \"true\", but should be reset to \"false\" as soon as possible.",
+ RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName);
+ return;
+ }
+
+ logger.warn("The property \"{}\" is currently set to \"true\" and \"{}\" is set to \"{}\". " +
+ "This means that only {} FlowFiles will be loaded into memory from the FlowFile repo at a time, " +
+ "allowing for recovery of a system encountering OutOfMemory errors (or similar). " +
+ "This setting should be reset to \"false\" as soon as recovery is complete. " +
+ "There are {} records remaining to be recovered.",
+ RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName,
+ recoveryModeFlowFileLimit, recoveryModeFlowFileLimit, getRecordsToRestoreCount());
+
+ while (!recordsToRestore.isEmpty() && inMemoryFlowFiles.get() < recoveryModeFlowFileLimit) {
+ try {
+ byte[] key = recordsToRestore.get(0);
+ byte[] recordBytes = db.get(key);
+ if (recordBytes != null) {
+ try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(recordBytes);
+ DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
+ RepositoryRecord record = deserializer.deserializeRecord(dataInputStream, deserializer.getVersion());
+ final FlowFileRecord flowFile = record.getCurrent();
+ final FlowFileQueue queue = record.getOriginalQueue();
+ if (queue != null) {
+ queue.put(flowFile);
+ inMemoryFlowFiles.incrementAndGet();
+ }
+ }
+ }
+ recordsToRestore.remove(0);
+ } catch (IOException | RocksDBException e) {
+ logger.warn("Encountered exception during recovery", e);
+ }
+ }
+ }
+
+ long getInMemoryFlowFiles() {
+ return inMemoryFlowFiles.get();
+ }
+
+ long getRecordsToRestoreCount() {
+ return recordsToRestore.size();
+ }
+
+ /**
+ * Updates the FlowFile repository with the given RepositoryRecords
+ *
+ * @param records the records to update the repository with
+ * @throws IOException if update fails or a required sync is interrupted
+ */
+ @Override
+ public void updateRepository(final Collection<RepositoryRecord> records) throws IOException {
+
+ // verify records are valid
+ int netIncrease = countAndValidateRecords(records);
+
+ final boolean causeIncrease = netIncrease > 0;
+ if (causeIncrease && stopNewFlowFiles) {
+ updateStallStop();
+ throw NO_NEW_FLOWFILES;
+ }
+
+ //update the db with the new records
+ int syncCounterValue = updateRocksDB(records);
+ inMemoryFlowFiles.addAndGet(netIncrease);
+
+ try {
+ // if we created data, but are at a threshold, delay and allow other data to try to get through
+ if (causeIncrease && (stallNewFlowFiles || stopNewFlowFiles)) {
+ Thread.sleep(stallMillis);
+ updateStallStop();
+ }
+
+ // if we got a record indicating data creation, wait for it to be synced to disk before proceeding
+ if (!acceptDataLoss && syncCounterValue > 0) {
+ db.waitForSync(syncCounterValue);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+
+ // determine any content claims that can be destroyed
+ determineDestructibleClaims(records);
+ }
+
+ /**
+ * Check that each record has the required elements
+ *
+ * @param records to be validated
+ * @return the change in in-memory FlowFile count represented by this Collection
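+ * (e.g. a batch of {CREATE, CREATE, DELETE, UPDATE} records yields a net delta of +1)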
+ */
+ private int countAndValidateRecords(Collection<RepositoryRecord> records) {
+ int inMemoryDelta = 0;
+ for (RepositoryRecord record : records) {
+ validateRecord(record);
+ if (record.getType() == RepositoryRecordType.CREATE || record.getType() == RepositoryRecordType.SWAP_IN) {
+ inMemoryDelta++;
+ } else if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.SWAP_OUT) {
+ inMemoryDelta--;
+ }
+ }
+ return inMemoryDelta;
+ }
+
+ /**
+ * Check that a record has the required elements
+ *
+ * @param record to be validated
+ */
+ private void validateRecord(RepositoryRecord record) {
+ if (record.getType() != RepositoryRecordType.DELETE
+ && record.getType() != RepositoryRecordType.CONTENTMISSING
+ && record.getType() != RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS
+ && record.getType() != RepositoryRecordType.SWAP_OUT
+ && record.getDestination() == null) {
+ throw new IllegalArgumentException("Record " + record + " has no destination and Type is " + record.getType());
+ }
+ }
+
+ /**
+ * Update the records in the RocksDB database
+ *
+ * @param records to be persisted
+ * @return the value of the sync counter immediately after the last update which requires a sync
+ */
+ private int updateRocksDB(Collection<RepositoryRecord> records) throws IOException {
+
+ // Partition records by UpdateType.
+ // We do this because we want to ensure that records creating data are persisted first.
+ // Additionally, remove records of type 'CLEANUP_TRANSIENT_CLAIMS' so they aren't sent to the db
+
+ final Map<UpdateType, List<RepositoryRecord>> partitionedRecords = new HashMap<>();
+ for (RepositoryRecord repositoryRecord : records) {
+ if (repositoryRecord.getType() == RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS) {
+ continue;
+ }
+ final UpdateType updateType = serdeFactory.getUpdateType(repositoryRecord);
+ partitionedRecords.computeIfAbsent(updateType, ut -> new ArrayList<>()).add(repositoryRecord);
+ }
+
+ int counterValue;
+
+ try {
+ // handle CREATE records
+ putAll(partitionedRecords.get(UpdateType.CREATE));
+
+ // handle SWAP_OUT records
+ List<RepositoryRecord> swapOutRecords = partitionedRecords.get(UpdateType.SWAP_OUT);
+ if (swapOutRecords != null) {
+ for (final RepositoryRecord record : swapOutRecords) {
+ final String newLocation = serdeFactory.getLocation(record);
+ if (newLocation == null) {
+ final Long recordIdentifier = serdeFactory.getRecordIdentifier(record);
+ logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but " +
+ "no indicator of where the Record is to be Swapped Out to; these records may be " +
+ "lost when the repository is restored!");
+ } else {
+ delete(record);
+ }
+ }
+ }
+
+ // handle SWAP_IN records
+ List<RepositoryRecord> swapInRecords = partitionedRecords.get(UpdateType.SWAP_IN);
+ if (swapInRecords != null) {
+ for (final RepositoryRecord record : swapInRecords) {
+ final String newLocation = serdeFactory.getLocation(record);
+ if (newLocation == null) {
+ final Long recordIdentifier = serdeFactory.getRecordIdentifier(record);
+ logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but " +
+ "no indicator of where the Record is to be Swapped In from; these records may be " +
+ "duplicated when the repository is restored!");
+ }
+ put(record);
+ }
+ }
+
+ // if a sync is required, get the current value of the sync counter
+ counterValue = syncRequired(partitionedRecords) ? db.getSyncCounterValue() : -1;
+
+ // handle UPDATE records
+ putAll(partitionedRecords.get(UpdateType.UPDATE));
+
+ // handle DELETE records
+ deleteAll(partitionedRecords.get(UpdateType.DELETE));
+
+ } catch (RocksDBException e) {
+ throw new IOException(e);
+ }
+
+ return counterValue;
+ }
+
+ private boolean syncRequired(Map<UpdateType, List<RepositoryRecord>> recordMap) {
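+ // only updates that add data (CREATE) or move it between repositories (SWAP_IN / SWAP_OUT)
+ // must be durable before the caller proceeds; plain UPDATEs and DELETEs do not force a sync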
+ for (UpdateType updateType : recordMap.keySet()) {
+ if (updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_OUT || updateType == UpdateType.SWAP_IN) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void deleteAll(List<RepositoryRecord> repositoryRecords) throws RocksDBException {
+ if (repositoryRecords != null) {
+ for (final RepositoryRecord record : repositoryRecords) {
+ delete(record);
+ }
+ }
+ }
+
+ private void delete(RepositoryRecord record) throws RocksDBException {
+ final Long recordIdentifier = serdeFactory.getRecordIdentifier(record);
+ byte[] key = RocksDBMetronome.getBytes(recordIdentifier);
+ db.delete(key);
+ }
+
+ private void putAll(List<RepositoryRecord> repositoryRecords) throws IOException, RocksDBException {
+ if (repositoryRecords != null) {
+ for (final RepositoryRecord record : repositoryRecords) {
+ put(record);
+ }
+ }
+ }
+
+ private void put(RepositoryRecord record) throws IOException, RocksDBException {
+ final Long recordIdentifier = serdeFactory.getRecordIdentifier(record);
+ byte[] key = RocksDBMetronome.getBytes(recordIdentifier);
+ final byte[] serializedRecord = serialize(record);
+ db.put(key, serializedRecord);
+ }
+
+ private byte[] serialize(RepositoryRecord record) throws IOException {
+ try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+ serializer.serializeRecord(record, dataOutputStream);
+ return byteArrayOutputStream.toByteArray();
+ }
+ }
+
+ private void determineDestructibleClaims(Collection<RepositoryRecord> records) throws IOException {
+
+ final Set<ResourceClaim> claimsToAdd = new HashSet<>();
+ final Set<String> swapLocationsAdded = new HashSet<>();
+ final Set<String> swapLocationsRemoved = new HashSet<>();
+ for (final RepositoryRecord record : records) {
+ updateClaimCounts(record);
+
+ if (record.getType() == RepositoryRecordType.DELETE) {
+ // For any DELETE record that we have, if claim is destructible, mark it so
+ if (isDestructible(record.getCurrentClaim())) {
+ claimsToAdd.add(record.getCurrentClaim().getResourceClaim());
+ }
+
+ // If the original claim is different than the current claim and the original claim is destructible, mark it so
+ if (shouldDestroyOriginal(record)) {
+ claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
+ }
+ } else if (record.getType() == RepositoryRecordType.UPDATE) {
+ // if we have an update, and the original is no longer needed, mark original as destructible
+ if (shouldDestroyOriginal(record)) {
+ claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
+ }
+ } else if (record.getType() == RepositoryRecordType.SWAP_OUT) {
+ final String swapLocation = record.getSwapLocation();
+ final String normalizedSwapLocation = normalizeSwapLocation(swapLocation);
+ swapLocationsAdded.add(normalizedSwapLocation);
+ swapLocationsRemoved.remove(normalizedSwapLocation);
+ } else if (record.getType() == RepositoryRecordType.SWAP_IN) {
+ final String swapLocation = record.getSwapLocation();
+ final String normalizedSwapLocation = normalizeSwapLocation(swapLocation);
+ swapLocationsRemoved.add(normalizedSwapLocation);
+ swapLocationsAdded.remove(normalizedSwapLocation);
+ }
+
+ final List<ContentClaim> transientClaims = record.getTransientClaims();
+ if (transientClaims != null) {
+ for (final ContentClaim transientClaim : transientClaims) {
+ if (isDestructible(transientClaim)) {
+ claimsToAdd.add(transientClaim.getResourceClaim());
+ }
+ }
+ }
+ }
+
+ // If we have swapped files in or out, we need to ensure that we update our swapLocationSuffixes.
+ if (!swapLocationsAdded.isEmpty() || !swapLocationsRemoved.isEmpty()) {
+ synchronized (swapLocationSuffixes) {
+ removeNormalizedSwapLocations(swapLocationsRemoved);
+ addNormalizedSwapLocations(swapLocationsAdded);
+ }
+ }
+
+ // add any new claims that can be destroyed to the list
+ if (!claimsToAdd.isEmpty()) {
+ synchronized (claimsAwaitingDestruction) {
+ claimsAwaitingDestruction.get().addAll(claimsToAdd);
+ }
+ }
+ }
+
+
+ private void updateClaimCounts(final RepositoryRecord record) {
+ final ContentClaim currentClaim = record.getCurrentClaim();
+ final ContentClaim originalClaim = record.getOriginalClaim();
+ final boolean claimChanged = !Objects.equals(currentClaim, originalClaim);
+
+ if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.CONTENTMISSING) {
+ decrementClaimCount(currentClaim);
+ }
+
+ if (claimChanged) {
+ // records which have been updated - remove original if exists
+ decrementClaimCount(originalClaim);
+ }
+ }
+
+ private void decrementClaimCount(final ContentClaim claim) {
+ if (claim == null) {
+ return;
+ }
+
+ claimManager.decrementClaimantCount(claim.getResourceClaim());
+ }
+
+
+ /**
+ * @param claim to be evaluated
+ * @return true if the claim can be destroyed
+ */
+ private boolean isDestructible(final ContentClaim claim) {
+ if (claim == null) {
+ return false;
+ }
+
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ if (resourceClaim == null) {
+ return false;
+ }
+
+ return !resourceClaim.isInUse();
+ }
+
+ /**
+ * @param record to be evaluated
+ * @return true if the original claim can be destroyed
+ */
+ private boolean shouldDestroyOriginal(RepositoryRecord record) {
+ final ContentClaim originalClaim = record.getOriginalClaim();
+ return isDestructible(originalClaim) && !originalClaim.equals(record.getCurrentClaim());
+ }
+
+ @Override
+ public boolean isVolatile() {
+ return false;
+ }
+
+ @Override
+ public long getStorageCapacity() throws IOException {
+ return db.getStorageCapacity();
+ }
+
+ @Override
+ public long getUsableStorageSpace() throws IOException {
+ return db.getUsableStorageSpace();
+ }
+
+
+ @Override
+ public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) {
+ String normalizedSwapLocation = normalizeSwapLocation(swapLocationSuffix);
+ synchronized (swapLocationSuffixes) {
+ return swapLocationSuffixes.contains(normalizedSwapLocation);
+ }
+ }
+
+
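+ // Illustrative transformations (derived from the logic below):
+ //   "/path/to/swap-file.swap/" -> "swap-file"
+ //   "C:\swap\test.swap"        -> "test"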
+ static String normalizeSwapLocation(final String swapLocation) {
+ if (swapLocation == null) {
+ return null;
+ }
+
+ final String normalizedPath = swapLocation.replace("\\", "/");
+ final String withoutTrailing = (normalizedPath.endsWith("/") && normalizedPath.length() > 1) ? normalizedPath.substring(0, normalizedPath.length() - 1) : normalizedPath;
+ final String pathRemoved = getLocationSuffix(withoutTrailing);
+
+ return StringUtils.substringBefore(pathRemoved, ".");
+ }
+
+ private static String getLocationSuffix(final String swapLocation) {
+ final int lastIndex = swapLocation.lastIndexOf("/");
+ if (lastIndex < 0 || lastIndex >= swapLocation.length() - 1) {
+ return swapLocation;
+ }
+
+ return swapLocation.substring(lastIndex + 1);
+ }
+
+ @Override
+ public void swapFlowFilesOut(final List<FlowFileRecord> swappedOut, final FlowFileQueue queue,
+ final String swapLocation) throws IOException {
+ final List<RepositoryRecord> repoRecords = new ArrayList<>();
+ if (swappedOut == null || swappedOut.isEmpty()) {
+ return;
+ }
+
+ for (final FlowFileRecord swapRecord : swappedOut) {
+ final RepositoryRecord repoRecord = new StandardRepositoryRecord(queue, swapRecord, swapLocation);
+ repoRecords.add(repoRecord);
+ }
+
+ updateRepository(repoRecords);
+ addRawSwapLocation(swapLocation);
+ logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", swappedOut.size(), queue, swapLocation);
+ }
+
+ @Override
+ public void swapFlowFilesIn(final String swapLocation, final List<FlowFileRecord> swapRecords,
+ final FlowFileQueue queue) throws IOException {
+ final List<RepositoryRecord> repoRecords = new ArrayList<>();
+
+ for (final FlowFileRecord swapRecord : swapRecords) {
+ final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(queue, swapRecord);
+ repoRecord.setSwapLocation(swapLocation); // set the swap file to indicate that it's being swapped in.
+ repoRecord.setDestination(queue);
+
+ repoRecords.add(repoRecord);
+ }
+
+ updateRepository(repoRecords);
+ removeRawSwapLocation(swapLocation);
+ logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue});
+ }
+
+
+ @Override
+ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException {
+
+ final long startTime = System.nanoTime();
+
+ final Map<String, FlowFileQueue> queueMap = new HashMap<>();
+ for (final FlowFileQueue queue : queueProvider.getAllQueues()) {
+ queueMap.put(queue.getIdentifier(), queue);
+ }
+
+ final ExecutorService recordDeserializationExecutor = Executors.newFixedThreadPool(deserializationThreads, r -> {
+ Thread thread = Executors.defaultThreadFactory().newThread(r);
+ thread.setDaemon(true);
+ return thread;
+ });
+
+ // Create a queue which will hold the bytes of the records that have been read from disk and are awaiting deserialization
+ final BlockingQueue<byte[]> recordBytesQueue = new ArrayBlockingQueue<>(deserializationBufferSize);
+
+ final AtomicBoolean doneReading = new AtomicBoolean(false);
+ final List<Future<Long>> futures = new ArrayList<>(deserializationThreads);
+
+ RepositoryRecordSerdeFactory factory = new StandardRepositoryRecordSerdeFactory(claimManager);
+ factory.setQueueMap(queueMap);
+
+ final AtomicInteger numFlowFilesMissingQueue = new AtomicInteger(0);
+ final AtomicInteger recordCount = new AtomicInteger(0);
+ final AtomicInteger recoveryModeRecordCount = new AtomicInteger(0);
+
+ for (int i = 0; i < deserializationThreads; i++) {
+ futures.add(recordDeserializationExecutor.submit(() -> {
+ long localMaxId = 0;
+ int localRecordCount = 0;
+ final Set<String> localRecoveredSwapLocations = new HashSet<>();
+
+ // Create deserializer in each thread
+ factory.setQueueMap(queueMap);
+ final SerDe<RepositoryRecord> localDeserializer = factory.createSerDe(serializationEncodingName);
+ try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serializationHeader);
+ DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
+ localDeserializer.readHeader(dataInputStream);
+ }
+
+ while (!doneReading.get() || !recordBytesQueue.isEmpty()) {
+ byte[] value = recordBytesQueue.poll(100, TimeUnit.MILLISECONDS);
+ if (value != null) {
+ try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(value);
+ DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
+ RepositoryRecord record = localDeserializer.deserializeRecord(dataInputStream, localDeserializer.getVersion());
+
+ localRecordCount++;
+
+ // increment the count for the record
+ final ContentClaim claim = record.getCurrentClaim();
+ if (claim != null) {
+ claimManager.incrementClaimantCount(claim.getResourceClaim());
+ }
+
+ final long recordId = record.getCurrent().getId();
+ if (recordId > localMaxId) {
+ localMaxId = recordId;
+ }
+
+ if (record.getType().equals(RepositoryRecordType.SWAP_OUT)) {
+ localRecoveredSwapLocations.add(normalizeSwapLocation(record.getSwapLocation()));
+ }
+
+ final FlowFileRecord flowFile = record.getCurrent();
+ final FlowFileQueue queue = record.getOriginalQueue();
+ if (queue == null) {
+ if (!removeOrphanedFlowFiles) {
+ throw new IOException("Found FlowFile in repository without a corresponding queue. " +
+ "This may indicate an issue syncing the flow.xml in a cluster. " +
+ "To resolve this issue you should restore the flow.xml. " +
+ "Alternatively, if removing data is acceptable, you can add the following to nifi.properties: \n\n" +
+ "\t\t" + RocksDbProperty.REMOVE_ORPHANED_FLOWFILES.propertyName + "=true\n\n" +
+ "...once this has allowed you to restart nifi, you should remove it from nifi.properties to prevent inadvertent future data loss.");
+ }
+
+ numFlowFilesMissingQueue.incrementAndGet();
+ try {
+ final Long recordIdentifier = factory.getRecordIdentifier(record);
+ byte[] key = RocksDBMetronome.getBytes(recordIdentifier);
+ db.delete(key);
+ } catch (RocksDBException e) {
+ logger.warn("Could not clean up repository", e);
+ }
+ } else {
+ // verify we're supposed to enqueue the FlowFile
+ if (!enableRecoveryMode) {
+ queue.put(flowFile);
+ } else if (recoveryModeRecordCount.incrementAndGet() <= recoveryModeFlowFileLimit) {
+ queue.put(flowFile);
+ } else {
+ final Long recordIdentifier = factory.getRecordIdentifier(record);
+ byte[] key = RocksDBMetronome.getBytes(recordIdentifier);
+ recordsToRestore.add(key);
+ }
+ }
+ }
+ }
+ }
+ recordCount.addAndGet(localRecordCount);
+ addNormalizedSwapLocations(localRecoveredSwapLocations);
+ return localMaxId;
+
+ }));
+ }
+
+ long maxId = 0;
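+ // single reader thread streams raw record bytes from RocksDB into the queue,
+ // while the deserialization pool above consumes them in parallel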
+ RocksIterator rocksIterator = db.getIterator();
+ rocksIterator.seekToFirst();
+ long counter = 0;
+ long totalRecords = 0;
+ try {
+ while (rocksIterator.isValid()) {
+ if (recordBytesQueue.offer(rocksIterator.value(), 10, TimeUnit.SECONDS)) {
+ rocksIterator.next();
+ if (++counter == 5_000) { // periodically report progress
+ totalRecords += counter;
+ counter = 0;
+ logger.info("Read {} records from disk", totalRecords);
+ }
+ } else {
+ // couldn't add to the queue in a timely fashion... make sure there are no exceptions from the consumers
+ for (Future<Long> f : futures) {
+ // the only way it could be done at this point is through an exception
+ // (because we haven't yet set doneReading = true)
+ if (f.isDone()) {
+ f.get(); // this will throw the exception
+ }
+ }
+ logger.warn("Failed to add record bytes to queue. Will keep trying...");
+ }
+ }
+
+ doneReading.set(true);
+ totalRecords += counter;
+ logger.info("Finished reading from rocksDB. Read {} records from disk", totalRecords);
+
+ for (Future<Long> f : futures) {
+ long futureMax = f.get(); // will wait for completion (or exception)
+ if (futureMax > maxId) {
+ maxId = futureMax;
+ }
+ }
+
+ logger.info("Finished deserializing {} records", recordCount.get());
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ recordDeserializationExecutor.shutdownNow();
+ }
+
+ // Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will
+ // return the appropriate number.
+ flowFileSequenceGenerator.set(maxId + 1);
+
+ int flowFilesInQueues = recordCount.get() - numFlowFilesMissingQueue.get();
+ inMemoryFlowFiles.set(!enableRecoveryMode ? flowFilesInQueues
+ : Math.min(flowFilesInQueues, recoveryModeFlowFileLimit));
+ logger.info("Successfully restored {} FlowFiles in {} milliseconds using {} threads",
+ getInMemoryFlowFiles(),
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime),
+ deserializationThreads);
+ if (logger.isDebugEnabled()) {
+ synchronized (this.swapLocationSuffixes) {
+ logger.debug("Recovered {} Swap Files: {}", swapLocationSuffixes.size(), swapLocationSuffixes);
+ }
+ }
+ if (numFlowFilesMissingQueue.get() > 0) {
+ logger.warn("On recovery, found {} FlowFiles whose queue no longer exists. These FlowFiles have been dropped.", numFlowFilesMissingQueue);
+ }
+
+ final SerDe<RepositoryRecord> deserializer = factory.createSerDe(serializationEncodingName);
+ factory.setQueueMap(null); // clear the map
+
+ try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serializationHeader);
+ DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
+ deserializer.readHeader(dataInputStream);
+ }
+
+ if (enableRecoveryMode) {
+ recordDeserializer.set(deserializer);
+ }
+
+ return maxId;
+ }
+
+
+ private void addRawSwapLocation(String rawSwapLocation) throws IOException {
+ addRawSwapLocations(Collections.singleton(rawSwapLocation));
+ }
+
+ private void addRawSwapLocations(Collection<String> rawSwapLocations) throws IOException {
+ addNormalizedSwapLocations(rawSwapLocations.stream().map(RocksDBFlowFileRepository::normalizeSwapLocation).collect(Collectors.toSet()));
+ }
+
+ private void addNormalizedSwapLocations(Collection<String> normalizedSwapLocations) throws IOException {
+ synchronized (this.swapLocationSuffixes) {
+ this.swapLocationSuffixes.addAll(normalizedSwapLocations);
+ persistSwapLocationSuffixes();
+ }
+ }
+
+ private void removeRawSwapLocation(String rawSwapLocation) throws IOException {
+ removeRawSwapLocations(Collections.singleton(rawSwapLocation));
+ }
+
+ private void removeRawSwapLocations(Collection<String> rawSwapLocations) throws IOException {
+ removeNormalizedSwapLocations(rawSwapLocations.stream().map(RocksDBFlowFileRepository::normalizeSwapLocation).collect(Collectors.toSet()));
+ }
+
+ private void removeNormalizedSwapLocations(Collection<String> normalizedSwapLocations) throws IOException {
+ synchronized (this.swapLocationSuffixes) {
+ this.swapLocationSuffixes.removeAll(normalizedSwapLocations);
+ persistSwapLocationSuffixes();
+ }
+ }
+
+ private void persistSwapLocationSuffixes() throws IOException {
+ try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)
+ ) {
+ objectOutputStream.writeObject(swapLocationSuffixes);
+ db.putConfiguration(SWAP_LOCATION_SUFFIX_KEY, byteArrayOutputStream.toByteArray());
+ } catch (RocksDBException e) {
+ throw new IOException(e);
+ }
+ }
+
+
+ @Override
+ public void updateMaxFlowFileIdentifier(final long maxId) {
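+ // compare-and-set retry loop: only ever advance the sequence generator, never move it backwards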
+ while (true) {
+ final long currentId = flowFileSequenceGenerator.get();
+ if (currentId >= maxId) {
+ return;
+ }
+
+ final boolean updated = flowFileSequenceGenerator.compareAndSet(currentId, maxId);
+ if (updated) {
+ return;
+ }
+ }
+ }
+
+ @Override
+ public long getNextFlowFileSequence() {
+ return flowFileSequenceGenerator.getAndIncrement();
+ }
+
+ @Override
+ public long getMaxFlowFileIdentifier() {
+ // flowFileSequenceGenerator is 1 more than the MAX so that we can call #getAndIncrement on the AtomicLong
+ return flowFileSequenceGenerator.get() - 1;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 133751179d..779c29c421 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -515,7 +515,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
updateRepository(repoRecords, true);
synchronized (this.swapLocationSuffixes) {
- this.swapLocationSuffixes.add(normalizeSwapLocation(swapLocation));
+ this.swapLocationSuffixes.remove(normalizeSwapLocation(swapLocation));
}
logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue});
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileRepository b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileRepository
index 590dbc1871..88a98c5d37 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileRepository
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileRepository
@@ -1,16 +1,17 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
-org.apache.nifi.controller.repository.VolatileFlowFileRepository
\ No newline at end of file
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.controller.repository.RocksDBFlowFileRepository
+org.apache.nifi.controller.repository.VolatileFlowFileRepository
+org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java
new file mode 100644
index 0000000000..f2cd84c320
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.NopConnectionEventListener;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.StandardFlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.swap.StandardSwapContents;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
+import org.apache.nifi.rocksdb.RocksDBMetronome;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+public class TestRocksDBFlowFileRepository {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestRocksDBFlowFileRepository.class);
+
+ private final Map<String, String> additionalProperties = new HashMap<>();
+ private String nifiPropertiesPath;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws IOException {
+ File testRepoDir = temporaryFolder.newFolder(testName.getMethodName());
+ additionalProperties.put(NiFiProperties.FLOWFILE_REPOSITORY_DIRECTORY, testRepoDir.getAbsolutePath());
+
+ File properties = temporaryFolder.newFile();
+ Files.copy(Paths.get("src/test/resources/conf/nifi.properties"), properties.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ nifiPropertiesPath = properties.getAbsolutePath();
+
+ logger.info("Running test: {}", testName.getMethodName());
+ }
+
+ @Test
+ public void testNormalizeSwapLocation() {
+ assertEquals("/", RocksDBFlowFileRepository.normalizeSwapLocation("/"));
+ assertEquals("", RocksDBFlowFileRepository.normalizeSwapLocation(""));
+ assertNull(RocksDBFlowFileRepository.normalizeSwapLocation(null));
+ assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("test.txt"));
+ assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/test.txt"));
+ assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/tmp/test.txt"));
+ assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("//test.txt"));
+ assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/path/to/other/file/repository/test.txt"));
+ assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("test.txt/"));
+ assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/path/to/test.txt/"));
+ assertEquals("test", WriteAheadFlowFileRepository.normalizeSwapLocation(WriteAheadFlowFileRepository.normalizeSwapLocation("/path/to/test.txt/")));
+ }
+
+ @Test
+ public void testSwapLocationsRestored() throws IOException {
+
+ final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
+ repo.initialize(new StandardResourceClaimManager());
+
+ final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
+ repo.loadFlowFiles(queueProvider);
+
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn("1234");
+
+ final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+ when(queue.getIdentifier()).thenReturn("1234");
+ when(connection.getFlowFileQueue()).thenReturn(queue);
+
+ queueProvider.addConnection(connection);
+
+ StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+ ffBuilder.id(1L);
+ ffBuilder.size(0L);
+ final FlowFileRecord flowFileRecord = ffBuilder.build();
+
+ final List<RepositoryRecord> records = new ArrayList<>();
+ final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "swap123");
+ record.setDestination(queue);
+ records.add(record);
+
+ repo.updateRepository(records);
+ repo.close();
+
+ // restore
+ final RocksDBFlowFileRepository repo2 = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
+ repo2.initialize(new StandardResourceClaimManager());
+ repo2.loadFlowFiles(queueProvider);
+ assertTrue(repo2.isValidSwapLocationSuffix("swap123"));
+ assertFalse(repo2.isValidSwapLocationSuffix("other"));
+ repo2.close();
+ }
+
+ @Test
+ public void testSwapLocationsUpdatedOnRepoUpdate() throws IOException {
+ final Path path = Paths.get("target/test-swap-repo");
+ if (Files.exists(path)) {
+ FileUtils.deleteFile(path.toFile(), true);
+ }
+
+ final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
+ repo.initialize(new StandardResourceClaimManager());
+
+ final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
+ repo.loadFlowFiles(queueProvider);
+
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn("1234");
+
+ final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+ when(queue.getIdentifier()).thenReturn("1234");
+ when(connection.getFlowFileQueue()).thenReturn(queue);
+
+ queueProvider.addConnection(connection);
+
+ StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+ ffBuilder.id(1L);
+ ffBuilder.size(0L);
+ final FlowFileRecord flowFileRecord = ffBuilder.build();
+
+ final List<RepositoryRecord> records = new ArrayList<>();
+ final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "/tmp/swap123");
+ record.setDestination(queue);
+ records.add(record);
+
+ assertFalse(repo.isValidSwapLocationSuffix("swap123"));
+ repo.updateRepository(records);
+ assertTrue(repo.isValidSwapLocationSuffix("swap123"));
+ repo.close();
+ }
+
+ @Test
+ public void testResourceClaimsIncremented() throws IOException {
+ final ResourceClaimManager claimManager = new StandardResourceClaimManager();
+
+ final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn("1234");
+ when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+
+ final FlowFileSwapManager swapMgr = new TestRocksDBFlowFileRepository.MockFlowFileSwapManager();
+ final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, 0L, "0 B");
+
+ when(connection.getFlowFileQueue()).thenReturn(queue);
+ queueProvider.addConnection(connection);
+
+ final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false, false);
+ final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L);
+
+ final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false, false);
+ final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L);
+
+ // Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then,
+ // indicate that a FlowFile was swapped out. We should then be able to recover these FlowFiles and the
+ // resource claims' counts should be updated for both the swapped out FlowFile and the non-swapped out FlowFile
+ try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
+ repo.initialize(claimManager);
+ repo.loadFlowFiles(queueProvider);
+
+ // Create a Repository Record that indicates that a FlowFile was created
+ final FlowFileRecord flowFile1 = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
+ .contentClaim(claim1)
+ .build();
+ final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue);
+ rec1.setWorking(flowFile1);
+ rec1.setDestination(queue);
+
+ // Create a Record that we can swap out
+ final FlowFileRecord flowFile2 = new StandardFlowFileRecord.Builder()
+ .id(2L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-111111111112")
+ .contentClaim(claim2)
+ .build();
+
+ final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue);
+ rec2.setWorking(flowFile2);
+ rec2.setDestination(queue);
+
+ final List<RepositoryRecord> records = new ArrayList<>();
+ records.add(rec1);
+ records.add(rec2);
+ repo.updateRepository(records);
+
+ final String swapLocation = swapMgr.swapOut(Collections.singletonList(flowFile2), queue, null);
+ repo.swapFlowFilesOut(Collections.singletonList(flowFile2), queue, swapLocation);
+ }
+
+ final ResourceClaimManager recoveryClaimManager = new StandardResourceClaimManager();
+ try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
+ repo.initialize(recoveryClaimManager);
+ final long largestId = repo.loadFlowFiles(queueProvider);
+
+ // largest ID known is 1 because this doesn't take into account the FlowFiles that have been swapped out
+ assertEquals(1, largestId);
+ }
+
+ // resource claim 1 will have a single claimant count while resource claim 2 will have no claimant counts
+ // because resource claim 2 is referenced only by flowfiles that are swapped out.
+ assertEquals(1, recoveryClaimManager.getClaimantCount(resourceClaim1));
+ assertEquals(0, recoveryClaimManager.getClaimantCount(resourceClaim2));
+
+ final SwapSummary summary = queue.recoverSwappedFlowFiles();
+ assertNotNull(summary);
+ assertEquals(2, summary.getMaxFlowFileId().intValue());
+ assertEquals(new QueueSize(1, 0L), summary.getQueueSize());
+
+ final List<ResourceClaim> swappedOutClaims = summary.getResourceClaims();
+ assertNotNull(swappedOutClaims);
+ assertEquals(1, swappedOutClaims.size());
+ assertEquals(claim2.getResourceClaim(), swappedOutClaims.get(0));
+ }
+
+ @Test
+ public void testRestartWithOneRecord() throws IOException {
+
+ final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
+ repo.initialize(new StandardResourceClaimManager());
+
+ final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
+ repo.loadFlowFiles(queueProvider);
+
+ final List<FlowFileRecord> flowFileCollection = new ArrayList<>();
+
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn("1234");
+
+ final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+ when(queue.getIdentifier()).thenReturn("1234");
+ doAnswer((Answer