NIFI-4775: FlowFile Repository implementation based on RocksDB

+1 from markobean.

This closes #3638.

Signed-off-by: Brandon <devriesb@apache.org>
Authored by Brandon Devries on 2018-08-16 15:52:03 -04:00; committed by Brandon
parent 2baafcc2e6
commit 7d77b464cc
11 changed files with 3413 additions and 31 deletions

View File

@@ -0,0 +1,44 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>1.10.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-rocksdb-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>6.0.1</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,891 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.rocksdb;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.util.StringUtils;
import org.rocksdb.AccessHint;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Rather than forcing a sync to disk every time it is given a record, this RocksDB helper class
* persists all waiting records at a regular interval (using a ScheduledExecutorService),
* much as a metronome ticks.
*/
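// A minimal usage sketch (illustrative only, not part of this commit), using the builder
// and methods of the class below; "key" and "value" stand in for caller-supplied byte arrays:
//
//   RocksDBMetronome db = new RocksDBMetronome.Builder()
//           .setStoragePath(Paths.get("rocksdb-repo"))   // any writable directory
//           .setSyncMillis(10)                            // sync the WAL every 10 ms
//           .build();
//   db.initialize();
//   int counter = db.getSyncCounterValue();
//   db.put(key, value);        // buffered write; WAL not yet forced to disk
//   db.waitForSync(counter);   // blocks until the next periodic sync completes
//   db.close();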
public class RocksDBMetronome implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(RocksDBMetronome.class);
static final String CONFIGURATION_FAMILY = "configuration.column.family";
static final String DEFAULT_FAMILY = "default";
private final AtomicLong lastSyncWarningNanos = new AtomicLong(0L);
private final int parallelThreads;
private final int maxWriteBufferNumber;
private final int minWriteBufferNumberToMerge;
private final long writeBufferSize;
private final long maxTotalWalSize;
private final long delayedWriteRate;
private final int level0SlowdownWritesTrigger;
private final int level0StopWritesTrigger;
private final int maxBackgroundFlushes;
private final int maxBackgroundCompactions;
private final int statDumpSeconds;
private final long syncMillis;
private final long syncWarningNanos;
private final Path storagePath;
private final boolean adviseRandomOnOpen;
private final boolean createIfMissing;
private final boolean createMissingColumnFamilies;
private final boolean useFsync;
private final Set<byte[]> columnFamilyNames;
private final Map<String, ColumnFamilyHandle> columnFamilyHandles;
private final boolean periodicSyncEnabled;
private final ScheduledExecutorService syncExecutor;
private final ReentrantLock syncLock = new ReentrantLock();
private final Condition syncCondition = syncLock.newCondition();
private final AtomicInteger syncCounter = new AtomicInteger(0);
private volatile RocksDB rocksDB = null;
private final ReentrantReadWriteLock dbReadWriteLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock dbReadLock = dbReadWriteLock.readLock();
private final ReentrantReadWriteLock.WriteLock dbWriteLock = dbReadWriteLock.writeLock();
private volatile boolean closed = false;
private ColumnFamilyHandle configurationColumnFamilyHandle;
private ColumnFamilyHandle defaultColumnFamilyHandle;
private WriteOptions forceSyncWriteOptions;
private WriteOptions noSyncWriteOptions;
private RocksDBMetronome(Builder builder) {
statDumpSeconds = builder.statDumpSeconds;
parallelThreads = builder.parallelThreads;
maxWriteBufferNumber = builder.maxWriteBufferNumber;
minWriteBufferNumberToMerge = builder.minWriteBufferNumberToMerge;
writeBufferSize = builder.writeBufferSize;
maxTotalWalSize = builder.getMaxTotalWalSize();
delayedWriteRate = builder.delayedWriteRate;
level0SlowdownWritesTrigger = builder.level0SlowdownWritesTrigger;
level0StopWritesTrigger = builder.level0StopWritesTrigger;
maxBackgroundFlushes = builder.maxBackgroundFlushes;
maxBackgroundCompactions = builder.maxBackgroundCompactions;
syncMillis = builder.syncMillis;
syncWarningNanos = builder.syncWarningNanos;
storagePath = builder.storagePath;
adviseRandomOnOpen = builder.adviseRandomOnOpen;
createIfMissing = builder.createIfMissing;
createMissingColumnFamilies = builder.createMissingColumnFamilies;
useFsync = builder.useFsync;
columnFamilyNames = builder.columnFamilyNames;
columnFamilyHandles = new HashMap<>(columnFamilyNames.size());
periodicSyncEnabled = builder.periodicSyncEnabled;
syncExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setDaemon(true);
return thread;
});
}
/**
* Initialize the metronome
*
* @throws IOException if there is an issue with the underlying database
*/
public void initialize() throws IOException {
final String rocksSharedLibDir = System.getenv("ROCKSDB_SHAREDLIB_DIR");
final String javaTmpDir = System.getProperty("java.io.tmpdir");
final String libDir = !StringUtils.isBlank(rocksSharedLibDir) ? rocksSharedLibDir : javaTmpDir;
try {
Files.createDirectories(Paths.get(libDir));
} catch (IOException e) {
throw new IOException("Unable to load the RocksDB shared library into directory: " + libDir, e);
}
// delete any previous librocksdbjni.so files
final File[] rocksSos = Paths.get(libDir).toFile().listFiles((dir, name) -> name.startsWith("librocksdbjni") && name.endsWith(".so"));
if (rocksSos != null) {
for (File rocksSo : rocksSos) {
if (!rocksSo.delete()) {
logger.warn("Could not delete existing librocksdbjni*.so file {}", rocksSo);
}
}
}
try {
RocksDB.loadLibrary();
} catch (Throwable t) {
if (System.getProperty("os.name").startsWith("Windows")) {
logger.error("The RocksDBMetronome will only work on Windows if you have Visual C++ runtime libraries for Visual Studio 2015 installed. " +
"If the DLLs required to support RocksDB cannot be found, then NiFi will not start!");
}
throw t;
}
Files.createDirectories(storagePath);
forceSyncWriteOptions = new WriteOptions()
.setDisableWAL(false)
.setSync(true);
noSyncWriteOptions = new WriteOptions()
.setDisableWAL(false)
.setSync(false);
dbWriteLock.lock();
try (final DBOptions dbOptions = new DBOptions()
.setAccessHintOnCompactionStart(AccessHint.SEQUENTIAL)
.setAdviseRandomOnOpen(adviseRandomOnOpen)
.setAllowMmapWrites(false) // required to be false for RocksDB.syncWal() to work
.setCreateIfMissing(createIfMissing)
.setCreateMissingColumnFamilies(createMissingColumnFamilies)
.setDelayedWriteRate(delayedWriteRate)
.setIncreaseParallelism(parallelThreads)
.setLogger(getRocksLogger())
.setMaxBackgroundCompactions(maxBackgroundCompactions)
.setMaxBackgroundFlushes(maxBackgroundFlushes)
.setMaxTotalWalSize(maxTotalWalSize)
.setStatsDumpPeriodSec(statDumpSeconds)
.setUseFsync(useFsync)
;
final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()
.setCompressionType(CompressionType.LZ4_COMPRESSION)
.setLevel0SlowdownWritesTrigger(level0SlowdownWritesTrigger)
.setLevel0StopWritesTrigger(level0StopWritesTrigger)
.setMaxWriteBufferNumber(maxWriteBufferNumber)
.setMinWriteBufferNumberToMerge(minWriteBufferNumberToMerge)
.setWriteBufferSize(writeBufferSize)
) {
// create family descriptors
List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(columnFamilyNames.size());
for (byte[] name : columnFamilyNames) {
familyDescriptors.add(new ColumnFamilyDescriptor(name, cfOptions));
}
List<ColumnFamilyHandle> columnFamilyList = new ArrayList<>(columnFamilyNames.size());
rocksDB = RocksDB.open(dbOptions, storagePath.toString(), familyDescriptors, columnFamilyList);
// create the map of names to handles
columnFamilyHandles.put(DEFAULT_FAMILY, rocksDB.getDefaultColumnFamily());
for (ColumnFamilyHandle cf : columnFamilyList) {
columnFamilyHandles.put(new String(cf.getName(), StandardCharsets.UTF_8), cf);
}
// set specific special handles
defaultColumnFamilyHandle = rocksDB.getDefaultColumnFamily();
configurationColumnFamilyHandle = columnFamilyHandles.get(CONFIGURATION_FAMILY);
} catch (RocksDBException e) {
throw new IOException(e);
} finally {
dbWriteLock.unlock();
}
if (periodicSyncEnabled) {
syncExecutor.scheduleWithFixedDelay(this::doSync, syncMillis, syncMillis, TimeUnit.MILLISECONDS);
}
logger.info("Initialized RocksDB Repository at {}", storagePath);
}
/**
* This method checks the state of the database to ensure it is available for use.
* <p>
* NOTE: This *must* be called holding the dbReadLock
*
* @throws IllegalStateException if the database is closed or not yet initialized
*/
private void checkDbState() throws IllegalStateException {
if (rocksDB == null) {
if (closed) {
throw new IllegalStateException("RocksDBMetronome is closed");
}
throw new IllegalStateException("RocksDBMetronome has not been initialized");
}
}
/**
* Return an iterator over the specified column family. The iterator is initially invalid (caller must call one of the Seek methods on the iterator before using it).
* <p>
* Caller should close the iterator when it is no longer needed. The returned iterator should be closed before this db is closed.
*
* @param columnFamilyHandle specifies the column family for the iterator
* @return an iterator over the specified column family
*/
public RocksIterator getIterator(final ColumnFamilyHandle columnFamilyHandle) {
dbReadLock.lock();
try {
checkDbState();
return rocksDB.newIterator(columnFamilyHandle);
} finally {
dbReadLock.unlock();
}
}
/**
* Get the value for the provided key in the specified column family
*
* @param columnFamilyHandle the column family from which to get the value
* @param key the key of the value to retrieve
* @return the value for the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public byte[] get(final ColumnFamilyHandle columnFamilyHandle, final byte[] key) throws RocksDBException {
dbReadLock.lock();
try {
checkDbState();
return rocksDB.get(columnFamilyHandle, key);
} finally {
dbReadLock.unlock();
}
}
/**
* Put the key / value pair into the database in the specified column family
*
* @param columnFamilyHandle the column family into which to put the value
* @param writeOptions specification of options for write operations
* @param key the key to be inserted
* @param value the value to be associated with the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void put(final ColumnFamilyHandle columnFamilyHandle, WriteOptions writeOptions, final byte[] key, final byte[] value) throws RocksDBException {
dbReadLock.lock();
try {
checkDbState();
rocksDB.put(columnFamilyHandle, writeOptions, key, value);
} finally {
dbReadLock.unlock();
}
}
/**
* Delete the key / value pair from the specified column family
*
* @param columnFamilyHandle the column family from which to delete the value
* @param writeOptions specification of options for write operations
* @param key the key to be deleted
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final WriteOptions writeOptions) throws RocksDBException {
dbReadLock.lock();
try {
checkDbState();
rocksDB.delete(columnFamilyHandle, writeOptions, key);
} finally {
dbReadLock.unlock();
}
}
/**
* Flushes the WAL and syncs to disk
*
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void forceSync() throws RocksDBException {
dbReadLock.lock();
try {
checkDbState();
rocksDB.syncWal();
} finally {
dbReadLock.unlock();
}
}
/**
* Get the handle for the specified column family
*
* @param familyName the name of the column family
* @return the handle
*/
public ColumnFamilyHandle getColumnFamilyHandle(String familyName) {
return columnFamilyHandles.get(familyName);
}
/**
* Put the key / value pair into the configuration column family and sync the WAL
*
* @param key the key to be inserted
* @param value the value to be associated with the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void putConfiguration(final byte[] key, final byte[] value) throws RocksDBException {
put(configurationColumnFamilyHandle, forceSyncWriteOptions, key, value);
}
/**
* Put the key / value pair into the database in the specified column family without syncing the WAL
*
* @param columnFamilyHandle the column family into which to put the value
* @param key the key to be inserted
* @param value the value to be associated with the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value) throws RocksDBException {
put(columnFamilyHandle, noSyncWriteOptions, key, value);
}
/**
* Put the key / value pair into the database in the specified column family, optionally syncing the WAL
*
* @param columnFamilyHandle the column family into which to put the value
* @param key the key to be inserted
* @param value the value to be associated with the specified key
* @param forceSync if true, sync the WAL
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value, final boolean forceSync) throws RocksDBException {
put(columnFamilyHandle, getWriteOptions(forceSync), key, value);
}
/**
* Put the key / value pair into the database in the default column family, optionally syncing the WAL
*
* @param key the key to be inserted
* @param value the value to be associated with the specified key
* @param forceSync if true, sync the WAL
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void put(final byte[] key, final byte[] value, final boolean forceSync) throws RocksDBException {
put(defaultColumnFamilyHandle, getWriteOptions(forceSync), key, value);
}
/**
* Put the key / value pair into the database in the default column family without syncing the WAL
*
* @param key the key to be inserted
* @param value the value to be associated with the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void put(final byte[] key, final byte[] value) throws RocksDBException {
put(defaultColumnFamilyHandle, noSyncWriteOptions, key, value);
}
/**
* Get the value for the provided key in the default column family
*
* @param key the key of the value to retrieve
* @return the value for the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public byte[] get(final byte[] key) throws RocksDBException {
return get(defaultColumnFamilyHandle, key);
}
/**
* Get the value for the provided key in the configuration column family
*
* @param key the key of the value to retrieve
* @return the value for the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public byte[] getConfiguration(final byte[] key) throws RocksDBException {
return get(configurationColumnFamilyHandle, key);
}
/**
* Delete the key / value pair from the default column family without syncing the WAL
*
* @param key the key to be deleted
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void delete(byte[] key) throws RocksDBException {
delete(defaultColumnFamilyHandle, key, noSyncWriteOptions);
}
/**
* Delete the key / value pair from the default column family, optionally syncing the WAL
*
* @param key the key to be deleted
* @param forceSync if true, sync the WAL
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void delete(byte[] key, final boolean forceSync) throws RocksDBException {
delete(defaultColumnFamilyHandle, key, getWriteOptions(forceSync));
}
/**
* Delete the key / value pair from the specified column family without syncing the WAL
*
* @param columnFamilyHandle the column family from which to delete the value
* @param key the key to be deleted
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void delete(final ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
delete(columnFamilyHandle, key, noSyncWriteOptions);
}
/**
* Delete the key / value pair from the specified column family, optionally syncing the WAL
*
* @param columnFamilyHandle the column family from which to delete the value
* @param key the key to be deleted
* @param forceSync if true, sync the WAL
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final boolean forceSync) throws RocksDBException {
delete(columnFamilyHandle, key, getWriteOptions(forceSync));
}
private WriteOptions getWriteOptions(boolean forceSync) {
return forceSync ? forceSyncWriteOptions : noSyncWriteOptions;
}
/**
* Return an iterator over the default column family. The iterator is initially invalid (caller must call one of the Seek methods on the iterator before using it).
* <p>
* Caller should close the iterator when it is no longer needed. The returned iterator should be closed before this db is closed.
*
* @return an iterator over the default column family
*/
public RocksIterator getIterator() {
return getIterator(defaultColumnFamilyHandle);
}
/**
* Get the current value of the sync counter. This can be used with waitForSync() to verify that operations have been synced to disk
*
* @return the current value of the sync counter
*/
public int getSyncCounterValue() {
return syncCounter.get();
}
/**
* Close the Metronome and the RocksDB Objects
*
* @throws IOException if there are issues in closing any of the underlying RocksDB objects
*/
@Override
public void close() throws IOException {
logger.info("Closing RocksDBMetronome");
dbWriteLock.lock();
try {
logger.info("Shutting down RocksDBMetronome sync executor");
syncExecutor.shutdownNow();
try {
logger.info("Pausing RocksDB background work");
rocksDB.pauseBackgroundWork();
} catch (RocksDBException e) {
logger.warn("Unable to pause background work before close.", e);
}
final AtomicReference<Exception> exceptionReference = new AtomicReference<>();
logger.info("Closing RocksDB configurations");
safeClose(forceSyncWriteOptions, exceptionReference);
safeClose(noSyncWriteOptions, exceptionReference);
// close the column family handles first, then the db last
for (ColumnFamilyHandle cfh : columnFamilyHandles.values()) {
safeClose(cfh, exceptionReference);
}
logger.info("Closing RocksDB database");
safeClose(rocksDB, exceptionReference);
rocksDB = null;
closed = true;
if (exceptionReference.get() != null) {
throw new IOException(exceptionReference.get());
}
} finally {
dbWriteLock.unlock();
}
}
/**
* @param autoCloseable An {@link AutoCloseable} to be closed
* @param exceptionReference A reference to contain any encountered {@link Exception}
*/
private void safeClose(final AutoCloseable autoCloseable, final AtomicReference<Exception> exceptionReference) {
if (autoCloseable != null) {
try {
autoCloseable.close();
} catch (Exception e) {
exceptionReference.set(e);
}
}
}
/**
* @return The capacity of the store
* @throws IOException if encountered
*/
public long getStorageCapacity() throws IOException {
return Files.getFileStore(storagePath).getTotalSpace();
}
/**
* @return The usable space of the store
* @throws IOException if encountered
*/
public long getUsableStorageSpace() throws IOException {
return Files.getFileStore(storagePath).getUsableSpace();
}
/**
* This method is scheduled by the syncExecutor. It runs at the specified interval, forcing the RocksDB WAL to disk.
* <p>
* If the sync is successful, it notifies threads that had been waiting for their records to be persisted using
* syncCondition and the syncCounter, which is incremented to indicate success
*/
void doSync() {
syncLock.lock();
try {
// if we're interrupted, return
if (Thread.currentThread().isInterrupted()) {
return;
}
forceSync();
syncCounter.incrementAndGet(); // it's OK if it rolls over... we're just going to check that the value changed
syncCondition.signalAll();
} catch (final IllegalArgumentException e) {
logger.error("Unable to sync, likely because the repository is out of space.", e);
} catch (final Throwable t) {
logger.error("Unable to sync", t);
} finally {
syncLock.unlock();
}
}
/**
* This method blocks until the next time the WAL is forced to disk, ensuring that all records written before this point have been persisted.
*/
public void waitForSync() throws InterruptedException {
final int counterValue = syncCounter.get();
waitForSync(counterValue);
}
/**
* This method blocks until the WAL has been forced to disk, ensuring that all records written before the point specified by the counterValue have been persisted.
*
* @param counterValue The value of the counter at the time of a write we must persist
* @throws InterruptedException if the thread is interrupted
*/
public void waitForSync(final int counterValue) throws InterruptedException {
if (counterValue != syncCounter.get()) {
return; // if the counter has already changed, we don't need to wait (or grab the lock) because the records we're concerned with have already been persisted
}
long waitTimeRemaining = syncWarningNanos;
syncLock.lock();
try {
while (counterValue == syncCounter.get()) { // wait until the counter changes (indicating sync occurred)
waitTimeRemaining = syncCondition.awaitNanos(waitTimeRemaining);
if (waitTimeRemaining <= 0L) { // this means the wait timed out
// only log a warning every syncWarningNanos... don't spam the logs
final long now = System.nanoTime();
final long lastWarning = lastSyncWarningNanos.get();
if (now - lastWarning > syncWarningNanos
&& lastSyncWarningNanos.compareAndSet(lastWarning, now)) {
logger.warn("Failed to sync within {} seconds... system configuration may need to be adjusted", TimeUnit.NANOSECONDS.toSeconds(syncWarningNanos));
}
// reset waiting time
waitTimeRemaining = syncWarningNanos;
}
}
} finally {
syncLock.unlock();
}
}
/**
* Returns a representation of a long as a byte array
*
* @param value a long to convert
* @return a byte[] representation
*/
public static byte[] getBytes(long value) {
byte[] bytes = new byte[8];
writeLong(value, bytes);
return bytes;
}
/**
* Writes a representation of a long to the specified byte array
*
* @param l a long to convert
* @param bytes an array to store the byte representation
*/
public static void writeLong(long l, byte[] bytes) {
bytes[0] = (byte) (l >>> 56);
bytes[1] = (byte) (l >>> 48);
bytes[2] = (byte) (l >>> 40);
bytes[3] = (byte) (l >>> 32);
bytes[4] = (byte) (l >>> 24);
bytes[5] = (byte) (l >>> 16);
bytes[6] = (byte) (l >>> 8);
bytes[7] = (byte) (l);
}
/**
* Creates a long from its byte array representation
* @param bytes to convert to a long
* @return a long
* @throws IOException if the given byte array is of the wrong size
*/
public static long readLong(final byte[] bytes) throws IOException {
if (bytes.length != 8) {
throw new IOException("wrong number of bytes to convert to long (must be 8)");
}
return (((long) (bytes[0]) << 56) +
((long) (bytes[1] & 255) << 48) +
((long) (bytes[2] & 255) << 40) +
((long) (bytes[3] & 255) << 32) +
((long) (bytes[4] & 255) << 24) +
((long) (bytes[5] & 255) << 16) +
((long) (bytes[6] & 255) << 8) +
((long) (bytes[7] & 255)));
}
/**
* @return the storage path of the db
*/
public Path getStoragePath() {
return storagePath;
}
/**
* @return A RocksDB logger capturing all logging output from RocksDB
*/
private org.rocksdb.Logger getRocksLogger() {
try (Options options = new Options()
// make RocksDB give us everything, and we'll decide what we want to log in our wrapper
.setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)) {
return new LogWrapper(options);
}
}
/**
* An Extension of org.rocksdb.Logger that wraps the slf4j Logger
*/
private class LogWrapper extends org.rocksdb.Logger {
LogWrapper(Options options) {
super(options);
}
@Override
protected void log(final InfoLogLevel infoLogLevel, final String logMsg) {
switch (infoLogLevel) {
case ERROR_LEVEL:
case FATAL_LEVEL:
logger.error(logMsg);
break;
case WARN_LEVEL:
logger.warn(logMsg);
break;
case DEBUG_LEVEL:
logger.debug(logMsg);
break;
case INFO_LEVEL:
case HEADER_LEVEL:
default:
logger.info(logMsg);
break;
}
}
}
public static class Builder {
int parallelThreads = 8;
int maxWriteBufferNumber = 4;
int minWriteBufferNumberToMerge = 1;
long writeBufferSize = (long) DataUnit.MB.toB(256);
long delayedWriteRate = (long) DataUnit.MB.toB(16);
int level0SlowdownWritesTrigger = 20;
int level0StopWritesTrigger = 40;
int maxBackgroundFlushes = 1;
int maxBackgroundCompactions = 1;
int statDumpSeconds = 600;
long syncMillis = 10;
long syncWarningNanos = TimeUnit.SECONDS.toNanos(30);
Path storagePath;
boolean adviseRandomOnOpen = false;
boolean createIfMissing = true;
boolean createMissingColumnFamilies = true;
boolean useFsync = true;
boolean periodicSyncEnabled = true;
final Set<byte[]> columnFamilyNames = new HashSet<>();
public RocksDBMetronome build() {
if (storagePath == null) {
throw new IllegalStateException("Cannot create RocksDBMetronome because storagePath is not set");
}
// add default column families
columnFamilyNames.add(RocksDB.DEFAULT_COLUMN_FAMILY);
columnFamilyNames.add(CONFIGURATION_FAMILY.getBytes(StandardCharsets.UTF_8));
return new RocksDBMetronome(this);
}
public Builder addColumnFamily(String name) {
this.columnFamilyNames.add(name.getBytes(StandardCharsets.UTF_8));
return this;
}
public Builder setStoragePath(Path storagePath) {
this.storagePath = storagePath;
return this;
}
public Builder setParallelThreads(int parallelThreads) {
this.parallelThreads = parallelThreads;
return this;
}
public Builder setMaxWriteBufferNumber(int maxWriteBufferNumber) {
this.maxWriteBufferNumber = maxWriteBufferNumber;
return this;
}
public Builder setMinWriteBufferNumberToMerge(int minWriteBufferNumberToMerge) {
this.minWriteBufferNumberToMerge = minWriteBufferNumberToMerge;
return this;
}
public Builder setWriteBufferSize(long writeBufferSize) {
this.writeBufferSize = writeBufferSize;
return this;
}
public Builder setDelayedWriteRate(long delayedWriteRate) {
this.delayedWriteRate = delayedWriteRate;
return this;
}
public Builder setLevel0SlowdownWritesTrigger(int level0SlowdownWritesTrigger) {
this.level0SlowdownWritesTrigger = level0SlowdownWritesTrigger;
return this;
}
public Builder setLevel0StopWritesTrigger(int level0StopWritesTrigger) {
this.level0StopWritesTrigger = level0StopWritesTrigger;
return this;
}
public Builder setMaxBackgroundFlushes(int maxBackgroundFlushes) {
this.maxBackgroundFlushes = maxBackgroundFlushes;
return this;
}
public Builder setMaxBackgroundCompactions(int maxBackgroundCompactions) {
this.maxBackgroundCompactions = maxBackgroundCompactions;
return this;
}
public Builder setStatDumpSeconds(int statDumpSeconds) {
this.statDumpSeconds = statDumpSeconds;
return this;
}
public Builder setSyncMillis(long syncMillis) {
this.syncMillis = syncMillis;
return this;
}
public Builder setSyncWarningNanos(long syncWarningNanos) {
this.syncWarningNanos = syncWarningNanos;
return this;
}
public Builder setAdviseRandomOnOpen(boolean adviseRandomOnOpen) {
this.adviseRandomOnOpen = adviseRandomOnOpen;
return this;
}
public Builder setCreateMissingColumnFamilies(boolean createMissingColumnFamilies) {
this.createMissingColumnFamilies = createMissingColumnFamilies;
return this;
}
public Builder setCreateIfMissing(boolean createIfMissing) {
this.createIfMissing = createIfMissing;
return this;
}
public Builder setUseFsync(boolean useFsync) {
this.useFsync = useFsync;
return this;
}
public Builder setPeriodicSyncEnabled(boolean periodicSyncEnabled) {
this.periodicSyncEnabled = periodicSyncEnabled;
return this;
}
long getMaxTotalWalSize() {
return writeBufferSize * maxWriteBufferNumber;
}
}
}

View File

@@ -0,0 +1,331 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.rocksdb;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksIterator;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestRocksDBMetronome {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private static final byte[] KEY = "key".getBytes(StandardCharsets.UTF_8);
private static final byte[] VALUE = "value".getBytes(StandardCharsets.UTF_8);
private static final byte[] KEY_2 = "key 2".getBytes(StandardCharsets.UTF_8);
private static final byte[] VALUE_2 = "value 2".getBytes(StandardCharsets.UTF_8);
private ExecutorService executor;
@Before
public void before() {
executor = Executors.newSingleThreadExecutor();
}
@After
public void after() {
executor.shutdownNow();
}
@Test
public void testReadWriteLong() throws Exception {
Random random = new Random();
byte[] key = new byte[8];
for (long i = 0; i < 10; i++) {
{
RocksDBMetronome.writeLong(i, key);
assertEquals(i, RocksDBMetronome.readLong(key));
}
{
long testValue = Long.MIN_VALUE + i;
RocksDBMetronome.writeLong(testValue, key);
assertEquals(testValue, RocksDBMetronome.readLong(key));
}
{
long testValue = Long.MAX_VALUE - i;
RocksDBMetronome.writeLong(testValue, key);
assertEquals(testValue, RocksDBMetronome.readLong(key));
}
{
long testValue = random.nextLong();
RocksDBMetronome.writeLong(testValue, key);
assertEquals(testValue, RocksDBMetronome.readLong(key));
}
}
}
@Test
public void testPutGetDelete() throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(temporaryFolder.newFolder().toPath())
.build()) {
db.initialize();
assertNull(db.get(KEY));
// test default (no sync)
db.put(KEY, VALUE);
assertArrayEquals(VALUE, db.get(KEY));
db.delete(KEY);
assertNull(db.get(KEY));
// test with "force sync"
db.put(KEY, VALUE, true);
assertArrayEquals(VALUE, db.get(KEY));
db.delete(KEY, true);
assertNull(db.get(KEY));
}
}
@Test
public void testPutGetConfiguration() throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(temporaryFolder.newFolder().toPath())
.build()) {
db.initialize();
assertNull(db.getConfiguration(KEY));
db.putConfiguration(KEY, VALUE);
assertArrayEquals(VALUE, db.getConfiguration(KEY));
db.delete(db.getColumnFamilyHandle(RocksDBMetronome.CONFIGURATION_FAMILY), KEY);
assertNull(db.getConfiguration(KEY));
}
}
@Test(expected = IllegalStateException.class)
public void testPutBeforeInit() throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(temporaryFolder.newFolder().toPath())
.build()) {
db.put(KEY, VALUE);
}
}
@Test(expected = IllegalStateException.class)
public void testPutClosed() throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(temporaryFolder.newFolder().toPath())
.build()) {
db.initialize();
db.close();
db.put(KEY_2, VALUE_2);
}
}
@Test
public void testColumnFamilies() throws Exception {
String secondFamilyName = "second family";
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(temporaryFolder.newFolder().toPath())
.addColumnFamily(secondFamilyName)
.build()) {
db.initialize();
ColumnFamilyHandle secondFamily = db.getColumnFamilyHandle(secondFamilyName);
// assert nothing present
assertNull(db.get(KEY));
assertNull(db.get(KEY_2));
assertNull(db.get(secondFamily, KEY));
assertNull(db.get(secondFamily, KEY_2));
// add values
db.put(KEY, VALUE);
db.put(secondFamily, KEY_2, VALUE_2);
// assert values present in correct family
assertArrayEquals(VALUE, db.get(KEY));
assertNull(db.get(KEY_2));
assertArrayEquals(VALUE_2, db.get(secondFamily, KEY_2));
assertNull(db.get(secondFamily, KEY));
// delete from the "wrong" family
db.delete(KEY_2);
db.delete(secondFamily, KEY);
// assert values *still* present in correct family
assertArrayEquals(VALUE, db.get(KEY));
assertNull(db.get(KEY_2));
assertArrayEquals(VALUE_2, db.get(secondFamily, KEY_2));
assertNull(db.get(secondFamily, KEY));
// delete from the "right" family
db.delete(KEY);
db.delete(secondFamily, KEY_2);
// assert values removed
assertNull(db.get(KEY));
assertNull(db.get(KEY_2));
assertNull(db.get(secondFamily, KEY));
assertNull(db.get(secondFamily, KEY_2));
}
}
@Test
public void testIterator() throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(temporaryFolder.newFolder().toPath())
.build()) {
db.initialize();
db.put(KEY, VALUE);
db.put(KEY_2, VALUE_2);
RocksIterator iterator = db.getIterator();
iterator.seekToFirst();
Map<String, byte[]> recovered = new HashMap<>();
while (iterator.isValid()) {
recovered.put(new String(iterator.key(), StandardCharsets.UTF_8), iterator.value());
iterator.next();
}
assertEquals(2, recovered.size());
assertArrayEquals(VALUE, recovered.get(new String(KEY, StandardCharsets.UTF_8)));
assertArrayEquals(VALUE_2, recovered.get(new String(KEY_2, StandardCharsets.UTF_8)));
}
}
@Test
public void testCounterIncrement() throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(temporaryFolder.newFolder().toPath())
.setSyncMillis(Long.MAX_VALUE) // effectively disable the auto-sync
.build()) {
db.initialize();
// get initial counter value
int counterValue = db.getSyncCounterValue();
// do the sync (which would normally happen via the db's internal executor)
db.doSync();
// assert counter value incremented
assertEquals(counterValue + 1, db.getSyncCounterValue());
}
}
@Test(timeout = 10_000)
public void testWaitForSync() throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(temporaryFolder.newFolder().toPath())
.setSyncMillis(Long.MAX_VALUE) // effectively disable the auto-sync
.build()) {
db.initialize();
Future<Boolean> future = executor.submit(() -> {
db.waitForSync();
return true;
});
// the future should still be blocked waiting for sync to happen
assertFalse(future.isDone());
// give the future time to wake up and complete
while (!future.isDone()) {
// TESTING NOTE: this is inside a loop to address a minor *testing* race condition where our first doSync() could happen before the future runs,
// meaning waitForSync() would be left waiting on another doSync() that never comes...
// do the sync (which would normally happen via the db's internal executor)
db.doSync();
Thread.sleep(25);
}
// the future should no longer be blocked
assertTrue(future.isDone());
}
}
@Test(timeout = 10_000)
public void testWaitForSyncWithValue() throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(temporaryFolder.newFolder().toPath())
.setSyncMillis(Long.MAX_VALUE) // effectively disable the auto-sync
.build()) {
db.initialize();
int syncCounterValue = db.getSyncCounterValue();
// "wait" for one before current counter value... should not block
db.waitForSync(syncCounterValue - 1);
// wait for current value... should block (because auto-sync isn't happening)
assertBlocks(db, syncCounterValue);
// do the sync (which would normally happen via the db's internal executor)
db.doSync();
// "wait" for initial value... should now not block
db.waitForSync(syncCounterValue);
// wait for current value again... should block (because auto-sync isn't happening)
assertBlocks(db, db.getSyncCounterValue());
}
}
private void assertBlocks(RocksDBMetronome db, int counterValue) throws InterruptedException, java.util.concurrent.ExecutionException {
Future<Boolean> future = getWaitForSyncFuture(db, counterValue);
try {
future.get(1, TimeUnit.SECONDS);
fail();
} catch (TimeoutException expected) {
assertFalse(future.isDone());
}
future.cancel(true);
}
private Future<Boolean> getWaitForSyncFuture(RocksDBMetronome db, int counterValue) {
return executor.submit(() -> {
db.waitForSync(counterValue);
return true;
});
}
}

View File

@@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO,console
log4j.category.org.apache.nifi=DEBUG
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

View File

@@ -12,7 +12,9 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
@@ -24,22 +26,23 @@
<packaging>pom</packaging>
<modules>
<module>nifi-data-provenance-utils</module>
-<module>nifi-flowfile-packager</module>
<module>nifi-expression-language</module>
-<module>nifi-logging-utils</module>
-<module>nifi-properties</module>
-<module>nifi-security-utils</module>
-<module>nifi-socket-utils</module>
-<module>nifi-utils</module>
-<module>nifi-json-utils</module>
-<module>nifi-web-utils</module>
-<module>nifi-write-ahead-log</module>
-<module>nifi-site-to-site-client</module>
+<module>nifi-flowfile-packager</module>
<module>nifi-hl7-query-language</module>
-<module>nifi-schema-utils</module>
-<module>nifi-record</module>
-<module>nifi-record-path</module>
+<module>nifi-json-utils</module>
+<module>nifi-logging-utils</module>
<module>nifi-metrics</module>
<module>nifi-parameter</module>
+<module>nifi-properties</module>
+<module>nifi-record</module>
+<module>nifi-record-path</module>
+<module>nifi-rocksdb-utils</module>
+<module>nifi-schema-utils</module>
+<module>nifi-security-utils</module>
+<module>nifi-site-to-site-client</module>
+<module>nifi-socket-utils</module>
+<module>nifi-utils</module>
+<module>nifi-web-utils</module>
+<module>nifi-write-ahead-log</module>
</modules>
</project>

View File

@@ -143,6 +143,11 @@
<artifactId>nifi-write-ahead-log</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
+<dependency>
+<groupId>org.apache.nifi</groupId>
+<artifactId>nifi-rocksdb-utils</artifactId>
+<version>1.10.0-SNAPSHOT</version>
+</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-repo-serialization</artifactId>

View File

@@ -515,7 +515,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncListener
updateRepository(repoRecords, true);
synchronized (this.swapLocationSuffixes) {
-this.swapLocationSuffixes.add(normalizeSwapLocation(swapLocation));
+this.swapLocationSuffixes.remove(normalizeSwapLocation(swapLocation));
}
logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue});

View File

@@ -1,16 +1,17 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
-org.apache.nifi.controller.repository.VolatileFlowFileRepository
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.controller.repository.RocksDBFlowFileRepository
+org.apache.nifi.controller.repository.VolatileFlowFileRepository
+org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
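This service manifest registers the new RocksDBFlowFileRepository alongside the existing implementations; NiFi chooses which one to instantiate from nifi.properties. A sketch of the relevant entry, assuming the standard property name (the class name comes from the manifest above):

nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.RocksDBFlowFileRepository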

View File

@@ -0,0 +1,807 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.swap.StandardSwapContents;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.rocksdb.RocksDBMetronome;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
public class TestRocksDBFlowFileRepository {
private static final Logger logger = LoggerFactory.getLogger(TestRocksDBFlowFileRepository.class);
private final Map<String, String> additionalProperties = new HashMap<>();
private String nifiPropertiesPath;
@Rule
public TestName testName = new TestName();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Before
public void before() throws IOException {
File testRepoDir = temporaryFolder.newFolder(testName.getMethodName());
additionalProperties.put(NiFiProperties.FLOWFILE_REPOSITORY_DIRECTORY, testRepoDir.getAbsolutePath());
File properties = temporaryFolder.newFile();
Files.copy(Paths.get("src/test/resources/conf/nifi.properties"), properties.toPath(), StandardCopyOption.REPLACE_EXISTING);
nifiPropertiesPath = properties.getAbsolutePath();
logger.info("Running test: {}", testName.getMethodName());
}
@Test
public void testNormalizeSwapLocation() {
assertEquals("/", RocksDBFlowFileRepository.normalizeSwapLocation("/"));
assertEquals("", RocksDBFlowFileRepository.normalizeSwapLocation(""));
assertNull(RocksDBFlowFileRepository.normalizeSwapLocation(null));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("test.txt"));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/test.txt"));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/tmp/test.txt"));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("//test.txt"));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/path/to/other/file/repository/test.txt"));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("test.txt/"));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/path/to/test.txt/"));
assertEquals("test", WriteAheadFlowFileRepository.normalizeSwapLocation(WriteAheadFlowFileRepository.normalizeSwapLocation("/path/to/test.txt/")));
}
@Test
public void testSwapLocationsRestored() throws IOException {
final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
repo.initialize(new StandardResourceClaimManager());
final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
repo.loadFlowFiles(queueProvider);
final Connection connection = Mockito.mock(Connection.class);
when(connection.getIdentifier()).thenReturn("1234");
final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
when(queue.getIdentifier()).thenReturn("1234");
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);
StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
ffBuilder.id(1L);
ffBuilder.size(0L);
final FlowFileRecord flowFileRecord = ffBuilder.build();
final List<RepositoryRecord> records = new ArrayList<>();
final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "swap123");
record.setDestination(queue);
records.add(record);
repo.updateRepository(records);
repo.close();
// restore
final RocksDBFlowFileRepository repo2 = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
repo2.initialize(new StandardResourceClaimManager());
repo2.loadFlowFiles(queueProvider);
assertTrue(repo2.isValidSwapLocationSuffix("swap123"));
assertFalse(repo2.isValidSwapLocationSuffix("other"));
repo2.close();
}
@Test
public void testSwapLocationsUpdatedOnRepoUpdate() throws IOException {
final Path path = Paths.get("target/test-swap-repo");
if (Files.exists(path)) {
FileUtils.deleteFile(path.toFile(), true);
}
final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
repo.initialize(new StandardResourceClaimManager());
final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
repo.loadFlowFiles(queueProvider);
final Connection connection = Mockito.mock(Connection.class);
when(connection.getIdentifier()).thenReturn("1234");
final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
when(queue.getIdentifier()).thenReturn("1234");
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);
StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
ffBuilder.id(1L);
ffBuilder.size(0L);
final FlowFileRecord flowFileRecord = ffBuilder.build();
final List<RepositoryRecord> records = new ArrayList<>();
final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "/tmp/swap123");
record.setDestination(queue);
records.add(record);
assertFalse(repo.isValidSwapLocationSuffix("swap123"));
repo.updateRepository(records);
assertTrue(repo.isValidSwapLocationSuffix("swap123"));
repo.close();
}
@Test
public void testResourceClaimsIncremented() throws IOException {
final ResourceClaimManager claimManager = new StandardResourceClaimManager();
final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
final Connection connection = Mockito.mock(Connection.class);
when(connection.getIdentifier()).thenReturn("1234");
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
final FlowFileSwapManager swapMgr = new TestRocksDBFlowFileRepository.MockFlowFileSwapManager();
final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, 0L, "0 B");
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);
final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false, false);
final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L);
final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false, false);
final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L);
// Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then,
// indicate that a FlowFile was swapped out. We should then be able to recover these FlowFiles and the
// resource claims' counts should be updated for both the swapped out FlowFile and the non-swapped out FlowFile
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(claimManager);
repo.loadFlowFiles(queueProvider);
// Create a Repository Record that indicates that a FlowFile was created
final FlowFileRecord flowFile1 = new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
.contentClaim(claim1)
.build();
final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue);
rec1.setWorking(flowFile1);
rec1.setDestination(queue);
// Create a Record that we can swap out
final FlowFileRecord flowFile2 = new StandardFlowFileRecord.Builder()
.id(2L)
.addAttribute("uuid", "11111111-1111-1111-1111-111111111112")
.contentClaim(claim2)
.build();
final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue);
rec2.setWorking(flowFile2);
rec2.setDestination(queue);
final List<RepositoryRecord> records = new ArrayList<>();
records.add(rec1);
records.add(rec2);
repo.updateRepository(records);
final String swapLocation = swapMgr.swapOut(Collections.singletonList(flowFile2), queue, null);
repo.swapFlowFilesOut(Collections.singletonList(flowFile2), queue, swapLocation);
}
final ResourceClaimManager recoveryClaimManager = new StandardResourceClaimManager();
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(recoveryClaimManager);
final long largestId = repo.loadFlowFiles(queueProvider);
// largest ID known is 1 because this doesn't take into account the FlowFiles that have been swapped out
assertEquals(1, largestId);
}
// resource claim 1 will have a claimant count of 1, while resource claim 2 will have a claimant count of 0,
// because resource claim 2 is referenced only by FlowFiles that have been swapped out.
assertEquals(1, recoveryClaimManager.getClaimantCount(resourceClaim1));
assertEquals(0, recoveryClaimManager.getClaimantCount(resourceClaim2));
final SwapSummary summary = queue.recoverSwappedFlowFiles();
assertNotNull(summary);
assertEquals(2, summary.getMaxFlowFileId().intValue());
assertEquals(new QueueSize(1, 0L), summary.getQueueSize());
final List<ResourceClaim> swappedOutClaims = summary.getResourceClaims();
assertNotNull(swappedOutClaims);
assertEquals(1, swappedOutClaims.size());
assertEquals(claim2.getResourceClaim(), swappedOutClaims.get(0));
}
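/**
 * Verifies that a single FlowFile updated across several updateRepository() calls is
 * restored after a restart with its latest attributes and size intact.
 */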
@Test
public void testRestartWithOneRecord() throws IOException {
final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
repo.initialize(new StandardResourceClaimManager());
final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
repo.loadFlowFiles(queueProvider);
final List<FlowFileRecord> flowFileCollection = new ArrayList<>();
final Connection connection = Mockito.mock(Connection.class);
when(connection.getIdentifier()).thenReturn("1234");
final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
when(queue.getIdentifier()).thenReturn("1234");
doAnswer((Answer<Object>) invocation -> {
flowFileCollection.add((FlowFileRecord) invocation.getArguments()[0]);
return null;
}).when(queue).put(any(FlowFileRecord.class));
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);
StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
ffBuilder.id(1L);
ffBuilder.addAttribute("abc", "xyz");
ffBuilder.size(0L);
final FlowFileRecord flowFileRecord = ffBuilder.build();
final List<RepositoryRecord> records = new ArrayList<>();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(flowFileRecord);
record.setDestination(connection.getFlowFileQueue());
records.add(record);
repo.updateRepository(records);
// update to add new attribute
ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord).addAttribute("hello", "world");
final FlowFileRecord flowFileRecord2 = ffBuilder.build();
record.setWorking(flowFileRecord2);
repo.updateRepository(records);
// update size but no attribute
ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord2).size(40L);
final FlowFileRecord flowFileRecord3 = ffBuilder.build();
record.setWorking(flowFileRecord3);
repo.updateRepository(records);
repo.close();
// restore
final RocksDBFlowFileRepository repo2 = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
repo2.initialize(new StandardResourceClaimManager());
repo2.loadFlowFiles(queueProvider);
assertEquals(1, flowFileCollection.size());
final FlowFileRecord flowFile = flowFileCollection.get(0);
assertEquals(1L, flowFile.getId());
assertEquals("xyz", flowFile.getAttribute("abc"));
assertEquals(40L, flowFile.getSize());
assertEquals("world", flowFile.getAttribute("hello"));
repo2.close();
}
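/**
 * Verifies the default orphan handling: recovering a FlowFile whose queue no longer
 * exists throws an IOException instead of silently dropping the record.
 */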
@Test
public void testDoNotRemoveOrphans() throws Exception {
final TestQueue testQueue = new TestQueue();
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
repo.updateRepository(testQueue.getRepositoryRecord(new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("abc", "xyz")
.size(0L)
.build()
));
}
// restore (& confirm present)
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(1, repo.getInMemoryFlowFiles());
}
// restore with empty queue provider (should throw exception)
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(new TestQueueProvider());
fail("Expected IOException when recovering FlowFiles without a corresponding queue");
} catch (IOException expected) {
assertTrue(expected.getMessage().contains("Found FlowFile in repository without a corresponding queue"));
}
}
@Test
public void testRemoveOrphans() throws Exception {
final TestQueue testQueue = new TestQueue();
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.REMOVE_ORPHANED_FLOWFILES.propertyName, "true");
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
repo.updateRepository(testQueue.getRepositoryRecord(new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("abc", "xyz")
.size(0L)
.build()
));
}
// restore (& confirm present)
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(1, repo.getInMemoryFlowFiles());
}
// restore with empty queue provider (orphaned FlowFiles should be removed rather than causing an exception)
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(new TestQueueProvider());
assertEquals(0, repo.getInMemoryFlowFiles());
}
}
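/**
 * Verifies that the repository initializes when the stored repository version is known;
 * the test below verifies that an unknown version causes initialization to fail.
 */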
@Test
public void testKnownVersion() throws Exception {
final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties);
// create db with known version
try (RocksDBMetronome db = new RocksDBMetronome.Builder().setStoragePath(RocksDBFlowFileRepository.getFlowFileRepoPath(niFiProperties)).build()) {
db.initialize();
db.putConfiguration(RocksDBFlowFileRepository.REPOSITORY_VERSION_KEY, RocksDBFlowFileRepository.VERSION_ONE_BYTES);
}
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
}
}
@Test(expected = IllegalStateException.class)
public void testUnknownVersion() throws Exception {
final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties);
// create db with an unknown version
try (RocksDBMetronome db = new RocksDBMetronome.Builder().setStoragePath(RocksDBFlowFileRepository.getFlowFileRepoPath(niFiProperties)).build()) {
db.initialize();
db.putConfiguration(RocksDBFlowFileRepository.REPOSITORY_VERSION_KEY, "UNKNOWN".getBytes(StandardCharsets.UTF_8));
}
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
}
}
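/**
 * Verifies that recovery mode caps the number of FlowFiles loaded into memory at the
 * configured limit, while normal mode restores all FlowFiles regardless of the limit.
 */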
@Test
public void testRecoveryMode() throws Exception {
int totalFlowFiles = 50;
final TestQueue testQueue = new TestQueue();
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
// add records to the repo
for (int i = 1; i <= totalFlowFiles; i++) {
repo.updateRepository(testQueue.getRepositoryRecord(new StandardFlowFileRecord.Builder()
.id(i)
.addAttribute("abc", "xyz")
.size(0L)
.build()
));
}
assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
}
// restore in recovery mode with varying limits
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, "true");
for (int recoveryLimit = 0; recoveryLimit < totalFlowFiles; recoveryLimit += 10) {
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(recoveryLimit));
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(recoveryLimit, repo.getInMemoryFlowFiles());
}
}
// restore in recovery mode with limit equal to available files
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(totalFlowFiles));
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
}
// restore in recovery mode with limit higher than available files
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(Integer.MAX_VALUE));
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
}
// restore in normal mode
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, "false");
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(0));
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
}
}
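/**
 * Verifies that, in recovery mode, deleting the in-memory FlowFiles and invoking
 * doRecovery() loads the next batch from the repository, up to the configured limit,
 * until no records remain to restore.
 */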
@Test
public void testRecoveryModeWithContinuedLoading() throws Exception {
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.CLAIM_CLEANUP_PERIOD.propertyName, "24 hours"); // effectively disable the cleanup thread so that recovery can be forced manually
int totalFlowFiles = 50;
int recoveryLimit = 10;
final TestQueue testQueue = new TestQueue();
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
// add records to the repo
for (int i = 1; i <= totalFlowFiles; i++) {
repo.updateRepository(testQueue.getRepositoryRecord(new StandardFlowFileRecord.Builder()
.id(i)
.addAttribute("abc", "xyz")
.size(0L)
.build()
));
}
assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
}
// restore in recovery mode
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, "true");
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(recoveryLimit));
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(recoveryLimit, repo.getInMemoryFlowFiles());
assertEquals(totalFlowFiles - recoveryLimit, repo.getRecordsToRestoreCount());
long flowFilesRecovered = repo.getInMemoryFlowFiles();
for (int i = 0; i < 4; i++) {
testQueue.deleteQueuedFlowFiles(repo);
assertEquals(0, repo.getInMemoryFlowFiles());
repo.doRecovery();
assertEquals(recoveryLimit, repo.getInMemoryFlowFiles());
flowFilesRecovered += repo.getInMemoryFlowFiles();
assertEquals((recoveryLimit * (i + 2)), flowFilesRecovered);
assertEquals(totalFlowFiles - flowFilesRecovered, repo.getRecordsToRestoreCount());
}
// should have restored all files
assertEquals(0, repo.getRecordsToRestoreCount());
assertEquals(recoveryLimit, repo.getInMemoryFlowFiles());
// delete last files
testQueue.deleteQueuedFlowFiles(repo);
assertEquals(0, repo.getRecordsToRestoreCount());
assertEquals(0, repo.getInMemoryFlowFiles());
repo.doRecovery();
// should have nothing left
assertEquals(0, repo.getRecordsToRestoreCount());
assertEquals(0, repo.getInMemoryFlowFiles());
}
// restore in normal mode
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, "false");
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(1));
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(0, repo.getRecordsToRestoreCount());
assertEquals(0, repo.getInMemoryFlowFiles());
}
}
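/**
 * Verifies the stall/stop thresholds: with a stall count of 2 and a stop count of 3,
 * adding records raises first the stall flag and then the stop flag, and deleting
 * records clears them again.
 */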
@Test
public void testStallStop() throws IOException {
final TestQueue testQueue = new TestQueue();
// set stall & stop properties
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_STALL_STOP.propertyName, "true");
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.STALL_FLOWFILE_COUNT.propertyName, "2");
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.STOP_FLOWFILE_COUNT.propertyName, "3");
// take heap usage out of the calculation
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.STALL_HEAP_USAGE_PERCENT.propertyName, "100%");
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.STOP_HEAP_USAGE_PERCENT.propertyName, "100%");
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
ffBuilder.addAttribute("abc", "xyz");
ffBuilder.size(0L);
List<RepositoryRecord> record1 = testQueue.getRepositoryRecord(ffBuilder.id(1).build());
List<RepositoryRecord> record2 = testQueue.getRepositoryRecord(ffBuilder.id(2).build());
List<RepositoryRecord> record3 = testQueue.getRepositoryRecord(ffBuilder.id(3).build());
// CREATE one... should neither stall nor stop
repo.updateRepository(record1);
repo.updateStallStop();
assertFalse(repo.stallNewFlowFiles);
assertFalse(repo.stopNewFlowFiles);
// CREATE another... should stall
repo.updateRepository(record2);
repo.updateStallStop();
assertTrue(repo.stallNewFlowFiles);
assertFalse(repo.stopNewFlowFiles);
// CREATE another... should stop
repo.updateRepository(record3);
repo.updateStallStop();
assertTrue(repo.stallNewFlowFiles);
assertTrue(repo.stopNewFlowFiles);
// DELETE one... should be stalled but not stopped
((StandardRepositoryRecord) record1.get(0)).markForDelete();
repo.updateRepository(record1);
repo.updateStallStop();
assertTrue(repo.stallNewFlowFiles);
assertFalse(repo.stopNewFlowFiles);
// DELETE another... shouldn't be stalled or stopped
((StandardRepositoryRecord) record2.get(0)).markForDelete();
repo.updateRepository(record2);
repo.updateStallStop();
assertFalse(repo.stallNewFlowFiles);
assertFalse(repo.stopNewFlowFiles);
}
}
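/**
 * Test helper that wires a mocked Connection to an in-memory FlowFileQueue, collecting
 * queued FlowFiles so tests can inspect and delete whatever the repository restores.
 */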
private class TestQueue {
private final TestQueueProvider provider;
private final Collection<FlowFileRecord> queuedFlowFiles;
private final Connection connection;
TestQueue() {
provider = new TestQueueProvider();
queuedFlowFiles = new ConcurrentSkipListSet<>(); // potentially accessed from multiple threads
final FlowFileQueue queue = new StandardFlowFileQueue("1234", null, null, null, null, null, null, null, 0, 0, "0 B") {
@Override
public void put(final FlowFileRecord file) {
queuedFlowFiles.add(file);
}
};
connection = Mockito.mock(Connection.class);
when(connection.getIdentifier()).thenReturn(queue.getIdentifier());
when(connection.getFlowFileQueue()).thenReturn(queue);
provider.addConnection(connection);
}
void deleteQueuedFlowFiles(RocksDBFlowFileRepository repo) throws IOException {
Collection<RepositoryRecord> recordsToDelete = queuedFlowFiles.stream().map((Function<FlowFileRecord, RepositoryRecord>) flowFileRecord -> {
StandardRepositoryRecord record = new StandardRepositoryRecord(null, flowFileRecord);
record.markForDelete();
return record;
}).collect(Collectors.toSet());
repo.updateRepository(recordsToDelete);
queuedFlowFiles.clear();
}
private List<RepositoryRecord> getRepositoryRecord(final FlowFileRecord flowFileRecord) {
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(flowFileRecord);
record.setDestination(connection.getFlowFileQueue());
return Collections.singletonList(record);
}
}
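/**
 * Minimal QueueProvider backed by a list of mocked Connections.
 */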
private static class TestQueueProvider implements QueueProvider {
private List<Connection> connectionList = new ArrayList<>();
void addConnection(final Connection connection) {
this.connectionList.add(connection);
}
@Override
public Collection<FlowFileQueue> getAllQueues() {
final List<FlowFileQueue> queueList = new ArrayList<>();
for (final Connection conn : connectionList) {
queueList.add(conn.getFlowFileQueue());
}
return queueList;
}
}
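/**
 * In-memory FlowFileSwapManager that tracks swapped-out records per queue; sufficient
 * for exercising the repository's swap bookkeeping without touching disk.
 */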
private static class MockFlowFileSwapManager implements FlowFileSwapManager {
private final Map<FlowFileQueue, Map<String, List<FlowFileRecord>>> swappedRecords = new HashMap<>();
@Override
public void initialize(SwapManagerInitializationContext initializationContext) {
}
@Override
public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue, final String partitionName) {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.computeIfAbsent(flowFileQueue, k -> new HashMap<>());
final String location = UUID.randomUUID().toString();
swapMap.put(location, new ArrayList<>(flowFiles));
return location;
}
@Override
public SwapContents peek(String swapLocation, FlowFileQueue flowFileQueue) {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
if (swapMap == null) {
return null;
}
final List<FlowFileRecord> flowFiles = swapMap.get(swapLocation);
final SwapSummary summary = getSwapSummary(swapLocation);
return new StandardSwapContents(summary, flowFiles);
}
@Override
public SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
if (swapMap == null) {
return null;
}
final List<FlowFileRecord> flowFiles = swapMap.remove(swapLocation);
final SwapSummary summary = getSwapSummary(swapLocation);
return new StandardSwapContents(summary, flowFiles);
}
@Override
public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue, final String partitionName) {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
if (swapMap == null) {
return null;
}
return new ArrayList<>(swapMap.keySet());
}
@Override
public SwapSummary getSwapSummary(String swapLocation) {
List<FlowFileRecord> records = null;
for (final Map<String, List<FlowFileRecord>> swapMap : swappedRecords.values()) {
records = swapMap.get(swapLocation);
if (records != null) {
break;
}
}
if (records == null) {
return null;
}
final List<ResourceClaim> resourceClaims = new ArrayList<>();
long size = 0L;
Long maxId = null;
for (final FlowFileRecord flowFile : records) {
size += flowFile.getSize();
if (maxId == null || flowFile.getId() > maxId) {
maxId = flowFile.getId();
}
final ContentClaim contentClaim = flowFile.getContentClaim();
if (contentClaim != null) {
final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
resourceClaims.add(resourceClaim);
}
}
return new StandardSwapSummary(new QueueSize(records.size(), size), maxId, resourceClaims);
}
@Override
public void purge() {
this.swappedRecords.clear();
}
@Override
public Set<String> getSwappedPartitionNames(FlowFileQueue queue) {
return Collections.emptySet();
}
@Override
public String changePartitionName(String swapLocation, String newPartitionName) {
return swapLocation;
}
}
}

View File

@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
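# Send INFO-level logging for all classes, and DEBUG for org.apache.nifi, to stderr.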
log4j.rootLogger=INFO,console
log4j.category.org.apache.nifi=DEBUG
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n