From 7d77b464ccfbd86ff1f2057c44bc35580d7f9fe2 Mon Sep 17 00:00:00 2001
From: Brandon Devries
Date: Thu, 16 Aug 2018 15:52:03 -0400
Subject: [PATCH] NIFI-4775: FlowFile Repository implementation based on
 RocksDB

+1 from markobean.

This closes #3638.

Signed-off-by: Brandon
---
 nifi-commons/nifi-rocksdb-utils/pom.xml       |   44 +
 .../apache/nifi/rocksdb/RocksDBMetronome.java |  891 ++++++++++++
 .../nifi/rocksdb/TestRocksDBMetronome.java    |  331 +++++
 .../src/test/resources/log4j.properties       |   22 +
 nifi-commons/pom.xml                          |   31 +-
 .../nifi-framework-core/pom.xml               |    5 +
 .../repository/RocksDBFlowFileRepository.java | 1256 +++++++++++++++++
 .../WriteAheadFlowFileRepository.java         |    2 +-
 ...i.controller.repository.FlowFileRepository |   33 +-
 .../TestRocksDBFlowFileRepository.java        |  807 +++++++++++
 .../src/test/resources/log4j.properties       |   22 +
 11 files changed, 3413 insertions(+), 31 deletions(-)
 create mode 100644 nifi-commons/nifi-rocksdb-utils/pom.xml
 create mode 100644 nifi-commons/nifi-rocksdb-utils/src/main/java/org/apache/nifi/rocksdb/RocksDBMetronome.java
 create mode 100644 nifi-commons/nifi-rocksdb-utils/src/test/java/org/apache/nifi/rocksdb/TestRocksDBMetronome.java
 create mode 100644 nifi-commons/nifi-rocksdb-utils/src/test/resources/log4j.properties
 create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java
 create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java
 create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/log4j.properties

diff --git a/nifi-commons/nifi-rocksdb-utils/pom.xml b/nifi-commons/nifi-rocksdb-utils/pom.xml
new file mode 100644
index 0000000000..02ece68879
--- /dev/null
+++ b/nifi-commons/nifi-rocksdb-utils/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>1.10.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-rocksdb-utils</artifactId>
+    <version>1.10.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.rocksdb</groupId>
+            <artifactId>rocksdbjni</artifactId>
+            <version>6.0.1</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-commons/nifi-rocksdb-utils/src/main/java/org/apache/nifi/rocksdb/RocksDBMetronome.java b/nifi-commons/nifi-rocksdb-utils/src/main/java/org/apache/nifi/rocksdb/RocksDBMetronome.java
new file mode 100644
index 0000000000..81d4adb0ab
--- /dev/null
+++ b/nifi-commons/nifi-rocksdb-utils/src/main/java/org/apache/nifi/rocksdb/RocksDBMetronome.java
@@ -0,0 +1,891 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.nifi.rocksdb; + +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.util.StringUtils; +import org.rocksdb.AccessHint; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompressionType; +import org.rocksdb.DBOptions; +import org.rocksdb.InfoLogLevel; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + + +/** + * This RocksDB helper class, instead of forcing to disk every time it's given a record, + * persists all waiting records on a regular interval (using a ScheduledExecutorService). + * Like when a metronome ticks. + */ + +public class RocksDBMetronome implements Closeable { + + private static final Logger logger = LoggerFactory.getLogger(RocksDBMetronome.class); + + static final String CONFIGURATION_FAMILY = "configuration.column.family"; + static final String DEFAULT_FAMILY = "default"; + + private final AtomicLong lastSyncWarningNanos = new AtomicLong(0L); + private final int parallelThreads; + private final int maxWriteBufferNumber; + private final int minWriteBufferNumberToMerge; + private final long writeBufferSize; + private final long maxTotalWalSize; + private final long delayedWriteRate; + private final int level0SlowdownWritesTrigger; + private final int level0StopWritesTrigger; + private final int maxBackgroundFlushes; + private final int maxBackgroundCompactions; + private final int statDumpSeconds; + private final long syncMillis; + private final long syncWarningNanos; + private final Path storagePath; + private final boolean adviseRandomOnOpen; + private final boolean createIfMissing; + private final boolean createMissingColumnFamilies; + private final boolean useFsync; + + private final Set columnFamilyNames; + private final Map columnFamilyHandles; + + private final boolean periodicSyncEnabled; + private final ScheduledExecutorService syncExecutor; + private final ReentrantLock syncLock = new ReentrantLock(); + private final Condition syncCondition = syncLock.newCondition(); + private final AtomicInteger syncCounter = new AtomicInteger(0); + + private volatile RocksDB rocksDB = null; + private final ReentrantReadWriteLock dbReadWriteLock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock.ReadLock dbReadLock = dbReadWriteLock.readLock(); + private final ReentrantReadWriteLock.WriteLock dbWriteLock = dbReadWriteLock.writeLock(); + private volatile boolean closed = false; + + private ColumnFamilyHandle configurationColumnFamilyHandle; + private ColumnFamilyHandle 
defaultColumnFamilyHandle; + private WriteOptions forceSyncWriteOptions; + private WriteOptions noSyncWriteOptions; + + private RocksDBMetronome(Builder builder) { + statDumpSeconds = builder.statDumpSeconds; + parallelThreads = builder.parallelThreads; + maxWriteBufferNumber = builder.maxWriteBufferNumber; + minWriteBufferNumberToMerge = builder.minWriteBufferNumberToMerge; + writeBufferSize = builder.writeBufferSize; + maxTotalWalSize = builder.getMaxTotalWalSize(); + delayedWriteRate = builder.delayedWriteRate; + level0SlowdownWritesTrigger = builder.level0SlowdownWritesTrigger; + level0StopWritesTrigger = builder.level0StopWritesTrigger; + maxBackgroundFlushes = builder.maxBackgroundFlushes; + maxBackgroundCompactions = builder.maxBackgroundCompactions; + syncMillis = builder.syncMillis; + syncWarningNanos = builder.syncWarningNanos; + storagePath = builder.storagePath; + adviseRandomOnOpen = builder.adviseRandomOnOpen; + createIfMissing = builder.createIfMissing; + createMissingColumnFamilies = builder.createMissingColumnFamilies; + useFsync = builder.useFsync; + columnFamilyNames = builder.columnFamilyNames; + columnFamilyHandles = new HashMap<>(columnFamilyNames.size()); + + periodicSyncEnabled = builder.periodicSyncEnabled; + syncExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setDaemon(true); + return thread; + }); + } + + /** + * Initialize the metronome + * + * @throws IOException if there is an issue with the underlying database + */ + public void initialize() throws IOException { + + final String rocksSharedLibDir = System.getenv("ROCKSDB_SHAREDLIB_DIR"); + final String javaTmpDir = System.getProperty("java.io.tmpdir"); + final String libDir = !StringUtils.isBlank(rocksSharedLibDir) ? rocksSharedLibDir : javaTmpDir; + try { + Files.createDirectories(Paths.get(libDir)); + } catch (IOException e) { + throw new IOException("Unable to load the RocksDB shared library into directory: " + libDir, e); + } + + // delete any previous librocksdbjni.so files + final File[] rocksSos = Paths.get(libDir).toFile().listFiles((dir, name) -> name.startsWith("librocksdbjni") && name.endsWith(".so")); + if (rocksSos != null) { + for (File rocksSo : rocksSos) { + if (!rocksSo.delete()) { + logger.warn("Could not delete existing librocksdbjni*.so file {}", rocksSo); + } + } + } + + try { + RocksDB.loadLibrary(); + } catch (Throwable t) { + if (System.getProperty("os.name").startsWith("Windows")) { + logger.error("The RocksDBMetronome will only work on Windows if you have Visual C++ runtime libraries for Visual Studio 2015 installed. 
" + + "If the DLLs required to support RocksDB cannot be found, then NiFi will not start!"); + } + throw t; + } + + Files.createDirectories(storagePath); + + forceSyncWriteOptions = new WriteOptions() + .setDisableWAL(false) + .setSync(true); + + noSyncWriteOptions = new WriteOptions() + .setDisableWAL(false) + .setSync(false); + + + dbWriteLock.lock(); + try (final DBOptions dbOptions = new DBOptions() + .setAccessHintOnCompactionStart(AccessHint.SEQUENTIAL) + .setAdviseRandomOnOpen(adviseRandomOnOpen) + .setAllowMmapWrites(false) // required to be false for RocksDB.syncWal() to work + .setCreateIfMissing(createIfMissing) + .setCreateMissingColumnFamilies(createMissingColumnFamilies) + .setDelayedWriteRate(delayedWriteRate) + .setIncreaseParallelism(parallelThreads) + .setLogger(getRocksLogger()) + .setMaxBackgroundCompactions(maxBackgroundCompactions) + .setMaxBackgroundFlushes(maxBackgroundFlushes) + .setMaxTotalWalSize(maxTotalWalSize) + .setStatsDumpPeriodSec(statDumpSeconds) + .setUseFsync(useFsync) + ; + + final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions() + .setCompressionType(CompressionType.LZ4_COMPRESSION) + .setLevel0SlowdownWritesTrigger(level0SlowdownWritesTrigger) + .setLevel0StopWritesTrigger(level0StopWritesTrigger) + .setMaxWriteBufferNumber(maxWriteBufferNumber) + .setMinWriteBufferNumberToMerge(minWriteBufferNumberToMerge) + .setWriteBufferSize(writeBufferSize) + ) { + + // create family descriptors + List familyDescriptors = new ArrayList<>(columnFamilyNames.size()); + for (byte[] name : columnFamilyNames) { + familyDescriptors.add(new ColumnFamilyDescriptor(name, cfOptions)); + } + + List columnFamilyList = new ArrayList<>(columnFamilyNames.size()); + + rocksDB = RocksDB.open(dbOptions, storagePath.toString(), familyDescriptors, columnFamilyList); + + // create the map of names to handles + columnFamilyHandles.put(DEFAULT_FAMILY, rocksDB.getDefaultColumnFamily()); + for (ColumnFamilyHandle cf : columnFamilyList) { + columnFamilyHandles.put(new String(cf.getName(), StandardCharsets.UTF_8), cf); + } + + // set specific special handles + defaultColumnFamilyHandle = rocksDB.getDefaultColumnFamily(); + configurationColumnFamilyHandle = columnFamilyHandles.get(CONFIGURATION_FAMILY); + + } catch (RocksDBException e) { + throw new IOException(e); + } finally { + dbWriteLock.unlock(); + } + + if (periodicSyncEnabled) { + syncExecutor.scheduleWithFixedDelay(this::doSync, syncMillis, syncMillis, TimeUnit.MILLISECONDS); + } + + logger.info("Initialized RocksDB Repository at {}", storagePath); + } + + + /** + * This method checks the state of the database to ensure it is available for use. + *
+ * <p>
+ * NOTE: This *must* be called holding the dbReadLock + * + * @throws IllegalStateException if the database is closed or not yet initialized + */ + private void checkDbState() throws IllegalStateException { + if (rocksDB == null) { + if (closed) { + throw new IllegalStateException("RocksDBMetronome is closed"); + } + throw new IllegalStateException("RocksDBMetronome has not been initialized"); + } + + } + + /** + * Return an iterator over the specified column family. The iterator is initially invalid (caller must call one of the Seek methods on the iterator before using it). + *
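+ * For example, a minimal caller-side sketch (illustrative only; {@code metronome} stands in for
+ * an initialized instance of this class, {@code handle} for one of its column family handles):
+ * <pre>{@code
+ * try (RocksIterator iterator = metronome.getIterator(handle)) {
+ *     iterator.seekToFirst();
+ *     while (iterator.isValid()) {
+ *         byte[] key = iterator.key();
+ *         byte[] value = iterator.value();
+ *         // ... use the key / value pair ...
+ *         iterator.next();
+ *     }
+ * }
+ * }</pre>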
+ * <p>
+ * Caller should close the iterator when it is no longer needed. The returned iterator should be closed before this db is closed. + * + * @param columnFamilyHandle specifies the column family for the iterator + * @return an iterator over the specified column family + */ + public RocksIterator getIterator(final ColumnFamilyHandle columnFamilyHandle) { + dbReadLock.lock(); + try { + checkDbState(); + return rocksDB.newIterator(columnFamilyHandle); + } finally { + dbReadLock.unlock(); + } + } + + /** + * Get the value for the provided key in the specified column family + * + * @param columnFamilyHandle the column family from which to get the value + * @param key the key of the value to retrieve + * @return the value for the specified key + * @throws RocksDBException thrown if there is an error in the underlying library. + */ + public byte[] get(final ColumnFamilyHandle columnFamilyHandle, final byte[] key) throws RocksDBException { + dbReadLock.lock(); + try { + checkDbState(); + return rocksDB.get(columnFamilyHandle, key); + } finally { + dbReadLock.unlock(); + } + } + + /** + * Put the key / value pair into the database in the specified column family + * + * @param columnFamilyHandle the column family in to which to put the value + * @param writeOptions specification of options for write operations + * @param key the key to be inserted + * @param value the value to be associated with the specified key + * @throws RocksDBException thrown if there is an error in the underlying library. + */ + public void put(final ColumnFamilyHandle columnFamilyHandle, WriteOptions writeOptions, final byte[] key, final byte[] value) throws RocksDBException { + dbReadLock.lock(); + try { + checkDbState(); + rocksDB.put(columnFamilyHandle, writeOptions, key, value); + } finally { + dbReadLock.unlock(); + } + } + + /** + * Delete the key / value pair from the specified column family + * + * @param columnFamilyHandle the column family in to which to put the value + * @param writeOptions specification of options for write operations + * @param key the key to be inserted + * @throws RocksDBException thrown if there is an error in the underlying library. + */ + public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final WriteOptions writeOptions) throws RocksDBException { + dbReadLock.lock(); + try { + checkDbState(); + rocksDB.delete(columnFamilyHandle, writeOptions, key); + } finally { + dbReadLock.unlock(); + } + } + + /** + * Flushes the WAL and syncs to disk + * + * @throws RocksDBException thrown if there is an error in the underlying library. + */ + public void forceSync() throws RocksDBException { + dbReadLock.lock(); + try { + checkDbState(); + rocksDB.syncWal(); + } finally { + dbReadLock.unlock(); + } + } + + /** + * Get the handle for the specified column family + * + * @param familyName the name of the column family + * @return the handle + */ + public ColumnFamilyHandle getColumnFamilyHandle(String familyName) { + return columnFamilyHandles.get(familyName); + } + + /** + * Put the key / value pair into the configuration column family and sync the wal + * + * @param key the key to be inserted + * @param value the value to be associated with the specified key + * @throws RocksDBException thrown if there is an error in the underlying library. 
+     */
+    public void putConfiguration(final byte[] key, final byte[] value) throws RocksDBException {
+        put(configurationColumnFamilyHandle, forceSyncWriteOptions, key, value);
+    }
+
+    /**
+     * Put the key / value pair into the database in the specified column family without syncing the wal
+     *
+     * @param columnFamilyHandle the column family into which to put the value
+     * @param key the key to be inserted
+     * @param value the value to be associated with the specified key
+     * @throws RocksDBException thrown if there is an error in the underlying library.
+     */
+    public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value) throws RocksDBException {
+        put(columnFamilyHandle, noSyncWriteOptions, key, value);
+    }
+
+    /**
+     * Put the key / value pair into the database in the specified column family, optionally syncing the wal
+     *
+     * @param columnFamilyHandle the column family into which to put the value
+     * @param key the key to be inserted
+     * @param value the value to be associated with the specified key
+     * @param forceSync if true, sync the wal
+     * @throws RocksDBException thrown if there is an error in the underlying library.
+     */
+    public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value, final boolean forceSync) throws RocksDBException {
+        put(columnFamilyHandle, getWriteOptions(forceSync), key, value);
+    }
+
+    /**
+     * Put the key / value pair into the database in the default column family, optionally syncing the wal
+     *
+     * @param key the key to be inserted
+     * @param value the value to be associated with the specified key
+     * @param forceSync if true, sync the wal
+     * @throws RocksDBException thrown if there is an error in the underlying library.
+     */
+    public void put(final byte[] key, final byte[] value, final boolean forceSync) throws RocksDBException {
+        put(defaultColumnFamilyHandle, getWriteOptions(forceSync), key, value);
+    }
+
+    /**
+     * Put the key / value pair into the database in the default column family, without syncing the wal
+     *
+     * @param key the key to be inserted
+     * @param value the value to be associated with the specified key
+     * @throws RocksDBException thrown if there is an error in the underlying library.
+     */
+    public void put(final byte[] key, final byte[] value) throws RocksDBException {
+        put(defaultColumnFamilyHandle, noSyncWriteOptions, key, value);
+    }
+
+    /**
+     * Get the value for the provided key in the default column family
+     *
+     * @param key the key of the value to retrieve
+     * @return the value for the specified key
+     * @throws RocksDBException thrown if there is an error in the underlying library.
+     */
+    public byte[] get(final byte[] key) throws RocksDBException {
+        return get(defaultColumnFamilyHandle, key);
+    }
+
+    /**
+     * Get the value for the provided key in the configuration column family
+     *
+     * @param key the key of the value to retrieve
+     * @return the value for the specified key
+     * @throws RocksDBException thrown if there is an error in the underlying library.
+     */
+    public byte[] getConfiguration(final byte[] key) throws RocksDBException {
+        return get(configurationColumnFamilyHandle, key);
+    }
+
+
+    /**
+     * Delete the key / value pair from the default column family without syncing the wal
+     *
+     * @param key the key to be deleted
+     * @throws RocksDBException thrown if there is an error in the underlying library.
+     */
+    public void delete(byte[] key) throws RocksDBException {
+        delete(defaultColumnFamilyHandle, key, noSyncWriteOptions);
+    }
+
+    /**
+     * Delete the key / value pair from the default column family, optionally syncing the wal
+     *
+     * @param key the key to be deleted
+     * @param forceSync if true, sync the wal
+     * @throws RocksDBException thrown if there is an error in the underlying library.
+     */
+    public void delete(byte[] key, final boolean forceSync) throws RocksDBException {
+        delete(defaultColumnFamilyHandle, key, getWriteOptions(forceSync));
+    }
+
+    /**
+     * Delete the key / value pair from the specified column family without syncing the wal
+     *
+     * @param columnFamilyHandle the column family from which to delete the value
+     * @param key the key to be deleted
+     * @throws RocksDBException thrown if there is an error in the underlying library.
+     */
+    public void delete(final ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
+        delete(columnFamilyHandle, key, noSyncWriteOptions);
+    }
+
+    /**
+     * Delete the key / value pair from the specified column family, optionally syncing the wal
+     *
+     * @param columnFamilyHandle the column family from which to delete the value
+     * @param key the key to be deleted
+     * @param forceSync if true, sync the wal
+     * @throws RocksDBException thrown if there is an error in the underlying library.
+     */
+    public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final boolean forceSync) throws RocksDBException {
+        delete(columnFamilyHandle, key, getWriteOptions(forceSync));
+    }
+
+    private WriteOptions getWriteOptions(boolean forceSync) {
+        return forceSync ? forceSyncWriteOptions : noSyncWriteOptions;
+    }
+
+    /**
+     * Return an iterator over the default column family. The iterator is initially invalid (caller must call one of the Seek methods on the iterator before using it).
+     * <p>
+ * Caller should close the iterator when it is no longer needed. The returned iterator should be closed before this db is closed. + * + * @return an iterator over the default column family + */ + public RocksIterator getIterator() { + return getIterator(defaultColumnFamilyHandle); + } + + /** + * Get the current value of the sync counter. This can be used with waitForSync() to verify that operations have been synced to disk + * + * @return the current value of the sync counter + */ + public int getSyncCounterValue() { + return syncCounter.get(); + } + + /** + * Close the Metronome and the RocksDB Objects + * + * @throws IOException if there are issues in closing any of the underlying RocksDB objects + */ + @Override + public void close() throws IOException { + + logger.info("Closing RocksDBMetronome"); + + dbWriteLock.lock(); + try { + logger.info("Shutting down RocksDBMetronome sync executor"); + syncExecutor.shutdownNow(); + + try { + logger.info("Pausing RocksDB background work"); + rocksDB.pauseBackgroundWork(); + } catch (RocksDBException e) { + logger.warn("Unable to pause background work before close.", e); + } + + final AtomicReference exceptionReference = new AtomicReference<>(); + + logger.info("Closing RocksDB configurations"); + + safeClose(forceSyncWriteOptions, exceptionReference); + safeClose(noSyncWriteOptions, exceptionReference); + + // close the column family handles first, then the db last + for (ColumnFamilyHandle cfh : columnFamilyHandles.values()) { + safeClose(cfh, exceptionReference); + } + + logger.info("Closing RocksDB database"); + safeClose(rocksDB, exceptionReference); + rocksDB = null; + closed = true; + + if (exceptionReference.get() != null) { + throw new IOException(exceptionReference.get()); + } + } finally { + dbWriteLock.unlock(); + } + } + + /** + * @param autoCloseable An {@link AutoCloseable} to be closed + * @param exceptionReference A reference to contain any encountered {@link Exception} + */ + private void safeClose(final AutoCloseable autoCloseable, final AtomicReference exceptionReference) { + if (autoCloseable != null) { + try { + autoCloseable.close(); + } catch (Exception e) { + exceptionReference.set(e); + } + } + } + + /** + * @return The capacity of the store + * @throws IOException if encountered + */ + public long getStorageCapacity() throws IOException { + return Files.getFileStore(storagePath).getTotalSpace(); + } + + /** + * @return The usable space of the store + * @throws IOException if encountered + */ + public long getUsableStorageSpace() throws IOException { + return Files.getFileStore(storagePath).getUsableSpace(); + } + + /** + * This method is scheduled by the syncExecutor. It runs at the specified interval, forcing the RocksDB WAL to disk. + *
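+ * The caller-side pattern this enables is (an illustrative sketch; {@code metronome} stands in
+ * for an initialized instance of this class):
+ * <pre>{@code
+ * int counter = metronome.getSyncCounterValue(); // capture the counter before writing
+ * metronome.put(key, value);                     // write without forcing an immediate sync
+ * metronome.waitForSync(counter);                // block until a later tick has synced the WAL
+ * }</pre>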
+ * <p>
+ * If the sync is successful, it notifies threads that had been waiting for their records to be persisted using + * syncCondition and the syncCounter, which is incremented to indicate success + */ + void doSync() { + syncLock.lock(); + try { + // if we're interrupted, return + if (Thread.currentThread().isInterrupted()) { + return; + } + forceSync(); + syncCounter.incrementAndGet(); // its ok if it rolls over... we're just going to check that the value changed + syncCondition.signalAll(); + } catch (final IllegalArgumentException e) { + logger.error("Unable to sync, likely because the repository is out of space.", e); + } catch (final Throwable t) { + logger.error("Unable to sync", t); + } finally { + syncLock.unlock(); + } + } + + /** + * This method blocks until the next time the WAL is forced to disk, ensuring that all records written before this point have been persisted. + */ + public void waitForSync() throws InterruptedException { + final int counterValue = syncCounter.get(); + waitForSync(counterValue); + } + + /** + * This method blocks until the WAL has been forced to disk, ensuring that all records written before the point specified by the counterValue have been persisted. + * + * @param counterValue The value of the counter at the time of a write we must persist + * @throws InterruptedException if the thread is interrupted + */ + public void waitForSync(final int counterValue) throws InterruptedException { + if (counterValue != syncCounter.get()) { + return; // if the counter has already changed, we don't need to wait (or grab the lock) because the records we're concerned with have already been persisted + } + long waitTimeRemaining = syncWarningNanos; + syncLock.lock(); + try { + while (counterValue == syncCounter.get()) { // wait until the counter changes (indicating sync occurred) + waitTimeRemaining = syncCondition.awaitNanos(waitTimeRemaining); + if (waitTimeRemaining <= 0L) { // this means the wait timed out + + // only log a warning every syncWarningNanos... don't spam the logs + final long now = System.nanoTime(); + final long lastWarning = lastSyncWarningNanos.get(); + if (now - lastWarning > syncWarningNanos + && lastSyncWarningNanos.compareAndSet(lastWarning, now)) { + logger.warn("Failed to sync within {} seconds... 
system configuration may need to be adjusted", TimeUnit.NANOSECONDS.toSeconds(syncWarningNanos)); + } + + // reset waiting time + waitTimeRemaining = syncWarningNanos; + } + } + } finally { + syncLock.unlock(); + } + } + + /** + * Returns a representation of a long as a byte array + * + * @param value a long to convert + * @return a byte[] representation + */ + public static byte[] getBytes(long value) { + byte[] bytes = new byte[8]; + writeLong(value, bytes); + return bytes; + } + + /** + * Writes a representation of a long to the specified byte array + * + * @param l a long to convert + * @param bytes an array to store the byte representation + */ + public static void writeLong(long l, byte[] bytes) { + bytes[0] = (byte) (l >>> 56); + bytes[1] = (byte) (l >>> 48); + bytes[2] = (byte) (l >>> 40); + bytes[3] = (byte) (l >>> 32); + bytes[4] = (byte) (l >>> 24); + bytes[5] = (byte) (l >>> 16); + bytes[6] = (byte) (l >>> 8); + bytes[7] = (byte) (l); + } + + /** + * Creates a long from it's byte array representation + * @param bytes to convert to a long + * @return a long + * @throws IOException if the given byte array is of the wrong size + */ + public static long readLong(final byte[] bytes) throws IOException { + if (bytes.length != 8) { + throw new IOException("wrong number of bytes to convert to long (must be 8)"); + } + return (((long) (bytes[0]) << 56) + + ((long) (bytes[1] & 255) << 48) + + ((long) (bytes[2] & 255) << 40) + + ((long) (bytes[3] & 255) << 32) + + ((long) (bytes[4] & 255) << 24) + + ((long) (bytes[5] & 255) << 16) + + ((long) (bytes[6] & 255) << 8) + + ((long) (bytes[7] & 255))); + } + + /** + * @return the storage path of the db + */ + public Path getStoragePath() { + return storagePath; + } + + /** + * @return A RocksDB logger capturing all logging output from RocksDB + */ + private org.rocksdb.Logger getRocksLogger() { + try (Options options = new Options() + // make RocksDB give us everything, and we'll decide what we want to log in our wrapper + .setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)) { + return new LogWrapper(options); + } + } + + /** + * An Extension of org.rocksdb.Logger that wraps the slf4j Logger + */ + private class LogWrapper extends org.rocksdb.Logger { + + LogWrapper(Options options) { + super(options); + } + + @Override + protected void log(final InfoLogLevel infoLogLevel, final String logMsg) { + switch (infoLogLevel) { + case ERROR_LEVEL: + case FATAL_LEVEL: + logger.error(logMsg); + break; + case WARN_LEVEL: + logger.warn(logMsg); + break; + case DEBUG_LEVEL: + logger.debug(logMsg); + break; + case INFO_LEVEL: + case HEADER_LEVEL: + default: + logger.info(logMsg); + break; + } + } + } + + public static class Builder { + + int parallelThreads = 8; + int maxWriteBufferNumber = 4; + int minWriteBufferNumberToMerge = 1; + long writeBufferSize = (long) DataUnit.MB.toB(256); + long delayedWriteRate = (long) DataUnit.MB.toB(16); + int level0SlowdownWritesTrigger = 20; + int level0StopWritesTrigger = 40; + int maxBackgroundFlushes = 1; + int maxBackgroundCompactions = 1; + int statDumpSeconds = 600; + long syncMillis = 10; + long syncWarningNanos = TimeUnit.SECONDS.toNanos(30); + Path storagePath; + boolean adviseRandomOnOpen = false; + boolean createIfMissing = true; + boolean createMissingColumnFamilies = true; + boolean useFsync = true; + boolean periodicSyncEnabled = true; + final Set columnFamilyNames = new HashSet<>(); + + public RocksDBMetronome build() { + if (storagePath == null) { + throw new IllegalStateException("Cannot create RocksDBMetronome 
because storagePath is not set"); + } + + // add default column families + columnFamilyNames.add(RocksDB.DEFAULT_COLUMN_FAMILY); + columnFamilyNames.add(CONFIGURATION_FAMILY.getBytes(StandardCharsets.UTF_8)); + + return new RocksDBMetronome(this); + } + + public Builder addColumnFamily(String name) { + this.columnFamilyNames.add(name.getBytes(StandardCharsets.UTF_8)); + return this; + } + + public Builder setStoragePath(Path storagePath) { + this.storagePath = storagePath; + return this; + } + + public Builder setParallelThreads(int parallelThreads) { + this.parallelThreads = parallelThreads; + return this; + } + + public Builder setMaxWriteBufferNumber(int maxWriteBufferNumber) { + this.maxWriteBufferNumber = maxWriteBufferNumber; + return this; + } + + public Builder setMinWriteBufferNumberToMerge(int minWriteBufferNumberToMerge) { + this.minWriteBufferNumberToMerge = minWriteBufferNumberToMerge; + return this; + } + + public Builder setWriteBufferSize(long writeBufferSize) { + this.writeBufferSize = writeBufferSize; + return this; + } + + public Builder setDelayedWriteRate(long delayedWriteRate) { + this.delayedWriteRate = delayedWriteRate; + return this; + } + + public Builder setLevel0SlowdownWritesTrigger(int level0SlowdownWritesTrigger) { + this.level0SlowdownWritesTrigger = level0SlowdownWritesTrigger; + return this; + } + + public Builder setLevel0StopWritesTrigger(int level0StopWritesTrigger) { + this.level0StopWritesTrigger = level0StopWritesTrigger; + return this; + } + + public Builder setMaxBackgroundFlushes(int maxBackgroundFlushes) { + this.maxBackgroundFlushes = maxBackgroundFlushes; + return this; + } + + public Builder setMaxBackgroundCompactions(int maxBackgroundCompactions) { + this.maxBackgroundCompactions = maxBackgroundCompactions; + return this; + } + + public Builder setStatDumpSeconds(int statDumpSeconds) { + this.statDumpSeconds = statDumpSeconds; + return this; + } + + public Builder setSyncMillis(long syncMillis) { + this.syncMillis = syncMillis; + return this; + } + + public Builder setSyncWarningNanos(long syncWarningNanos) { + this.syncWarningNanos = syncWarningNanos; + return this; + } + + + public Builder setAdviseRandomOnOpen(boolean adviseRandomOnOpen) { + this.adviseRandomOnOpen = adviseRandomOnOpen; + return this; + } + + public Builder setCreateMissingColumnFamilies(boolean createMissingColumnFamilies) { + this.createMissingColumnFamilies = createMissingColumnFamilies; + return this; + } + + public Builder setCreateIfMissing(boolean createIfMissing) { + this.createIfMissing = createIfMissing; + return this; + } + + public Builder setUseFsync(boolean useFsync) { + this.useFsync = useFsync; + return this; + } + + public Builder setPeriodicSyncEnabled(boolean periodicSyncEnabled) { + this.periodicSyncEnabled = periodicSyncEnabled; + return this; + } + + long getMaxTotalWalSize() { + return writeBufferSize * maxWriteBufferNumber; + } + } +} diff --git a/nifi-commons/nifi-rocksdb-utils/src/test/java/org/apache/nifi/rocksdb/TestRocksDBMetronome.java b/nifi-commons/nifi-rocksdb-utils/src/test/java/org/apache/nifi/rocksdb/TestRocksDBMetronome.java new file mode 100644 index 0000000000..7b5e68bc80 --- /dev/null +++ b/nifi-commons/nifi-rocksdb-utils/src/test/java/org/apache/nifi/rocksdb/TestRocksDBMetronome.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.rocksdb; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksIterator; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestRocksDBMetronome { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final byte[] KEY = "key".getBytes(StandardCharsets.UTF_8); + private static final byte[] VALUE = "value".getBytes(StandardCharsets.UTF_8); + private static final byte[] KEY_2 = "key 2".getBytes(StandardCharsets.UTF_8); + private static final byte[] VALUE_2 = "value 2".getBytes(StandardCharsets.UTF_8); + + private ExecutorService executor; + + @Before + public void before() { + executor = Executors.newSingleThreadExecutor(); + } + + @After + public void after() { + executor.shutdownNow(); + } + + @Test + public void testReadWriteLong() throws Exception { + Random random = new Random(); + byte[] key = new byte[8]; + for (long i = 0; i < 10; i++) { + { + RocksDBMetronome.writeLong(i, key); + assertEquals(i, RocksDBMetronome.readLong(key)); + } + { + long testValue = Long.MIN_VALUE + i; + RocksDBMetronome.writeLong(testValue, key); + assertEquals(testValue, RocksDBMetronome.readLong(key)); + } + { + long testValue = Long.MAX_VALUE - i; + RocksDBMetronome.writeLong(testValue, key); + assertEquals(testValue, RocksDBMetronome.readLong(key)); + } + { + long testValue = random.nextLong(); + RocksDBMetronome.writeLong(testValue, key); + assertEquals(testValue, RocksDBMetronome.readLong(key)); + } + } + } + + @Test + public void testPutGetDelete() throws Exception { + + try (RocksDBMetronome db = new RocksDBMetronome.Builder() + .setStoragePath(temporaryFolder.newFolder().toPath()) + .build()) { + db.initialize(); + + assertNull(db.get(KEY)); + + // test default (no sync) + db.put(KEY, VALUE); + assertArrayEquals(VALUE, db.get(KEY)); + db.delete(KEY); + assertNull(db.get(KEY)); + + // test with "force sync" + db.put(KEY, VALUE, true); + assertArrayEquals(VALUE, db.get(KEY)); + db.delete(KEY, true); + assertNull(db.get(KEY)); + } + } + + @Test + public void testPutGetConfiguration() throws Exception { + + try (RocksDBMetronome db = new RocksDBMetronome.Builder() + .setStoragePath(temporaryFolder.newFolder().toPath()) + .build()) { + db.initialize(); + + 
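+            // the configuration column family is separate from the default column family, so a key
+            // stored via putConfiguration() is never visible to a plain get() (and vice versa)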
assertNull(db.getConfiguration(KEY)); + db.putConfiguration(KEY, VALUE); + assertArrayEquals(VALUE, db.getConfiguration(KEY)); + db.delete(db.getColumnFamilyHandle(RocksDBMetronome.CONFIGURATION_FAMILY), KEY); + assertNull(db.getConfiguration(KEY)); + } + } + + @Test(expected = IllegalStateException.class) + public void testPutBeforeInit() throws Exception { + + try (RocksDBMetronome db = new RocksDBMetronome.Builder() + .setStoragePath(temporaryFolder.newFolder().toPath()) + .build()) { + db.put(KEY, VALUE); + } + } + + @Test(expected = IllegalStateException.class) + public void testPutClosed() throws Exception { + + try (RocksDBMetronome db = new RocksDBMetronome.Builder() + .setStoragePath(temporaryFolder.newFolder().toPath()) + .build()) { + db.initialize(); + + db.close(); + db.put(KEY_2, VALUE_2); + } + } + + @Test + public void testColumnFamilies() throws Exception { + + String secondFamilyName = "second family"; + try (RocksDBMetronome db = new RocksDBMetronome.Builder() + .setStoragePath(temporaryFolder.newFolder().toPath()) + .addColumnFamily(secondFamilyName) + .build()) { + db.initialize(); + ColumnFamilyHandle secondFamily = db.getColumnFamilyHandle(secondFamilyName); + + // assert nothing present + assertNull(db.get(KEY)); + assertNull(db.get(KEY_2)); + + assertNull(db.get(secondFamily, KEY)); + assertNull(db.get(secondFamily, KEY_2)); + + // add values + db.put(KEY, VALUE); + db.put(secondFamily, KEY_2, VALUE_2); + + // assert values present in correct family + assertArrayEquals(VALUE, db.get(KEY)); + assertNull(db.get(KEY_2)); + + assertArrayEquals(VALUE_2, db.get(secondFamily, KEY_2)); + assertNull(db.get(secondFamily, KEY)); + + // delete from the "wrong" family + db.delete(KEY_2); + db.delete(secondFamily, KEY); + + // assert values *still* present in correct family + assertArrayEquals(VALUE, db.get(KEY)); + assertNull(db.get(KEY_2)); + + assertArrayEquals(VALUE_2, db.get(secondFamily, KEY_2)); + assertNull(db.get(secondFamily, KEY)); + + // delete from the "right" family + db.delete(KEY); + db.delete(secondFamily, KEY_2); + + // assert values removed + assertNull(db.get(KEY)); + assertNull(db.get(KEY_2)); + + assertNull(db.get(secondFamily, KEY)); + assertNull(db.get(secondFamily, KEY_2)); + } + } + + @Test + public void testIterator() throws Exception { + + try (RocksDBMetronome db = new RocksDBMetronome.Builder() + .setStoragePath(temporaryFolder.newFolder().toPath()) + .build()) { + db.initialize(); + + db.put(KEY, VALUE); + db.put(KEY_2, VALUE_2); + + RocksIterator iterator = db.getIterator(); + iterator.seekToFirst(); + + Map recovered = new HashMap<>(); + + while (iterator.isValid()) { + recovered.put(new String(iterator.key(), StandardCharsets.UTF_8), iterator.value()); + iterator.next(); + } + + assertEquals(2, recovered.size()); + assertArrayEquals(VALUE, recovered.get(new String(KEY, StandardCharsets.UTF_8))); + assertArrayEquals(VALUE_2, recovered.get(new String(KEY_2, StandardCharsets.UTF_8))); + } + } + + @Test + public void testCounterIncrement() throws Exception { + try (RocksDBMetronome db = new RocksDBMetronome.Builder() + .setStoragePath(temporaryFolder.newFolder().toPath()) + .setSyncMillis(Long.MAX_VALUE) // effectively disable the auto-sync + .build()) { + db.initialize(); + + // get initial counter value + int counterValue = db.getSyncCounterValue(); + + // do the sync (which would normally happen via the db's internal executor) + db.doSync(); + + // assert counter value incremented + assertEquals(counterValue + 1, db.getSyncCounterValue()); + } + 
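+        // note: doSync() also signals any threads blocked in waitForSync(); that path is
+        // exercised directly by testWaitForSync and testWaitForSyncWithValue below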
} + + @Test(timeout = 10_000) + public void testWaitForSync() throws Exception { + try (RocksDBMetronome db = new RocksDBMetronome.Builder() + .setStoragePath(temporaryFolder.newFolder().toPath()) + .setSyncMillis(Long.MAX_VALUE) // effectively disable the auto-sync + .build()) { + db.initialize(); + + Future future = executor.submit(() -> { + db.waitForSync(); + return true; + }); + + // the future should still be blocked waiting for sync to happen + assertFalse(future.isDone()); + + // give the future time to wake up and complete + while (!future.isDone()) { + // TESTING NOTE: this is inside a loop to address a minor *testing* race condition where our first doSync() could happen before the future runs, + // meaning waitForSync() would be left waiting on another doSync() that never comes... + + // do the sync (which would normally happen via the db's internal executor) + db.doSync(); + Thread.sleep(25); + } + + // the future should no longer be blocked + assertTrue(future.isDone()); + } + } + + @Test(timeout = 10_000) + public void testWaitForSyncWithValue() throws Exception { + try (RocksDBMetronome db = new RocksDBMetronome.Builder() + .setStoragePath(temporaryFolder.newFolder().toPath()) + .setSyncMillis(Long.MAX_VALUE) // effectively disable the auto-sync + .build()) { + db.initialize(); + + int syncCounterValue = db.getSyncCounterValue(); + + // "wait" for one before current counter value... should not block + db.waitForSync(syncCounterValue - 1); + + // wait for current value... should block (because auto-sync isn't happening) + assertBlocks(db, syncCounterValue); + + // do the sync (which would normally happen via the db's internal executor) + db.doSync(); + + // "wait" for initial value... should now not block + db.waitForSync(syncCounterValue); + + // wait for current value again... should block (because auto-sync isn't happening) + assertBlocks(db, db.getSyncCounterValue()); + } + } + + private void assertBlocks(RocksDBMetronome db, int counterValue) throws InterruptedException, java.util.concurrent.ExecutionException { + Future future = getWaitForSyncFuture(db, counterValue); + + try { + future.get(1, TimeUnit.SECONDS); + fail(); + } catch (TimeoutException expected) { + assertFalse(future.isDone()); + } + future.cancel(true); + } + + private Future getWaitForSyncFuture(RocksDBMetronome db, int counterValue) { + return executor.submit(() -> { + db.waitForSync(counterValue); + return true; + }); + } +} \ No newline at end of file diff --git a/nifi-commons/nifi-rocksdb-utils/src/test/resources/log4j.properties b/nifi-commons/nifi-rocksdb-utils/src/test/resources/log4j.properties new file mode 100644 index 0000000000..29dd873ef8 --- /dev/null +++ b/nifi-commons/nifi-rocksdb-utils/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO,console
+log4j.category.org.apache.nifi=DEBUG
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
\ No newline at end of file
diff --git a/nifi-commons/pom.xml b/nifi-commons/pom.xml
index fec6996a64..d7e8ec3d1a 100644
--- a/nifi-commons/pom.xml
+++ b/nifi-commons/pom.xml
@@ -12,7 +12,9 @@
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
--->
+-->
+
     <modelVersion>4.0.0</modelVersion>
     <groupId>org.apache.nifi</groupId>
     <artifactId>nifi-commons</artifactId>
     <version>1.10.0-SNAPSHOT</version>
     <packaging>pom</packaging>
@@ -24,22 +26,23 @@
         <module>nifi-data-provenance-utils</module>
-        <module>nifi-flowfile-packager</module>
         <module>nifi-expression-language</module>
-        <module>nifi-logging-utils</module>
-        <module>nifi-properties</module>
-        <module>nifi-security-utils</module>
-        <module>nifi-socket-utils</module>
-        <module>nifi-utils</module>
-        <module>nifi-json-utils</module>
-        <module>nifi-web-utils</module>
-        <module>nifi-write-ahead-log</module>
-        <module>nifi-site-to-site-client</module>
         <module>nifi-hl7-query-language</module>
-        <module>nifi-schema-utils</module>
-        <module>nifi-record</module>
-        <module>nifi-record-path</module>
+        <module>nifi-flowfile-packager</module>
+        <module>nifi-json-utils</module>
+        <module>nifi-logging-utils</module>
         <module>nifi-metrics</module>
         <module>nifi-parameter</module>
+        <module>nifi-properties</module>
+        <module>nifi-record</module>
+        <module>nifi-record-path</module>
+        <module>nifi-rocksdb-utils</module>
+        <module>nifi-schema-utils</module>
+        <module>nifi-security-utils</module>
+        <module>nifi-site-to-site-client</module>
+        <module>nifi-socket-utils</module>
+        <module>nifi-utils</module>
+        <module>nifi-web-utils</module>
+        <module>nifi-write-ahead-log</module>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 4ec4352b9c..514d791818 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -143,6 +143,11 @@
             <artifactId>nifi-write-ahead-log</artifactId>
             <version>1.10.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-rocksdb-utils</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-flowfile-repo-serialization</artifactId>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java
new file mode 100644
index 0000000000..4f3dd45de5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java
@@ -0,0 +1,1256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.nifi.controller.repository; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.rocksdb.RocksDBMetronome; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.SerDe; +import org.wali.UpdateType; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +/** + *
<p>
+ * Implements FlowFile Repository using RocksDB as the backing store. + *
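+ * It delegates storage to {@link org.apache.nifi.rocksdb.RocksDBMetronome}, which syncs the
+ * RocksDB WAL on a fixed interval instead of forcing a sync on every update.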
+ * </p>
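+ * <p>
+ * An illustrative {@code nifi.properties} line for selecting this implementation (the key is
+ * NiFi's standard repository-implementation property; the line itself is not part of this patch):
+ * <pre>
+ * nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.RocksDBFlowFileRepository
+ * </pre>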
+ */ +public class RocksDBFlowFileRepository implements FlowFileRepository { + + private static final Logger logger = LoggerFactory.getLogger(RocksDBFlowFileRepository.class); + + private static final String FLOWFILE_PROPERTY_PREFIX = "nifi.flowfile.repository."; + private static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = FLOWFILE_PROPERTY_PREFIX + "directory"; + + private static final byte[] SWAP_LOCATION_SUFFIX_KEY = "swap.location.sufixes".getBytes(StandardCharsets.UTF_8); + private static final byte[] SERIALIZATION_ENCODING_KEY = "serial.encoding".getBytes(StandardCharsets.UTF_8); + private static final byte[] SERIALIZATION_HEADER_KEY = "serial.header".getBytes(StandardCharsets.UTF_8); + static final byte[] REPOSITORY_VERSION_KEY = "repository.version".getBytes(StandardCharsets.UTF_8); + static final byte[] VERSION_ONE_BYTES = "1.0".getBytes(StandardCharsets.UTF_8); + private static final IllegalStateException NO_NEW_FLOWFILES = new IllegalStateException("Repository is not currently accepting new FlowFiles"); + private static final Runtime runtime = Runtime.getRuntime(); + private static final NumberFormat percentFormat = NumberFormat.getPercentInstance(); + + /** + * Each property is defined by its name in the file and its default value + */ + enum RocksDbProperty { + //FlowFileRepo Configuration Parameters + SYNC_WARNING_PERIOD("rocksdb.sync.warning.period", "30 seconds"), + CLAIM_CLEANUP_PERIOD("rocksdb.claim.cleanup.period", "30 seconds"), + DESERIALIZATION_THREADS("rocksdb.deserialization.threads", "16"), + DESERIALIZATION_BUFFER_SIZE("rocksdb.deserialization.buffer.size", "1000"), + SYNC_PERIOD("rocksdb.sync.period", "10 milliseconds"), + ACCEPT_DATA_LOSS("rocksdb.accept.data.loss", "false"), + ENABLE_STALL_STOP("rocksdb.enable.stall.stop", "false"), + STALL_PERIOD("rocksdb.stall.period", "100 milliseconds"), + STALL_FLOWFILE_COUNT("rocksdb.stall.flowfile.count", "800000"), + STALL_HEAP_USAGE_PERCENT("rocksdb.stall.heap.usage.percent", "95%"), + STOP_FLOWFILE_COUNT("rocksdb.stop.flowfile.count", "1100000"), + STOP_HEAP_USAGE_PERCENT("rocksdb.stop.heap.usage.percent", "99.9%"), + REMOVE_ORPHANED_FLOWFILES("rocksdb.remove.orphaned.flowfiles.on.startup", "false"), + ENABLE_RECOVERY_MODE("rocksdb.enable.recovery.mode", "false"), + RECOVERY_MODE_FLOWFILE_LIMIT("rocksdb.recovery.mode.flowfile.count", "5000"), + + //RocksDB Configuration Parameters + DB_PARALLEL_THREADS("rocksdb.parallel.threads", "8"), + MAX_WRITE_BUFFER_NUMBER("rocksdb.max.write.buffer.number", "4"), + WRITE_BUFFER_SIZE("rocksdb.write.buffer.size", "256 MB"), + LEVEL_O_SLOWDOWN_WRITES_TRIGGER("rocksdb.level.0.slowdown.writes.trigger", "20"), + LEVEL_O_STOP_WRITES_TRIGGER("rocksdb.level.0.stop.writes.trigger", "40"), + DELAYED_WRITE_RATE("rocksdb.delayed.write.bytes.per.second", "16 MB"), + MAX_BACKGROUND_FLUSHES("rocksdb.max.background.flushes", "1"), + MAX_BACKGROUND_COMPACTIONS("rocksdb.max.background.compactions", "1"), + MIN_WRITE_BUFFER_NUMBER_TO_MERGE("rocksdb.min.write.buffer.number.to.merge", "1"), + STAT_DUMP_PERIOD("rocksdb.stat.dump.period", "600 sec"), + ; + + final String propertyName; + final String defaultValue; + + RocksDbProperty(String propertyName, String defaultValue) { + this.propertyName = FLOWFILE_PROPERTY_PREFIX + propertyName; + this.defaultValue = defaultValue; + } + + /** + * @param niFiProperties The Properties file + * @param timeUnit The desired time unit + * @return The property Value in the desired units + */ + long getTimeValue(NiFiProperties niFiProperties, TimeUnit timeUnit) 
{ + String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue); + long timeValue = 0L; + try { + timeValue = Math.round(FormatUtils.getPreciseTimeDuration(propertyValue, timeUnit)); + } catch (IllegalArgumentException e) { + this.generateIllegalArgumentException(propertyValue, e); + } + return timeValue; + } + + /** + * @param niFiProperties The Properties file + * @return The property value as a boolean + */ + boolean getBooleanValue(NiFiProperties niFiProperties) { + String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue); + return Boolean.parseBoolean(propertyValue); + } + + /** + * @param niFiProperties The Properties file + * @return The property value as an int + */ + int getIntValue(NiFiProperties niFiProperties) { + String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue); + int returnValue = 0; + try { + returnValue = Integer.parseInt(propertyValue); + } catch (NumberFormatException e) { + this.generateIllegalArgumentException(propertyValue, e); + } + return returnValue; + } + + /** + * @param niFiProperties The Properties file + * @return The property value as a number of bytes + */ + long getByteCountValue(NiFiProperties niFiProperties) { + long returnValue = 0L; + String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue); + try { + double writeBufferDouble = DataUnit.parseDataSize(propertyValue, DataUnit.B); + returnValue = (long) (writeBufferDouble < Long.MAX_VALUE ? writeBufferDouble : Long.MAX_VALUE); + } catch (IllegalArgumentException e) { + this.generateIllegalArgumentException(propertyValue, e); + } + return returnValue; + } + + /** + * @param niFiProperties The Properties file + * @return The property value as a percent + */ + double getPercentValue(NiFiProperties niFiProperties) { + String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue).replace('%', ' '); + double returnValue = 0.0D; + try { + returnValue = Double.parseDouble(propertyValue) / 100D; + if (returnValue > 1.0D) { + this.generateIllegalArgumentException(propertyValue, null); + } + } catch (NumberFormatException e) { + this.generateIllegalArgumentException(propertyValue, e); + } + return returnValue; + } + + /** + * @param niFiProperties The Properties file + * @return The property value as a long + */ + long getLongValue(NiFiProperties niFiProperties) { + String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue); + long returnValue = 0L; + try { + returnValue = Long.parseLong(propertyValue); + } catch (NumberFormatException e) { + this.generateIllegalArgumentException(propertyValue, e); + } + return returnValue; + } + + void generateIllegalArgumentException(String badValue, Throwable t) { + throw new IllegalArgumentException("The NiFi Property: [" + this.propertyName + "] with value: [" + badValue + "] is not valid", t); + } + + } + + + private final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L); + private final int deserializationThreads; + private final int deserializationBufferSize; + private final long claimCleanupMillis; + private final ScheduledExecutorService housekeepingExecutor; + private final AtomicReference> claimsAwaitingDestruction = new AtomicReference<>(new ArrayList<>()); + private final RocksDBMetronome db; + private ResourceClaimManager claimManager; + private RepositoryRecordSerdeFactory serdeFactory; + private SerDe serializer; + private String serializationEncodingName; + private byte[] 
serializationHeader;
+
+    private final boolean acceptDataLoss;
+    private final boolean enableStallStop;
+    private final boolean removeOrphanedFlowFiles;
+    private final boolean enableRecoveryMode;
+    private final long recoveryModeFlowFileLimit;
+    private final AtomicReference<SerDe<RepositoryRecord>> recordDeserializer = new AtomicReference<>();
+    private final List<byte[]> recordsToRestore = Collections.synchronizedList(new LinkedList<>());
+
+    private final ReentrantLock stallStopLock = new ReentrantLock();
+    private final AtomicLong inMemoryFlowFiles = new AtomicLong(0L);
+    volatile boolean stallNewFlowFiles = false;
+    volatile boolean stopNewFlowFiles = false;
+    private final long stallMillis;
+    private final long stallCount;
+    private final long stopCount;
+    private final double stallPercentage;
+    private final double stopPercentage;
+
+    private final Set<String> swapLocationSuffixes = new HashSet<>(); // guarded by synchronizing on object itself
+
+    /**
+     * Default no-args constructor for service loading only.
+     */
+    public RocksDBFlowFileRepository() {
+        deserializationThreads = 0;
+        deserializationBufferSize = 0;
+        claimCleanupMillis = 0;
+        housekeepingExecutor = null;
+        db = null;
+        acceptDataLoss = false;
+        enableStallStop = false;
+        removeOrphanedFlowFiles = false;
+        stallMillis = 0;
+        stallCount = 0;
+        stopCount = 0;
+        stallPercentage = 0;
+        stopPercentage = 0;
+        enableRecoveryMode = false;
+        recoveryModeFlowFileLimit = 0;
+    }
+
+    public RocksDBFlowFileRepository(final NiFiProperties niFiProperties) {
+        deserializationThreads = RocksDbProperty.DESERIALIZATION_THREADS.getIntValue(niFiProperties);
+        deserializationBufferSize = RocksDbProperty.DESERIALIZATION_BUFFER_SIZE.getIntValue(niFiProperties);
+
+        claimCleanupMillis = RocksDbProperty.CLAIM_CLEANUP_PERIOD.getTimeValue(niFiProperties, TimeUnit.MILLISECONDS);
+        housekeepingExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+            Thread thread = Executors.defaultThreadFactory().newThread(r);
+            thread.setDaemon(true);
+            return thread;
+        });
+
+        acceptDataLoss = RocksDbProperty.ACCEPT_DATA_LOSS.getBooleanValue(niFiProperties);
+        enableStallStop = RocksDbProperty.ENABLE_STALL_STOP.getBooleanValue(niFiProperties);
+
+        removeOrphanedFlowFiles = RocksDbProperty.REMOVE_ORPHANED_FLOWFILES.getBooleanValue(niFiProperties);
+        if (removeOrphanedFlowFiles) {
+            logger.warn("The property \"{}\" is currently set to \"true\". " +
+                    "This can potentially lead to data loss, and should only be set if you are absolutely certain it is necessary. " +
+                    "Even then, it should be removed as soon as possible.",
+                    RocksDbProperty.REMOVE_ORPHANED_FLOWFILES.propertyName);
+        }
+        stallMillis = RocksDbProperty.STALL_PERIOD.getTimeValue(niFiProperties, TimeUnit.MILLISECONDS);
+        stallCount = RocksDbProperty.STALL_FLOWFILE_COUNT.getLongValue(niFiProperties);
+        stopCount = RocksDbProperty.STOP_FLOWFILE_COUNT.getLongValue(niFiProperties);
+        stallPercentage = RocksDbProperty.STALL_HEAP_USAGE_PERCENT.getPercentValue(niFiProperties);
+        stopPercentage = RocksDbProperty.STOP_HEAP_USAGE_PERCENT.getPercentValue(niFiProperties);
+
+        enableRecoveryMode = RocksDbProperty.ENABLE_RECOVERY_MODE.getBooleanValue(niFiProperties);
+        recoveryModeFlowFileLimit = RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.getLongValue(niFiProperties);
+        if (enableRecoveryMode) {
+            logger.warn("The property \"{}\" is currently set to \"true\" and \"{}\" is set to \"{}\". " +
+                    "This means that only {} FlowFiles will be loaded into memory from the FlowFile repo at a time, " +
+                    "allowing for recovery of a system encountering OutOfMemory errors (or similar). " +
+                    "This setting should be reset to \"false\" as soon as recovery is complete.",
+                    RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, recoveryModeFlowFileLimit, recoveryModeFlowFileLimit);
+        }
+        db = new RocksDBMetronome.Builder()
+                .setStatDumpSeconds((int) (Math.min(RocksDbProperty.STAT_DUMP_PERIOD.getTimeValue(niFiProperties, TimeUnit.SECONDS), Integer.MAX_VALUE)))
+                .setParallelThreads(RocksDbProperty.DB_PARALLEL_THREADS.getIntValue(niFiProperties))
+                .setMaxWriteBufferNumber(RocksDbProperty.MAX_WRITE_BUFFER_NUMBER.getIntValue(niFiProperties))
+                .setMinWriteBufferNumberToMerge(RocksDbProperty.MIN_WRITE_BUFFER_NUMBER_TO_MERGE.getIntValue(niFiProperties))
+                .setWriteBufferSize(RocksDbProperty.WRITE_BUFFER_SIZE.getByteCountValue(niFiProperties))
+                .setDelayedWriteRate(RocksDbProperty.DELAYED_WRITE_RATE.getByteCountValue(niFiProperties))
+                .setLevel0SlowdownWritesTrigger(RocksDbProperty.LEVEL_0_SLOWDOWN_WRITES_TRIGGER.getIntValue(niFiProperties))
+                .setLevel0StopWritesTrigger(RocksDbProperty.LEVEL_0_STOP_WRITES_TRIGGER.getIntValue(niFiProperties))
+                .setMaxBackgroundFlushes(RocksDbProperty.MAX_BACKGROUND_FLUSHES.getIntValue(niFiProperties))
+                .setMaxBackgroundCompactions(RocksDbProperty.MAX_BACKGROUND_COMPACTIONS.getIntValue(niFiProperties))
+                .setSyncMillis(RocksDbProperty.SYNC_PERIOD.getTimeValue(niFiProperties, TimeUnit.MILLISECONDS))
+                .setSyncWarningNanos(RocksDbProperty.SYNC_WARNING_PERIOD.getTimeValue(niFiProperties, TimeUnit.NANOSECONDS))
+                .setStoragePath(getFlowFileRepoPath(niFiProperties))
+                .setAdviseRandomOnOpen(false)
+                .setCreateMissingColumnFamilies(true)
+                .setCreateIfMissing(true)
+                .setPeriodicSyncEnabled(!acceptDataLoss)
+                .build();
+    }
+
+    /**
+     * @param niFiProperties The Properties file
+     * @return The path of the repository
+     */
+    static Path getFlowFileRepoPath(NiFiProperties niFiProperties) {
+        for (final String propertyName : niFiProperties.getPropertyKeys()) {
+            if (propertyName.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) {
+                final String dirName = niFiProperties.getProperty(propertyName);
+                return Paths.get(dirName);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return The name of the File Store
+     */
+    @Override
+    public String getFileStoreName() {
+        try {
+            return Files.getFileStore(db.getStoragePath()).name();
+        } catch (IOException e) {
+            return null;
+        }
+    }
+
+    @Override
+    public void initialize(final ResourceClaimManager claimManager) throws IOException {
+        this.db.initialize();
+        this.claimManager = claimManager;
+        this.serdeFactory = new StandardRepositoryRecordSerdeFactory(claimManager);
+
+        try {
+            byte[] versionBytes = db.getConfiguration(REPOSITORY_VERSION_KEY);
+            if (versionBytes == null) {
+                db.putConfiguration(REPOSITORY_VERSION_KEY, VERSION_ONE_BYTES);
+            } else if (!Arrays.equals(versionBytes, VERSION_ONE_BYTES)) {
+                throw new IllegalStateException("Unknown repository version: " + new String(versionBytes, StandardCharsets.UTF_8));
+            }
+
+            byte[] serializationEncodingBytes = db.getConfiguration(SERIALIZATION_ENCODING_KEY);
+
+            if (serializationEncodingBytes == null) {
+                serializer = serdeFactory.createSerDe(null);
+                serializationEncodingName = serializer.getClass().getName();
+                db.putConfiguration(SERIALIZATION_ENCODING_KEY, serializationEncodingName.getBytes(StandardCharsets.UTF_8));
+            } else {
serializationEncodingName = new String(serializationEncodingBytes, StandardCharsets.UTF_8); + serializer = serdeFactory.createSerDe(serializationEncodingName); + } + + serializationHeader = db.getConfiguration(SERIALIZATION_HEADER_KEY); + + if (serializationHeader == null) { + try ( + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream) + ) { + serializer.writeHeader(dataOutputStream); + serializationHeader = byteArrayOutputStream.toByteArray(); + db.putConfiguration(SERIALIZATION_HEADER_KEY, serializationHeader); + } + } + + + byte[] swapLocationSuffixBytes = db.getConfiguration(SWAP_LOCATION_SUFFIX_KEY); + if (swapLocationSuffixBytes != null) { + try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(swapLocationSuffixBytes); + ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) { + Object o = objectInputStream.readObject(); + if (o instanceof Collection) { + ((Collection) o).forEach(obj -> swapLocationSuffixes.add(obj.toString())); + } + } + } + } catch (RocksDBException | ClassNotFoundException e) { + throw new IOException(e); + } + + housekeepingExecutor.scheduleWithFixedDelay(this::doHousekeeping, 0, claimCleanupMillis, TimeUnit.MILLISECONDS); + + logger.info("Initialized FlowFile Repository at {}", db.getStoragePath()); + } + + @Override + public void close() throws IOException { + if (housekeepingExecutor != null) { + housekeepingExecutor.shutdownNow(); + } + + if (db != null) { + db.close(); + } + } + + /** + * This method is scheduled by the housekeepingExecutor at the specified interval. + * Catches Throwable so as not to suppress future executions of the ScheduledExecutorService + */ + private void doHousekeeping() { + try { + doClaimCleanup(); + updateStallStop(); + doRecovery(); + } catch (Throwable t) { + // catching Throwable so as not to suppress subsequent executions + logger.error("Encountered problem during housekeeping", t); + } + } + + /** + * Marks as destructible any claims that were held by records which have now been deleted. + */ + private void doClaimCleanup() { + Collection claimsToDestroy; + synchronized (claimsAwaitingDestruction) { + if (Thread.currentThread().isInterrupted()) { + return; + } + claimsToDestroy = claimsAwaitingDestruction.getAndSet(new ArrayList<>()); + } + if (claimsToDestroy != null) { + Collection uniqueClaimsToDestroy = new HashSet<>(claimsToDestroy); + + try { + if (!acceptDataLoss) { + db.waitForSync(); + } else { + db.forceSync(); + } + } catch (InterruptedException | RocksDBException e) { + synchronized (claimsAwaitingDestruction) { + // if there was an exception, put back the claims we were attempting to destroy + claimsAwaitingDestruction.get().addAll(uniqueClaimsToDestroy); + return; + } + } + for (final ResourceClaim claim : uniqueClaimsToDestroy) { + claimManager.markDestructable(claim); + } + } + } + + /** + * Updates the stalled and stopped status of the repository + */ + void updateStallStop() { + // if stall.stop logic is not enabled, return + if (!enableStallStop) return; + + if (stallStopLock.tryLock()) { + try { + final long inMemoryFlowFiles = getInMemoryFlowFiles(); + + if (inMemoryFlowFiles >= stopCount) { + stopNewFlowFiles = true; + stallNewFlowFiles = true; + logger.warn("Halting new FlowFiles because maximum FlowFile count ({}) has been exceeded. 
Current count: {}",
+                        new Object[]{stopCount, inMemoryFlowFiles});
+                return;
+            }
+
+            // calculate usage percentage
+            final double freeMemory = runtime.freeMemory();
+            final double maxMemory = runtime.maxMemory();
+            final double usedPercentage = 1.0d - (freeMemory / maxMemory);
+
+            if (usedPercentage >= stopPercentage) {
+                stopNewFlowFiles = true;
+                stallNewFlowFiles = true;
+                logger.warn("Halting new FlowFiles because maximum heap usage percentage ({}) has been exceeded. Current usage: {}",
+                        new Object[]{percentFormat.format(stopPercentage), percentFormat.format(usedPercentage)});
+                return;
+            }
+
+            if (inMemoryFlowFiles >= stallCount) {
+                stopNewFlowFiles = false;
+                stallNewFlowFiles = true;
+                logger.warn("Stalling new FlowFiles because FlowFile count stall threshold ({}) has been exceeded. Current count: {}",
+                        new Object[]{stallCount, inMemoryFlowFiles});
+                return;
+            }
+
+            if (usedPercentage >= stallPercentage) {
+                stopNewFlowFiles = false;
+                stallNewFlowFiles = true;
+                logger.warn("Stalling new FlowFiles because heap usage percentage threshold ({}) has been exceeded. Current usage: {}",
+                        new Object[]{percentFormat.format(stallPercentage), percentFormat.format(usedPercentage)});
+                return;
+            }
+
+            if (stopNewFlowFiles || stallNewFlowFiles) {
+                logger.info("Resuming acceptance of new FlowFiles");
+                stopNewFlowFiles = false;
+                stallNewFlowFiles = false;
+            }
+        } finally {
+            stallStopLock.unlock();
+        }
+        }
+    }
+
+    /**
+     * If in recovery mode, restores additional FlowFiles while the in-memory count is under the configured limit
+     */
+    synchronized void doRecovery() {
+
+        // if we are not in recovery mode, return
+        if (!enableRecoveryMode) return;
+
+        SerDe<RepositoryRecord> deserializer = recordDeserializer.get();
+        if (deserializer == null) {
+            return; // initial load hasn't completed
+        }
+
+        if (recordsToRestore.isEmpty()) {
+            logger.warn("Recovery has been completed. " +
+                    "The property \"{}\" is currently set to \"true\", but should be reset to \"false\" as soon as possible.",
+                    RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName);
+            return;
+        }
+
+        logger.warn("The property \"{}\" is currently set to \"true\" and \"{}\" is set to \"{}\". " +
+                "This means that only {} FlowFiles will be loaded into memory from the FlowFile repo at a time, " +
+                "allowing for recovery of a system encountering OutOfMemory errors (or similar). " +
+                "This setting should be reset to \"false\" as soon as recovery is complete. 
" + + "There are {} records remaining to be recovered.", + RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, + recoveryModeFlowFileLimit, recoveryModeFlowFileLimit, getRecordsToRestoreCount()); + + while (!recordsToRestore.isEmpty() && inMemoryFlowFiles.get() < recoveryModeFlowFileLimit) { + try { + byte[] key = recordsToRestore.get(0); + byte[] recordBytes = db.get(key); + if (recordBytes != null) { + try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(recordBytes); + DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) { + RepositoryRecord record = deserializer.deserializeRecord(dataInputStream, deserializer.getVersion()); + final FlowFileRecord flowFile = record.getCurrent(); + final FlowFileQueue queue = record.getOriginalQueue(); + if (queue != null) { + queue.put(flowFile); + inMemoryFlowFiles.incrementAndGet(); + } + } + } + recordsToRestore.remove(0); + } catch (IOException | RocksDBException e) { + logger.warn("Encountered exception during recovery", e); + } + } + } + + long getInMemoryFlowFiles() { + return inMemoryFlowFiles.get(); + } + + long getRecordsToRestoreCount() { + return recordsToRestore.size(); + } + + /** + * Updates the FlowFile repository with the given RepositoryRecords + * + * @param records the records to update the repository with + * @throws IOException if update fails or a required sync is interrupted + */ + @Override + public void updateRepository(final Collection records) throws IOException { + + // verify records are valid + int netIncrease = countAndValidateRecords(records); + + final boolean causeIncrease = netIncrease > 0; + if (causeIncrease && stopNewFlowFiles) { + updateStallStop(); + throw NO_NEW_FLOWFILES; + } + + //update the db with the new records + int syncCounterValue = updateRocksDB(records); + inMemoryFlowFiles.addAndGet(netIncrease); + + try { + // if we created data, but are at a threshold, delay and allow other data to try to get through + if (causeIncrease && (stallNewFlowFiles || stopNewFlowFiles)) { + Thread.sleep(stallMillis); + updateStallStop(); + } + + // if we got a record indicating data creation, wait for it to be synced to disk before proceeding + if (!acceptDataLoss && syncCounterValue > 0) { + db.waitForSync(syncCounterValue); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + + // determine any content claims that can be destroyed + determineDestructibleClaims(records); + } + + /** + * Check that each record has the required elements + * + * @param records to be validated + * @return the change in in-memory FlowFile count represented by this Collection + */ + private int countAndValidateRecords(Collection records) { + int inMemoryDelta = 0; + for (RepositoryRecord record : records) { + validateRecord(record); + if (record.getType() == RepositoryRecordType.CREATE || record.getType() == RepositoryRecordType.SWAP_IN) { + inMemoryDelta++; + } else if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.SWAP_OUT) { + inMemoryDelta--; + } + } + return inMemoryDelta; + } + + /** + * Check that a record has the required elements + * + * @param record to be validated + */ + private void validateRecord(RepositoryRecord record) { + if (record.getType() != RepositoryRecordType.DELETE + && record.getType() != RepositoryRecordType.CONTENTMISSING + && record.getType() != RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS + && 
record.getType() != RepositoryRecordType.SWAP_OUT + && record.getDestination() == null) { + throw new IllegalArgumentException("Record " + record + " has no destination and Type is " + record.getType()); + } + } + + /** + * Update the records in the RocksDB database + * + * @param records to be persisted + * @return the value of the sync counter immediately after the last update which requires a sync + */ + private int updateRocksDB(Collection records) throws IOException { + + // Partition records by UpdateType. + // We do this because we want to ensure that records creating data are persisted first. + // Additionally, remove records of type 'CLEANUP_TRANSIENT_CLAIMS' so they aren't sent to the db + + final Map> partitionedRecords = new HashMap<>(); + for (RepositoryRecord repositoryRecord : records) { + if (repositoryRecord.getType() == RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS) { + continue; + } + final UpdateType updateType = serdeFactory.getUpdateType(repositoryRecord); + partitionedRecords.computeIfAbsent(updateType, ut -> new ArrayList<>()).add(repositoryRecord); + } + + int counterValue; + + try { + // handle CREATE records + putAll(partitionedRecords.get(UpdateType.CREATE)); + + // handle SWAP_OUT records + List swapOutRecords = partitionedRecords.get(UpdateType.SWAP_OUT); + if (swapOutRecords != null) { + for (final RepositoryRecord record : swapOutRecords) { + final String newLocation = serdeFactory.getLocation(record); + if (newLocation == null) { + final Long recordIdentifier = serdeFactory.getRecordIdentifier(record); + logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but " + + "no indicator of where the Record is to be Swapped Out to; these records may be " + + "lost when the repository is restored!"); + } else { + delete(record); + } + } + } + + // handle SWAP_IN records + List swapInRecords = partitionedRecords.get(UpdateType.SWAP_IN); + if (swapInRecords != null) { + for (final RepositoryRecord record : swapInRecords) { + final String newLocation = serdeFactory.getLocation(record); + if (newLocation == null) { + final Long recordIdentifier = serdeFactory.getRecordIdentifier(record); + logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but " + + "no indicator of where the Record is to be Swapped In from; these records may be " + + "duplicated when the repository is restored!"); + } + put(record); + } + } + + // if a sync is required, get the current value of the sync counter + counterValue = syncRequired(partitionedRecords) ? 
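+        // snapshot the metronome's sync counter at this point; when data loss is not
+        // acceptable, updateRepository() blocks until a sync beyond this value completes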
db.getSyncCounterValue() : -1; + + // handle UPDATE records + putAll(partitionedRecords.get(UpdateType.UPDATE)); + + // handle DELETE records + deleteAll(partitionedRecords.get(UpdateType.DELETE)); + + } catch (RocksDBException e) { + throw new IOException(e); + } + + return counterValue; + } + + private boolean syncRequired(Map> recordMap) { + for (UpdateType updateType : recordMap.keySet()) { + if (updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_OUT || updateType == UpdateType.SWAP_IN) { + return true; + } + } + return false; + } + + private void deleteAll(List repositoryRecords) throws RocksDBException { + if (repositoryRecords != null) { + for (final RepositoryRecord record : repositoryRecords) { + delete(record); + } + } + } + + private void delete(RepositoryRecord record) throws RocksDBException { + final Long recordIdentifier = serdeFactory.getRecordIdentifier(record); + byte[] key = RocksDBMetronome.getBytes(recordIdentifier); + db.delete(key); + } + + private void putAll(List repositoryRecords) throws IOException, RocksDBException { + if (repositoryRecords != null) { + for (final RepositoryRecord record : repositoryRecords) { + put(record); + } + } + } + + private void put(RepositoryRecord record) throws IOException, RocksDBException { + final Long recordIdentifier = serdeFactory.getRecordIdentifier(record); + byte[] key = RocksDBMetronome.getBytes(recordIdentifier); + final byte[] serializedRecord = serialize(record); + db.put(key, serializedRecord); + } + + private byte[] serialize(RepositoryRecord record) throws IOException { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { + serializer.serializeRecord(record, dataOutputStream); + return byteArrayOutputStream.toByteArray(); + } + } + + private void determineDestructibleClaims(Collection records) throws IOException { + + final Set claimsToAdd = new HashSet<>(); + final Set swapLocationsAdded = new HashSet<>(); + final Set swapLocationsRemoved = new HashSet<>(); + for (final RepositoryRecord record : records) { + updateClaimCounts(record); + + if (record.getType() == RepositoryRecordType.DELETE) { + // For any DELETE record that we have, if claim is destructible, mark it so + if (isDestructible(record.getCurrentClaim())) { + claimsToAdd.add(record.getCurrentClaim().getResourceClaim()); + } + + // If the original claim is different than the current claim and the original claim is destructible, mark it so + if (shouldDestroyOriginal(record)) { + claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); + } + } else if (record.getType() == RepositoryRecordType.UPDATE) { + // if we have an update, and the original is no longer needed, mark original as destructible + if (shouldDestroyOriginal(record)) { + claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); + } + } else if (record.getType() == RepositoryRecordType.SWAP_OUT) { + final String swapLocation = record.getSwapLocation(); + final String normalizedSwapLocation = normalizeSwapLocation(swapLocation); + swapLocationsAdded.add(normalizedSwapLocation); + swapLocationsRemoved.remove(normalizedSwapLocation); + } else if (record.getType() == RepositoryRecordType.SWAP_IN) { + final String swapLocation = record.getSwapLocation(); + final String normalizedSwapLocation = normalizeSwapLocation(swapLocation); + swapLocationsRemoved.add(normalizedSwapLocation); + swapLocationsAdded.remove(normalizedSwapLocation); + } + + final List 
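+            // transient claims are content claims that were created and then superseded
+            // during this update; once unreferenced they can be marked destructible below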
transientClaims = record.getTransientClaims(); + if (transientClaims != null) { + for (final ContentClaim transientClaim : transientClaims) { + if (isDestructible(transientClaim)) { + claimsToAdd.add(transientClaim.getResourceClaim()); + } + } + } + } + + // If we have swapped files in or out, we need to ensure that we update our swapLocationSuffixes. + if (!swapLocationsAdded.isEmpty() || !swapLocationsRemoved.isEmpty()) { + synchronized (swapLocationSuffixes) { + removeNormalizedSwapLocations(swapLocationsRemoved); + addNormalizedSwapLocations(swapLocationsAdded); + } + } + + // add any new claims that can be destroyed to the list + if (!claimsToAdd.isEmpty()) { + synchronized (claimsAwaitingDestruction) { + claimsAwaitingDestruction.get().addAll(claimsToAdd); + } + } + } + + + private void updateClaimCounts(final RepositoryRecord record) { + final ContentClaim currentClaim = record.getCurrentClaim(); + final ContentClaim originalClaim = record.getOriginalClaim(); + final boolean claimChanged = !Objects.equals(currentClaim, originalClaim); + + if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.CONTENTMISSING) { + decrementClaimCount(currentClaim); + } + + if (claimChanged) { + // records which have been updated - remove original if exists + decrementClaimCount(originalClaim); + } + } + + private void decrementClaimCount(final ContentClaim claim) { + if (claim == null) { + return; + } + + claimManager.decrementClaimantCount(claim.getResourceClaim()); + } + + + /** + * @param claim to be evaluated + * @return true if the claim can be destroyed + */ + private boolean isDestructible(final ContentClaim claim) { + if (claim == null) { + return false; + } + + final ResourceClaim resourceClaim = claim.getResourceClaim(); + if (resourceClaim == null) { + return false; + } + + return !resourceClaim.isInUse(); + } + + /** + * @param record to be evaluated + * @return true if the original claim can be destroyed + */ + private boolean shouldDestroyOriginal(RepositoryRecord record) { + final ContentClaim originalClaim = record.getOriginalClaim(); + return isDestructible(originalClaim) && !originalClaim.equals(record.getCurrentClaim()); + } + + @Override + public boolean isVolatile() { + return false; + } + + @Override + public long getStorageCapacity() throws IOException { + return db.getStorageCapacity(); + } + + @Override + public long getUsableStorageSpace() throws IOException { + return db.getUsableStorageSpace(); + } + + + @Override + public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) { + String normalizedSwapLocation = normalizeSwapLocation(swapLocationSuffix); + synchronized (swapLocationSuffixes) { + return swapLocationSuffixes.contains(normalizedSwapLocation); + } + } + + + static String normalizeSwapLocation(final String swapLocation) { + if (swapLocation == null) { + return null; + } + + final String normalizedPath = swapLocation.replace("\\", "/"); + final String withoutTrailing = (normalizedPath.endsWith("/") && normalizedPath.length() > 1) ? 
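+                // drops a single trailing slash, e.g. "/path/to/swap/" becomes "/path/to/swap"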
normalizedPath.substring(0, normalizedPath.length() - 1) : normalizedPath; + final String pathRemoved = getLocationSuffix(withoutTrailing); + + return StringUtils.substringBefore(pathRemoved, "."); + } + + private static String getLocationSuffix(final String swapLocation) { + final int lastIndex = swapLocation.lastIndexOf("/"); + if (lastIndex < 0 || lastIndex >= swapLocation.length() - 1) { + return swapLocation; + } + + return swapLocation.substring(lastIndex + 1); + } + + @Override + public void swapFlowFilesOut(final List swappedOut, final FlowFileQueue queue, + final String swapLocation) throws IOException { + final List repoRecords = new ArrayList<>(); + if (swappedOut == null || swappedOut.isEmpty()) { + return; + } + + for (final FlowFileRecord swapRecord : swappedOut) { + final RepositoryRecord repoRecord = new StandardRepositoryRecord(queue, swapRecord, swapLocation); + repoRecords.add(repoRecord); + } + + updateRepository(repoRecords); + addRawSwapLocation(swapLocation); + logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", swappedOut.size(), queue, swapLocation); + } + + @Override + public void swapFlowFilesIn(final String swapLocation, final List swapRecords, + final FlowFileQueue queue) throws IOException { + final List repoRecords = new ArrayList<>(); + + for (final FlowFileRecord swapRecord : swapRecords) { + final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(queue, swapRecord); + repoRecord.setSwapLocation(swapLocation); // set the swap file to indicate that it's being swapped in. + repoRecord.setDestination(queue); + + repoRecords.add(repoRecord); + } + + updateRepository(repoRecords); + removeRawSwapLocation(swapLocation); + logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue}); + } + + + @Override + public long loadFlowFiles(final QueueProvider queueProvider) throws IOException { + + final long startTime = System.nanoTime(); + + final Map queueMap = new HashMap<>(); + for (final FlowFileQueue queue : queueProvider.getAllQueues()) { + queueMap.put(queue.getIdentifier(), queue); + } + + final ExecutorService recordDeserializationExecutor = Executors.newFixedThreadPool(deserializationThreads, r -> { + Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setDaemon(true); + return thread; + }); + + // Create a queue which will hold the bytes of the records that have been read from disk and are awaiting deserialization + final BlockingQueue recordBytesQueue = new ArrayBlockingQueue<>(deserializationBufferSize); + + final AtomicBoolean doneReading = new AtomicBoolean(false); + final List> futures = new ArrayList<>(deserializationThreads); + + RepositoryRecordSerdeFactory factory = new StandardRepositoryRecordSerdeFactory(claimManager); + factory.setQueueMap(queueMap); + + final AtomicInteger numFlowFilesMissingQueue = new AtomicInteger(0); + final AtomicInteger recordCount = new AtomicInteger(0); + final AtomicInteger recoveryModeRecordCount = new AtomicInteger(0); + + for (int i = 0; i < deserializationThreads; i++) { + futures.add(recordDeserializationExecutor.submit(() -> { + long localMaxId = 0; + int localRecordCount = 0; + final Set localRecoveredSwapLocations = new HashSet<>(); + + // Create deserializer in each thread + factory.setQueueMap(queueMap); + final SerDe localDeserializer = factory.createSerDe(serializationEncodingName); + try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serializationHeader); + 
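+                     // replay the persisted serialization header so this thread's deserializer
+                     // is initialized before decoding raw record bytes pulled off the queue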
DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) { + localDeserializer.readHeader(dataInputStream); + } + + while (!doneReading.get() || !recordBytesQueue.isEmpty()) { + byte[] value = recordBytesQueue.poll(100, TimeUnit.MILLISECONDS); + if (value != null) { + try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(value); + DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) { + RepositoryRecord record = localDeserializer.deserializeRecord(dataInputStream, localDeserializer.getVersion()); + + localRecordCount++; + + // increment the count for the record + final ContentClaim claim = record.getCurrentClaim(); + if (claim != null) { + claimManager.incrementClaimantCount(claim.getResourceClaim()); + } + + final long recordId = record.getCurrent().getId(); + if (recordId > localMaxId) { + localMaxId = recordId; + } + + if (record.getType().equals(RepositoryRecordType.SWAP_OUT)) { + localRecoveredSwapLocations.add(normalizeSwapLocation(record.getSwapLocation())); + } + + final FlowFileRecord flowFile = record.getCurrent(); + final FlowFileQueue queue = record.getOriginalQueue(); + if (queue == null) { + if (!removeOrphanedFlowFiles) { + throw new IOException("Found FlowFile in repository without a corresponding queue. " + + "This may indicate an issue syncing the flow.xml in a cluster. " + + "To resolve this issue you should restore the flow.xml. " + + "Alternatively, if removing data is acceptable, you can add the following to nifi.properties: \n\n" + + "\t\t" + RocksDbProperty.REMOVE_ORPHANED_FLOWFILES.propertyName + "=true\n\n" + + "...once this has allowed you to restart nifi, you should remove it from nifi.properties to prevent inadvertent future data loss."); + } + + numFlowFilesMissingQueue.incrementAndGet(); + try { + final Long recordIdentifier = factory.getRecordIdentifier(record); + byte[] key = RocksDBMetronome.getBytes(recordIdentifier); + db.delete(key); + } catch (RocksDBException e) { + logger.warn("Could not clean up repository", e); + } + } else { + // verify we're supposed to enqueue the FlowFile + if (!enableRecoveryMode) { + queue.put(flowFile); + } else if (recoveryModeRecordCount.incrementAndGet() <= recoveryModeFlowFileLimit) { + queue.put(flowFile); + } else { + final Long recordIdentifier = factory.getRecordIdentifier(record); + byte[] key = RocksDBMetronome.getBytes(recordIdentifier); + recordsToRestore.add(key); + } + } + } + } + } + recordCount.addAndGet(localRecordCount); + addNormalizedSwapLocations(localRecoveredSwapLocations); + return localMaxId; + + })); + } + + long maxId = 0; + RocksIterator rocksIterator = db.getIterator(); + rocksIterator.seekToFirst(); + long counter = 0; + long totalRecords = 0; + try { + while (rocksIterator.isValid()) { + if (recordBytesQueue.offer(rocksIterator.value(), 10, TimeUnit.SECONDS)) { + rocksIterator.next(); + if (++counter == 5_000) { // periodically report progress + totalRecords += counter; + counter = 0; + logger.info("Read {} records from disk", totalRecords); + } + } else { + // couldn't add to the queue in a timely fashion... make sure there are no exceptions from the consumers + for (Future f : futures) { + // the only way it could be done at this point is through an exception + // (because we haven't yet set doneReading = true) + if (f.isDone()) { + f.get(); // this will throw the exception + } + } + logger.warn("Failed to add record bytes to queue. 
Will keep trying...");
+                }
+            }
+
+            doneReading.set(true);
+            totalRecords += counter;
+            logger.info("Finished reading from RocksDB. Read {} records from disk", totalRecords);
+
+            for (Future<Long> f : futures) {
+                long futureMax = f.get(); // will wait for completion (or exception)
+                if (futureMax > maxId) {
+                    maxId = futureMax;
+                }
+            }
+
+            logger.info("Finished deserializing {} records", recordCount.get());
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        } catch (ExecutionException e) {
+            throw new IOException(e);
+        } finally {
+            recordDeserializationExecutor.shutdownNow();
+        }
+
+        // Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will
+        // return the appropriate number.
+        flowFileSequenceGenerator.set(maxId + 1);
+
+        int flowFilesInQueues = recordCount.get() - numFlowFilesMissingQueue.get();
+        inMemoryFlowFiles.set(!enableRecoveryMode ? flowFilesInQueues
+                : Math.min(flowFilesInQueues, recoveryModeFlowFileLimit));
+        logger.info("Successfully restored {} FlowFiles in {} milliseconds using {} threads",
+                getInMemoryFlowFiles(),
+                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime),
+                deserializationThreads);
+        if (logger.isDebugEnabled()) {
+            synchronized (this.swapLocationSuffixes) {
+                logger.debug("Recovered {} Swap Files: {}", swapLocationSuffixes.size(), swapLocationSuffixes);
+            }
+        }
+        if (numFlowFilesMissingQueue.get() > 0) {
+            logger.warn("On recovery, found {} FlowFiles whose queue no longer exists. These FlowFiles have been dropped.", numFlowFilesMissingQueue);
+        }
+
+        final SerDe<RepositoryRecord> deserializer = factory.createSerDe(serializationEncodingName);
+        factory.setQueueMap(null); // clear the map
+
+        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serializationHeader);
+             DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
+            deserializer.readHeader(dataInputStream);
+        }
+
+        if (enableRecoveryMode) {
+            recordDeserializer.set(deserializer);
+        }
+
+        return maxId;
+    }
+
+
+    private void addRawSwapLocation(String rawSwapLocation) throws IOException {
+        addRawSwapLocations(Collections.singleton(rawSwapLocation));
+    }
+
+    private void addRawSwapLocations(Collection<String> rawSwapLocations) throws IOException {
+        addNormalizedSwapLocations(rawSwapLocations.stream().map(RocksDBFlowFileRepository::normalizeSwapLocation).collect(Collectors.toSet()));
+    }
+
+    private void addNormalizedSwapLocations(Collection<String> normalizedSwapLocations) throws IOException {
+        synchronized (this.swapLocationSuffixes) {
+            this.swapLocationSuffixes.addAll(normalizedSwapLocations);
+            persistSwapLocationSuffixes();
+        }
+    }
+
+    private void removeRawSwapLocation(String rawSwapLocation) throws IOException {
+        removeRawSwapLocations(Collections.singleton(rawSwapLocation));
+    }
+
+    private void removeRawSwapLocations(Collection<String> rawSwapLocations) throws IOException {
+        removeNormalizedSwapLocations(rawSwapLocations.stream().map(RocksDBFlowFileRepository::normalizeSwapLocation).collect(Collectors.toSet()));
+    }
+
+    private void removeNormalizedSwapLocations(Collection<String> normalizedSwapLocations) throws IOException {
+        synchronized (this.swapLocationSuffixes) {
+            this.swapLocationSuffixes.removeAll(normalizedSwapLocations);
+            persistSwapLocationSuffixes();
+        }
+    }
+
+    private void persistSwapLocationSuffixes() throws IOException {
+        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+             ObjectOutputStream objectOutputStream = new 
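+             // the suffix Set is written with Java object serialization and stored under
+             // SWAP_LOCATION_SUFFIX_KEY in the configuration column family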
ObjectOutputStream(byteArrayOutputStream) + ) { + objectOutputStream.writeObject(swapLocationSuffixes); + db.putConfiguration(SWAP_LOCATION_SUFFIX_KEY, byteArrayOutputStream.toByteArray()); + } catch (RocksDBException e) { + throw new IOException(e); + } + } + + + @Override + public void updateMaxFlowFileIdentifier(final long maxId) { + while (true) { + final long currentId = flowFileSequenceGenerator.get(); + if (currentId >= maxId) { + return; + } + + final boolean updated = flowFileSequenceGenerator.compareAndSet(currentId, maxId); + if (updated) { + return; + } + } + } + + @Override + public long getNextFlowFileSequence() { + return flowFileSequenceGenerator.getAndIncrement(); + } + + @Override + public long getMaxFlowFileIdentifier() { + // flowFileSequenceGenerator is 1 more than the MAX so that we can call #getAndIncrement on the AtomicLong + return flowFileSequenceGenerator.get() - 1; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 133751179d..779c29c421 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -515,7 +515,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis updateRepository(repoRecords, true); synchronized (this.swapLocationSuffixes) { - this.swapLocationSuffixes.add(normalizeSwapLocation(swapLocation)); + this.swapLocationSuffixes.remove(normalizeSwapLocation(swapLocation)); } logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue}); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileRepository b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileRepository index 590dbc1871..88a98c5d37 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileRepository +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileRepository @@ -1,16 +1,17 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -org.apache.nifi.controller.repository.WriteAheadFlowFileRepository -org.apache.nifi.controller.repository.VolatileFlowFileRepository \ No newline at end of file +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.controller.repository.RocksDBFlowFileRepository +org.apache.nifi.controller.repository.VolatileFlowFileRepository +org.apache.nifi.controller.repository.WriteAheadFlowFileRepository diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java new file mode 100644 index 0000000000..f2cd84c320 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.repository; + +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.NopConnectionEventListener; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.queue.StandardFlowFileQueue; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; +import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; +import org.apache.nifi.controller.swap.StandardSwapContents; +import org.apache.nifi.controller.swap.StandardSwapSummary; +import org.apache.nifi.rocksdb.RocksDBMetronome; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.file.FileUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +public class TestRocksDBFlowFileRepository { + + private static final Logger logger = LoggerFactory.getLogger(TestRocksDBFlowFileRepository.class); + + private final Map additionalProperties = new HashMap<>(); + private String nifiPropertiesPath; + + @Rule + public TestName testName = new TestName(); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Before + public void before() throws IOException { + File testRepoDir = temporaryFolder.newFolder(testName.getMethodName()); + additionalProperties.put(NiFiProperties.FLOWFILE_REPOSITORY_DIRECTORY, testRepoDir.getAbsolutePath()); + + File properties = temporaryFolder.newFile(); + Files.copy(Paths.get("src/test/resources/conf/nifi.properties"), properties.toPath(), StandardCopyOption.REPLACE_EXISTING); + nifiPropertiesPath = properties.getAbsolutePath(); + + logger.info("Running test: {}", testName.getMethodName()); + } + + @Test + public void testNormalizeSwapLocation() { + assertEquals("/", RocksDBFlowFileRepository.normalizeSwapLocation("/")); + assertEquals("", RocksDBFlowFileRepository.normalizeSwapLocation("")); + assertNull(RocksDBFlowFileRepository.normalizeSwapLocation(null)); + assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("test.txt")); + assertEquals("test", 
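+        // normalization strips any parent path, a single trailing slash, and the file
+        // extension, so locations like "/tmp/test.txt" reduce to "test"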
RocksDBFlowFileRepository.normalizeSwapLocation("/test.txt")); + assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/tmp/test.txt")); + assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("//test.txt")); + assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/path/to/other/file/repository/test.txt")); + assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("test.txt/")); + assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/path/to/test.txt/")); + assertEquals("test", WriteAheadFlowFileRepository.normalizeSwapLocation(WriteAheadFlowFileRepository.normalizeSwapLocation("/path/to/test.txt/"))); + } + + @Test + public void testSwapLocationsRestored() throws IOException { + + final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties)); + repo.initialize(new StandardResourceClaimManager()); + + final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider(); + repo.loadFlowFiles(queueProvider); + + final Connection connection = Mockito.mock(Connection.class); + when(connection.getIdentifier()).thenReturn("1234"); + + final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class); + when(queue.getIdentifier()).thenReturn("1234"); + when(connection.getFlowFileQueue()).thenReturn(queue); + + queueProvider.addConnection(connection); + + StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); + ffBuilder.id(1L); + ffBuilder.size(0L); + final FlowFileRecord flowFileRecord = ffBuilder.build(); + + final List records = new ArrayList<>(); + final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "swap123"); + record.setDestination(queue); + records.add(record); + + repo.updateRepository(records); + repo.close(); + + // restore + final RocksDBFlowFileRepository repo2 = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties)); + repo2.initialize(new StandardResourceClaimManager()); + repo2.loadFlowFiles(queueProvider); + assertTrue(repo2.isValidSwapLocationSuffix("swap123")); + assertFalse(repo2.isValidSwapLocationSuffix("other")); + repo2.close(); + } + + @Test + public void testSwapLocationsUpdatedOnRepoUpdate() throws IOException { + final Path path = Paths.get("target/test-swap-repo"); + if (Files.exists(path)) { + FileUtils.deleteFile(path.toFile(), true); + } + + final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties)); + repo.initialize(new StandardResourceClaimManager()); + + final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider(); + repo.loadFlowFiles(queueProvider); + + final Connection connection = Mockito.mock(Connection.class); + when(connection.getIdentifier()).thenReturn("1234"); + + final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class); + when(queue.getIdentifier()).thenReturn("1234"); + when(connection.getFlowFileQueue()).thenReturn(queue); + + queueProvider.addConnection(connection); + + StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); + ffBuilder.id(1L); + ffBuilder.size(0L); + final FlowFileRecord flowFileRecord = ffBuilder.build(); + + final List records = new ArrayList<>(); + final StandardRepositoryRecord record = new 
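+        // constructing the record with a swap location marks it SWAP_OUT, so the
+        // updateRepository() call below should register the normalized suffix "swap123"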
StandardRepositoryRecord(queue, flowFileRecord, "/tmp/swap123"); + record.setDestination(queue); + records.add(record); + + assertFalse(repo.isValidSwapLocationSuffix("swap123")); + repo.updateRepository(records); + assertTrue(repo.isValidSwapLocationSuffix("swap123")); + repo.close(); + } + + @Test + public void testResourceClaimsIncremented() throws IOException { + final ResourceClaimManager claimManager = new StandardResourceClaimManager(); + + final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider(); + final Connection connection = Mockito.mock(Connection.class); + when(connection.getIdentifier()).thenReturn("1234"); + when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class)); + + final FlowFileSwapManager swapMgr = new TestRocksDBFlowFileRepository.MockFlowFileSwapManager(); + final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, 0L, "0 B"); + + when(connection.getFlowFileQueue()).thenReturn(queue); + queueProvider.addConnection(connection); + + final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false, false); + final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L); + + final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false, false); + final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L); + + // Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then, + // indicate that a FlowFile was swapped out. We should then be able to recover these FlowFiles and the + // resource claims' counts should be updated for both the swapped out FlowFile and the non-swapped out FlowFile + try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) { + repo.initialize(claimManager); + repo.loadFlowFiles(queueProvider); + + // Create a Repository Record that indicates that a FlowFile was created + final FlowFileRecord flowFile1 = new StandardFlowFileRecord.Builder() + .id(1L) + .addAttribute("uuid", "11111111-1111-1111-1111-111111111111") + .contentClaim(claim1) + .build(); + final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue); + rec1.setWorking(flowFile1); + rec1.setDestination(queue); + + // Create a Record that we can swap out + final FlowFileRecord flowFile2 = new StandardFlowFileRecord.Builder() + .id(2L) + .addAttribute("uuid", "11111111-1111-1111-1111-111111111112") + .contentClaim(claim2) + .build(); + + final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue); + rec2.setWorking(flowFile2); + rec2.setDestination(queue); + + final List records = new ArrayList<>(); + records.add(rec1); + records.add(rec2); + repo.updateRepository(records); + + final String swapLocation = swapMgr.swapOut(Collections.singletonList(flowFile2), queue, null); + repo.swapFlowFilesOut(Collections.singletonList(flowFile2), queue, swapLocation); + } + + final ResourceClaimManager recoveryClaimManager = new StandardResourceClaimManager(); + try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) { + repo.initialize(recoveryClaimManager); + final long largestId = repo.loadFlowFiles(queueProvider); + + // largest ID known is 1 because this doesn't 
take into account the FlowFiles that have been swapped out + assertEquals(1, largestId); + } + + // resource claim 1 will have a single claimant count while resource claim 2 will have no claimant counts + // because resource claim 2 is referenced only by flowfiles that are swapped out. + assertEquals(1, recoveryClaimManager.getClaimantCount(resourceClaim1)); + assertEquals(0, recoveryClaimManager.getClaimantCount(resourceClaim2)); + + final SwapSummary summary = queue.recoverSwappedFlowFiles(); + assertNotNull(summary); + assertEquals(2, summary.getMaxFlowFileId().intValue()); + assertEquals(new QueueSize(1, 0L), summary.getQueueSize()); + + final List swappedOutClaims = summary.getResourceClaims(); + assertNotNull(swappedOutClaims); + assertEquals(1, swappedOutClaims.size()); + assertEquals(claim2.getResourceClaim(), swappedOutClaims.get(0)); + } + + @Test + public void testRestartWithOneRecord() throws IOException { + + final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties)); + repo.initialize(new StandardResourceClaimManager()); + + final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider(); + repo.loadFlowFiles(queueProvider); + + final List flowFileCollection = new ArrayList<>(); + + final Connection connection = Mockito.mock(Connection.class); + when(connection.getIdentifier()).thenReturn("1234"); + + final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class); + when(queue.getIdentifier()).thenReturn("1234"); + doAnswer((Answer) invocation -> { + flowFileCollection.add((FlowFileRecord) invocation.getArguments()[0]); + return null; + }).when(queue).put(any(FlowFileRecord.class)); + + when(connection.getFlowFileQueue()).thenReturn(queue); + + queueProvider.addConnection(connection); + + StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); + ffBuilder.id(1L); + ffBuilder.addAttribute("abc", "xyz"); + ffBuilder.size(0L); + final FlowFileRecord flowFileRecord = ffBuilder.build(); + + final List records = new ArrayList<>(); + final StandardRepositoryRecord record = new StandardRepositoryRecord(null); + record.setWorking(flowFileRecord); + record.setDestination(connection.getFlowFileQueue()); + records.add(record); + + repo.updateRepository(records); + + // update to add new attribute + ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord).addAttribute("hello", "world"); + final FlowFileRecord flowFileRecord2 = ffBuilder.build(); + record.setWorking(flowFileRecord2); + repo.updateRepository(records); + + // update size but no attribute + ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord2).size(40L); + final FlowFileRecord flowFileRecord3 = ffBuilder.build(); + record.setWorking(flowFileRecord3); + repo.updateRepository(records); + + repo.close(); + + // restore + final RocksDBFlowFileRepository repo2 = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties)); + repo2.initialize(new StandardResourceClaimManager()); + repo2.loadFlowFiles(queueProvider); + + assertEquals(1, flowFileCollection.size()); + final FlowFileRecord flowFile = flowFileCollection.get(0); + assertEquals(1L, flowFile.getId()); + assertEquals("xyz", flowFile.getAttribute("abc")); + assertEquals(40L, flowFile.getSize()); + assertEquals("world", flowFile.getAttribute("hello")); + + repo2.close(); + } + + + @Test + public void 
testDoNotRemoveOrphans() throws Exception { + + final TestQueue testQueue = new TestQueue(); + + try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) { + + repo.initialize(new StandardResourceClaimManager()); + repo.loadFlowFiles(testQueue.provider); + + repo.updateRepository(testQueue.getRepositoryRecord(new StandardFlowFileRecord.Builder() + .id(1L) + .addAttribute("abc", "xyz") + .size(0L) + .build() + )); + } + + // restore (& confirm present) + try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) { + repo.initialize(new StandardResourceClaimManager()); + repo.loadFlowFiles(testQueue.provider); + assertEquals(1, repo.getInMemoryFlowFiles()); + } + + // restore with empty queue provider (should throw exception) + try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) { + repo.initialize(new StandardResourceClaimManager()); + repo.loadFlowFiles(new TestQueueProvider()); + fail(); + } catch (IOException expected) { + assertTrue(expected.getMessage().contains("Found FlowFile in repository without a corresponding queue")); + } + } + + @Test + public void testRemoveOrphans() throws Exception { + + final TestQueue testQueue = new TestQueue(); + + additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.REMOVE_ORPHANED_FLOWFILES.propertyName, "true"); + + try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) { + + repo.initialize(new StandardResourceClaimManager()); + repo.loadFlowFiles(testQueue.provider); + + repo.updateRepository(testQueue.getRepositoryRecord(new StandardFlowFileRecord.Builder() + .id(1L) + .addAttribute("abc", "xyz") + .size(0L) + .build() + )); + } + + // restore (& confirm present) + try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) { + repo.initialize(new StandardResourceClaimManager()); + repo.loadFlowFiles(testQueue.provider); + assertEquals(1, repo.getInMemoryFlowFiles()); + } + // restore with empty queue provider (should throw exception) + try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) { + repo.initialize(new StandardResourceClaimManager()); + repo.loadFlowFiles(new TestQueueProvider()); + assertEquals(0, repo.getInMemoryFlowFiles()); + } + } + + @Test + public void testKnownVersion() throws Exception { + final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties); + + // create db with known version + try (RocksDBMetronome db = new RocksDBMetronome.Builder().setStoragePath(RocksDBFlowFileRepository.getFlowFileRepoPath(niFiProperties)).build()) { + db.initialize(); + db.putConfiguration(RocksDBFlowFileRepository.REPOSITORY_VERSION_KEY, RocksDBFlowFileRepository.VERSION_ONE_BYTES); + } + try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) { + repo.initialize(new StandardResourceClaimManager()); + } + } + + @Test(expected = IllegalStateException.class) + public 
+
+    @Test(expected = IllegalStateException.class)
+    public void testUnknownVersion() throws Exception {
+        final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties);
+
+        // create a database with an unknown version
+        try (RocksDBMetronome db = new RocksDBMetronome.Builder().setStoragePath(RocksDBFlowFileRepository.getFlowFileRepoPath(niFiProperties)).build()) {
+            db.initialize();
+            db.putConfiguration(RocksDBFlowFileRepository.REPOSITORY_VERSION_KEY, "UNKNOWN".getBytes(StandardCharsets.UTF_8));
+        }
+        try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
+            repo.initialize(new StandardResourceClaimManager());
+        }
+    }
+
+    @Test
+    public void testRecoveryMode() throws Exception {
+
+        int totalFlowFiles = 50;
+
+        final TestQueue testQueue = new TestQueue();
+
+        try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
+
+            repo.initialize(new StandardResourceClaimManager());
+            repo.loadFlowFiles(testQueue.provider);
+
+            // add records to the repo
+            for (int i = 1; i <= totalFlowFiles; i++) {
+                repo.updateRepository(testQueue.getRepositoryRecord(new StandardFlowFileRecord.Builder()
+                        .id(i)
+                        .addAttribute("abc", "xyz")
+                        .size(0L)
+                        .build()
+                ));
+            }
+            assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
+        }
+
+        // restore in recovery mode with varying limits
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, "true");
+        for (int recoveryLimit = 0; recoveryLimit < totalFlowFiles; recoveryLimit += 10) {
+
+            additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(recoveryLimit));
+            try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
+                repo.initialize(new StandardResourceClaimManager());
+                repo.loadFlowFiles(testQueue.provider);
+                assertEquals(recoveryLimit, repo.getInMemoryFlowFiles());
+            }
+        }
+
+        // restore in recovery mode with the limit equal to the number of available FlowFiles
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(totalFlowFiles));
+        try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
+            repo.initialize(new StandardResourceClaimManager());
+            repo.loadFlowFiles(testQueue.provider);
+            assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
+        }
+
+        // restore in recovery mode with the limit higher than the number of available FlowFiles
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(Integer.MAX_VALUE));
+        try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
+            repo.initialize(new StandardResourceClaimManager());
+            repo.loadFlowFiles(testQueue.provider);
+            assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
+        }
+
+        // restore in normal mode
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, "false");
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(0));
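+
+        // with recovery mode disabled, the limit no longer applies and all records load at startup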
+        try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
+            repo.initialize(new StandardResourceClaimManager());
+            repo.loadFlowFiles(testQueue.provider);
+            assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
+        }
+    }
+
+    @Test
+    public void testRecoveryModeWithContinuedLoading() throws Exception {
+
+        // "disable" the claim cleanup thread so that recovery can be forced manually
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.CLAIM_CLEANUP_PERIOD.propertyName, "24 hours");
+
+        int totalFlowFiles = 50;
+        int recoveryLimit = 10;
+
+        final TestQueue testQueue = new TestQueue();
+
+        try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
+
+            repo.initialize(new StandardResourceClaimManager());
+            repo.loadFlowFiles(testQueue.provider);
+
+            // add records to the repo
+            for (int i = 1; i <= totalFlowFiles; i++) {
+                repo.updateRepository(testQueue.getRepositoryRecord(new StandardFlowFileRecord.Builder()
+                        .id(i)
+                        .addAttribute("abc", "xyz")
+                        .size(0L)
+                        .build()
+                ));
+            }
+            assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
+        }
+
+        // restore in recovery mode
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, "true");
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(recoveryLimit));
+
+        try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
+            repo.initialize(new StandardResourceClaimManager());
+            repo.loadFlowFiles(testQueue.provider);
+            assertEquals(recoveryLimit, repo.getInMemoryFlowFiles());
+            assertEquals(totalFlowFiles - recoveryLimit, repo.getRecordsToRestoreCount());
+
+            long flowFilesRecovered = repo.getInMemoryFlowFiles();
+
+            for (int i = 0; i < 4; i++) {
+                testQueue.deleteQueuedFlowFiles(repo);
+                assertEquals(0, repo.getInMemoryFlowFiles());
+
+                repo.doRecovery();
+                assertEquals(recoveryLimit, repo.getInMemoryFlowFiles());
+
+                flowFilesRecovered += repo.getInMemoryFlowFiles();
+                assertEquals((recoveryLimit * (i + 2)), flowFilesRecovered);
+                assertEquals(totalFlowFiles - flowFilesRecovered, repo.getRecordsToRestoreCount());
+            }
+
+            // should have restored all of the files
+            assertEquals(0, repo.getRecordsToRestoreCount());
+            assertEquals(recoveryLimit, repo.getInMemoryFlowFiles());
+
+            // delete the last files
+            testQueue.deleteQueuedFlowFiles(repo);
+            assertEquals(0, repo.getRecordsToRestoreCount());
+            assertEquals(0, repo.getInMemoryFlowFiles());
+
+            repo.doRecovery();
+
+            // should have nothing left
+            assertEquals(0, repo.getRecordsToRestoreCount());
+            assertEquals(0, repo.getInMemoryFlowFiles());
+        }
+
+        // restore in normal mode
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, "false");
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(1));
+
+        try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
+            repo.initialize(new StandardResourceClaimManager());
+            repo.loadFlowFiles(testQueue.provider);
+            assertEquals(0, repo.getRecordsToRestoreCount());
+            assertEquals(0, repo.getInMemoryFlowFiles());
+        }
+    }
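+
+    // Stall/stop provides back-pressure at the repository level: once the number of in-memory
+    // FlowFiles (or heap usage, taken out of play here via the 100% settings) reaches the stall
+    // threshold, new FlowFiles are throttled; at the stop threshold they are refused outright,
+    // until enough records are deleted to drop back below the thresholds.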
+    @Test
+    public void testStallStop() throws IOException {
+        final TestQueue testQueue = new TestQueue();
+
+        // set the stall & stop properties
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_STALL_STOP.propertyName, "true");
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.STALL_FLOWFILE_COUNT.propertyName, "2");
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.STOP_FLOWFILE_COUNT.propertyName, "3");
+
+        // take heap usage out of the calculation
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.STALL_HEAP_USAGE_PERCENT.propertyName, "100%");
+        additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.STOP_HEAP_USAGE_PERCENT.propertyName, "100%");
+
+        try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
+
+            repo.initialize(new StandardResourceClaimManager());
+            repo.loadFlowFiles(testQueue.provider);
+
+            StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+            ffBuilder.addAttribute("abc", "xyz");
+            ffBuilder.size(0L);
+
+            List<RepositoryRecord> record1 = testQueue.getRepositoryRecord(ffBuilder.id(1).build());
+            List<RepositoryRecord> record2 = testQueue.getRepositoryRecord(ffBuilder.id(2).build());
+            List<RepositoryRecord> record3 = testQueue.getRepositoryRecord(ffBuilder.id(3).build());
+
+            // CREATE one... should incur no penalty
+            repo.updateRepository(record1);
+            repo.updateStallStop();
+            assertFalse(repo.stallNewFlowFiles);
+            assertFalse(repo.stopNewFlowFiles);
+
+            // CREATE another... should stall
+            repo.updateRepository(record2);
+            repo.updateStallStop();
+            assertTrue(repo.stallNewFlowFiles);
+            assertFalse(repo.stopNewFlowFiles);
+
+            // CREATE another... should stop
+            repo.updateRepository(record3);
+            repo.updateStallStop();
+            assertTrue(repo.stallNewFlowFiles);
+            assertTrue(repo.stopNewFlowFiles);
+
+            // DELETE one... should be stalled but not stopped
+            ((StandardRepositoryRecord) record1.get(0)).markForDelete();
+            repo.updateRepository(record1);
+            repo.updateStallStop();
+            assertTrue(repo.stallNewFlowFiles);
+            assertFalse(repo.stopNewFlowFiles);
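+
+            // after the first delete the count is back to 2, which still meets the stall threshold but
+            // not the stop threshold; the next delete drops the count to 1, clearing both flags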
+
+            // DELETE another... should no longer be stalled or stopped
+            ((StandardRepositoryRecord) record2.get(0)).markForDelete();
+            repo.updateRepository(record2);
+            repo.updateStallStop();
+            assertFalse(repo.stallNewFlowFiles);
+            assertFalse(repo.stopNewFlowFiles);
+        }
+    }
+
+    private class TestQueue {
+        private final TestQueueProvider provider;
+        private final Collection<FlowFileRecord> queuedFlowFiles;
+        private final Connection connection;
+
+        TestQueue() {
+            provider = new TestQueueProvider();
+            queuedFlowFiles = new ConcurrentSkipListSet<>(); // potentially accessed from multiple threads
+
+            final FlowFileQueue queue = new StandardFlowFileQueue("1234", null, null, null, null, null, null, null, 0, 0, "0 B") {
+                @Override
+                public void put(final FlowFileRecord file) {
+                    queuedFlowFiles.add(file);
+                }
+            };
+
+            connection = Mockito.mock(Connection.class);
+            when(connection.getIdentifier()).thenReturn(queue.getIdentifier());
+            when(connection.getFlowFileQueue()).thenReturn(queue);
+
+            provider.addConnection(connection);
+        }
+
+        void deleteQueuedFlowFiles(RocksDBFlowFileRepository repo) throws IOException {
+            Collection<RepositoryRecord> recordsToDelete = queuedFlowFiles.stream().map((Function<FlowFileRecord, RepositoryRecord>) flowFileRecord -> {
+                StandardRepositoryRecord record = new StandardRepositoryRecord(null, flowFileRecord);
+                record.markForDelete();
+                return record;
+            }).collect(Collectors.toSet());
+
+            repo.updateRepository(recordsToDelete);
+            queuedFlowFiles.clear();
+        }
+
+        private List<RepositoryRecord> getRepositoryRecord(final FlowFileRecord flowFileRecord) {
+            final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
+            record.setWorking(flowFileRecord);
+            record.setDestination(connection.getFlowFileQueue());
+            return Collections.singletonList(record);
+        }
+    }
+
+    private static class TestQueueProvider implements QueueProvider {
+
+        private List<Connection> connectionList = new ArrayList<>();
+
+        void addConnection(final Connection connection) {
+            this.connectionList.add(connection);
+        }
+
+        @Override
+        public Collection<FlowFileQueue> getAllQueues() {
+            final List<FlowFileQueue> queueList = new ArrayList<>();
+            for (final Connection conn : connectionList) {
+                queueList.add(conn.getFlowFileQueue());
+            }
+
+            return queueList;
+        }
+    }
+
+    private static class MockFlowFileSwapManager implements FlowFileSwapManager {
+
+        private final Map<FlowFileQueue, Map<String, List<FlowFileRecord>>> swappedRecords = new HashMap<>();
+
+        @Override
+        public void initialize(SwapManagerInitializationContext initializationContext) {
+        }
+
+        @Override
+        public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue, final String partitionName) {
+            Map<String, List<FlowFileRecord>> swapMap = swappedRecords.computeIfAbsent(flowFileQueue, k -> new HashMap<>());
+
+            final String location = UUID.randomUUID().toString();
+            swapMap.put(location, new ArrayList<>(flowFiles));
+            return location;
+        }
+
+        @Override
+        public SwapContents peek(String swapLocation, FlowFileQueue flowFileQueue) {
+            Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
+            if (swapMap == null) {
+                return null;
+            }
+
+            final List<FlowFileRecord> flowFiles = swapMap.get(swapLocation);
+            final SwapSummary summary = getSwapSummary(swapLocation);
+            return new StandardSwapContents(summary, flowFiles);
+        }
+
+        @Override
+        public SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) {
+            Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
+            if (swapMap == null) {
+                return null;
+            }
+
+            final List<FlowFileRecord> flowFiles = swapMap.remove(swapLocation);
+            final SwapSummary summary = getSwapSummary(swapLocation);
+            return new StandardSwapContents(summary, flowFiles);
+        }
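+
+        // unlike swapIn, recovering the known swap locations leaves the swapped records in place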
+        @Override
+        public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue, final String partitionName) {
+            Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
+            if (swapMap == null) {
+                return null;
+            }
+
+            return new ArrayList<>(swapMap.keySet());
+        }
+
+        @Override
+        public SwapSummary getSwapSummary(String swapLocation) {
+            List<FlowFileRecord> records = null;
+            for (final Map<String, List<FlowFileRecord>> swapMap : swappedRecords.values()) {
+                records = swapMap.get(swapLocation);
+                if (records != null) {
+                    break;
+                }
+            }
+
+            if (records == null) {
+                return null;
+            }
+
+            final List<ResourceClaim> resourceClaims = new ArrayList<>();
+            long size = 0L;
+            Long maxId = null;
+            for (final FlowFileRecord flowFile : records) {
+                size += flowFile.getSize();
+
+                if (maxId == null || flowFile.getId() > maxId) {
+                    maxId = flowFile.getId();
+                }
+
+                final ContentClaim contentClaim = flowFile.getContentClaim();
+                if (contentClaim != null) {
+                    final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+                    resourceClaims.add(resourceClaim);
+                }
+            }
+
+            return new StandardSwapSummary(new QueueSize(records.size(), size), maxId, resourceClaims);
+        }
+
+        @Override
+        public void purge() {
+            this.swappedRecords.clear();
+        }
+
+        @Override
+        public Set<String> getSwappedPartitionNames(FlowFileQueue queue) {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public String changePartitionName(String swapLocation, String newPartitionName) {
+            return swapLocation;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..29dd873ef8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO,console
+log4j.category.org.apache.nifi=DEBUG
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
\ No newline at end of file