NIFI-11244 Removed RocksDBFlowFileRepository and nifi-rocksdb-bundle

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #7004.
exceptionfactory 2023-03-02 16:14:14 -06:00 committed by Pierre Villard
parent 44c70277ea
commit 89625c78b4
GPG Key ID: F92A93B30C07C6D5
13 changed files with 1 addition and 3934 deletions

View File

@@ -3435,7 +3435,7 @@ There are currently three implementations of the FlowFile Repository, which are
|====
|*Property*|*Description*
|`nifi.flowfile.repository.implementation`|The FlowFile Repository implementation. The default value is `org.apache.nifi.controller.repository.WriteAheadFlowFileRepository`. The other current options are `org.apache.nifi.controller.repository.VolatileFlowFileRepository` and `org.apache.nifi.controller.repository.RocksDBFlowFileRepository`.
|`nifi.flowfile.repository.implementation`|The FlowFile Repository implementation. The default value is `org.apache.nifi.controller.repository.WriteAheadFlowFileRepository`. The other current options are `org.apache.nifi.controller.repository.VolatileFlowFileRepository`.
|====
NOTE: Switching repository implementations should only be done with caution, and only on an instance with zero queued FlowFiles.
@@ -3481,109 +3481,6 @@ NOTE: Unlike the encrypted content and provenance repositories, the repository i
This implementation stores FlowFiles in memory instead of on disk. It *will* result in data loss in the event of power/machine failure or a restart of NiFi. To use this implementation, set `nifi.flowfile.repository.implementation` to `org.apache.nifi.controller.repository.VolatileFlowFileRepository`.
=== RocksDB FlowFile Repository
WARNING: The `RocksDBFlowFileRepository` is deprecated in favor of the `WriteAheadFlowFileRepository`. It will be removed in future major versions of Apache NiFi. To use it in NiFi 1.x, you must download nifi-rocksdb-nar from the Apache Nexus Repository and place it in your NiFi lib directory.
This implementation makes use of the RocksDB key-value store. It uses periodic synchronization to ensure that no created or received data is lost (as long as `nifi.flowfile.repository.rocksdb.accept.data.loss` is set to `false`). In the event of a failure (e.g. power loss), _work_ done on FlowFiles through the system (i.e. routing and transformation) may still be lost. Specifically, the record of these actions may be lost, reverting the affected FlowFiles to a previous, valid state. From there, they will resume their path through the flow as normal. This guarantee comes at the expense of a delay on operations that add new data to the system. This delay is configurable (as `nifi.flowfile.repository.rocksdb.sync.period`), and can be tuned to the individual system.
The configuration parameters for this repository fall into two categories, "NiFi-centric" and "RocksDB-centric". The NiFi-centric settings have to do with the operations of the FlowFile Repository and its interaction with NiFi. The RocksDB-centric settings directly correlate to settings on the underlying RocksDB repo. More information on these settings can be found in the RocksDB documentation: https://github.com/facebook/rocksdb/wiki/RocksJava-Basics.
NOTE: Windows users will need to ensure "Microsoft Visual C++ 2015 Redistributable" is installed for this repository to work. See the following link for more details: https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#maven-windows.
To use this implementation, set `nifi.flowfile.repository.implementation` to `org.apache.nifi.controller.repository.RocksDBFlowFileRepository`.
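For reference, a minimal `nifi.properties` sketch using the defaults documented in the tables below (the `sync.period` value uses NiFi's time period syntax; all values are illustrative and should be tuned to the individual system):

[source,properties]
----
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.RocksDBFlowFileRepository
nifi.flowfile.repository.directory=./flowfile_repository
nifi.flowfile.repository.rocksdb.sync.period=10 millis
nifi.flowfile.repository.rocksdb.accept.data.loss=false
----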
*NiFi-centric Configuration Properties*:
|====
|*Property*|*Description*
|`nifi.flowfile.repository.directory`|The location of the FlowFile Repository.
The default value is `./flowfile_repository`.
|`nifi.flowfile.repository.rocksdb.sync.warning.period`
|How often to log warnings if unable to sync.
The default value is 30 seconds.
|`nifi.flowfile.repository.rocksdb.claim.cleanup.period`
|How often to mark content claims destructible (so they can be removed from the content repo).
The default value is 30 seconds.
|`nifi.flowfile.repository.rocksdb.deserialization.threads`
|How many threads to use on startup restoring the FlowFile state.
The default value is 16.
|`nifi.flowfile.repository.rocksdb.deserialization.buffer.size`
|Size of the buffer to use on startup restoring the FlowFile state.
The default value is 1000.
|`nifi.flowfile.repository.rocksdb.sync.period`
|Frequency at which to force a sync to disk. This is the maximum period a data creation operation may block if `nifi.flowfile.repository.rocksdb.accept.data.loss` is `false`.
The default value is 10 milliseconds.
|`nifi.flowfile.repository.rocksdb.accept.data.loss`
|Whether to accept the loss of received / created data. Setting this `true` increases throughput if loss of data is acceptable.
The default value is false.
|`nifi.flowfile.repository.rocksdb.enable.stall.stop`
|Whether to enable the stall / stop of writes to the repository based on configured limits. Enabling this feature allows the system to protect itself by restricting (delaying or denying) operations that increase the total FlowFile count on the node to prevent the system from being overwhelmed.
The default value is false.
|`nifi.flowfile.repository.rocksdb.stall.period`
|The period of time to stall when the specified criteria are encountered.
The default value is 100 milliseconds.
|`nifi.flowfile.repository.rocksdb.stall.flowfile.count`
|The FlowFile count at which to begin stalling writes to the repo.
The default value is 800000.
|`nifi.flowfile.repository.rocksdb.stall.heap.usage.percent`
|The heap usage at which to begin stalling writes to the repo.
The default value is 95%.
|`nifi.flowfile.repository.rocksdb.stop.flowfile.count`
|The FlowFile count at which to begin stopping the creation of new FlowFiles.
The default value is 1100000.
|`nifi.flowfile.repository.rocksdb.stop.heap.usage.percent`
|The heap usage at which to begin stopping the creation of new FlowFiles.
The default value is 99.9%.
|`nifi.flowfile.repository.rocksdb.remove.orphaned.flowfiles.on.startup`
|Whether to allow the repository to remove FlowFiles it cannot identify on startup. As this is often the result of a configuration or synchronization error, it is disabled by default. This should only be enabled if you are absolutely certain you want to lose the data in question.
The default value is false.
|`nifi.flowfile.repository.rocksdb.enable.recovery.mode`
|Whether to enable "recovery mode". This limits the number of FlowFiles loaded into the graph at a time, while not actually removing any FlowFiles (or content) from the system. This allows for the recovery of a system that is encountering OutOfMemory errors or similar on startup. This should not be enabled unless necessary to recover a system, and should be disabled as soon as that has been accomplished.
*WARNING:* While in recovery mode, *do not* make modifications to the graph. Changes to the graph may result in the inability to restore further FlowFiles from the repository.
The default value is false.
|`nifi.flowfile.repository.rocksdb.recovery.mode.flowfile.count`
|The number of FlowFiles to load into the graph when in "recovery mode". As FlowFiles leave the system, additional FlowFiles will be loaded up to this limit. This setting does not prevent FlowFiles from coming into the system via normal means.
The default value is 5000.
|====
*RocksDB-centric Configuration Properties:*
|====
|*Property*|*Description*
|`nifi.flowfile.repository.rocksdb.parallel.threads`
|The number of threads to use for flush and compaction. A good value is the number of cores. See RocksDB `DBOptions.setIncreaseParallelism()` for more information.
The default value is 8.
|`nifi.flowfile.repository.rocksdb.max.write.buffer.number`
|The maximum number of write buffers that are built up in memory. See RocksDB `ColumnFamilyOptions.setMaxWriteBufferNumber()` / `max_write_buffer_number` for more information.
The default value is 4.
|`nifi.flowfile.repository.rocksdb.write.buffer.size`
|The amount of data to build up in memory before converting to a sorted on-disk file. Larger values increase performance, especially during bulk loads. Up to `max_write_buffer_number` write buffers may be held in memory at the same time, so you may wish to adjust this parameter to control memory usage. See RocksDB `ColumnFamilyOptions.setWriteBufferSize()` / `write_buffer_size` for more information.
The default value is 256 MB.
|`nifi.flowfile.repository.rocksdb.level.0.slowdown.writes.trigger`
|A soft limit on the number of level-0 files. Writes are slowed at this point. A value less than 0 means no write slowdown will be triggered by the number of files in level-0. See RocksDB `ColumnFamilyOptions.setLevel0SlowdownWritesTrigger()` / `level0_slowdown_writes_trigger` for more information.
The default value is 20.
|`nifi.flowfile.repository.rocksdb.level.0.stop.writes.trigger`
|The maximum number of level-0 files. Writes will be stopped at this point. See RocksDB `ColumnFamilyOptions.setLevel0StopWritesTrigger()` / `level0_stop_writes_trigger` for more information.
The default value is 40.
|`nifi.flowfile.repository.rocksdb.delayed.write.bytes.per.second`
|The limited write rate to the DB if slowdown is triggered. RocksDB may decide to slow down more if the compaction gets behind further. See RocksDB `DBOptions.setDelayedWriteRate()` for more information.
The default value is 16 MB.
|`nifi.flowfile.repository.rocksdb.max.background.flushes`
|Specifies the maximum number of concurrent background flush jobs. See RocksDB `DBOptions.setMaxBackgroundFlushes()` / `max_background_flushes` for more information.
The default value is 1.
|`nifi.flowfile.repository.rocksdb.max.background.compactions`
|Specifies the maximum number of concurrent background compaction jobs. See RocksDB `DBOptions.setMaxBackgroundCompactions()` / `max_background_compactions` for more information.
The default value is 1.
|`nifi.flowfile.repository.rocksdb.min.write.buffer.number.to.merge`
|The minimum number of write buffers to merge together before writing to storage. See RocksDB `ColumnFamilyOptions.setMinWriteBufferNumberToMerge()` / `min_write_buffer_number_to_merge` for more information.
The default value is 1.
|`nifi.flowfile.repository.rocksdb.stat.dump.period`
|The period at which to dump rocksdb.stats to the log. See RocksDB `DBOptions.setStatsDumpPeriodSec()` / `stats_dump_period_sec` for more information.
The default value is 600 seconds.
|====
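Note that the implementation derives RocksDB's maximum total WAL size from these settings as `write.buffer.size` x `max.write.buffer.number`; with the defaults above that works out to 256 MB x 4 = 1 GB (see `Builder.getMaxTotalWalSize()` in the `RocksDBMetronome` source removed by this commit).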
=== Swap Management
NiFi keeps FlowFile information in memory (the JVM)

View File

@@ -1,45 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<parent>
<artifactId>nifi-rocksdb-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-rocksdb-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-rocksdb-repository</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@@ -1,236 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
nifi-rocksdb-nar includes subcomponents with separate copyright notices and
license terms. Your use of these subcomponents is subject to the terms
and conditions of the following licenses:
The binary distribution of this product bundles the 'ASM' library which is available under a BSD style license.
Copyright (c) 2000-2005 INRIA, France Telecom
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holders nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -1,15 +0,0 @@
nifi-rocksdb-nar
Copyright 2014-2023 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) RocksDB JNI
The following NOTICE information applies:
Copyright (c) 2011-present, Facebook, Inc.

View File

@@ -1,56 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<parent>
<artifactId>nifi-rocksdb-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-rocksdb-repository</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-core</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-deprecation-log</artifactId>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>6.29.5</version>
</dependency>
</dependencies>
</project>

View File

@@ -1,891 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.util.StringUtils;
import org.rocksdb.AccessHint;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Instead of forcing a sync to disk every time it is handed a record, this RocksDB helper class
* persists all waiting records at a regular interval (using a ScheduledExecutorService),
* like the ticking of a metronome.
*/
class RocksDBMetronome implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(RocksDBMetronome.class);
static final String CONFIGURATION_FAMILY = "configuration.column.family";
static final String DEFAULT_FAMILY = "default";
private final AtomicLong lastSyncWarningNanos = new AtomicLong(0L);
private final int parallelThreads;
private final int maxWriteBufferNumber;
private final int minWriteBufferNumberToMerge;
private final long writeBufferSize;
private final long maxTotalWalSize;
private final long delayedWriteRate;
private final int level0SlowdownWritesTrigger;
private final int level0StopWritesTrigger;
private final int maxBackgroundFlushes;
private final int maxBackgroundCompactions;
private final int statDumpSeconds;
private final long syncMillis;
private final long syncWarningNanos;
private final Path storagePath;
private final boolean adviseRandomOnOpen;
private final boolean createIfMissing;
private final boolean createMissingColumnFamilies;
private final boolean useFsync;
private final Set<byte[]> columnFamilyNames;
private final Map<String, ColumnFamilyHandle> columnFamilyHandles;
private final boolean periodicSyncEnabled;
private final ScheduledExecutorService syncExecutor;
private final ReentrantLock syncLock = new ReentrantLock();
private final Condition syncCondition = syncLock.newCondition();
private final AtomicInteger syncCounter = new AtomicInteger(0);
private volatile RocksDB rocksDB = null;
private final ReentrantReadWriteLock dbReadWriteLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock dbReadLock = dbReadWriteLock.readLock();
private final ReentrantReadWriteLock.WriteLock dbWriteLock = dbReadWriteLock.writeLock();
private volatile boolean closed = false;
private ColumnFamilyHandle configurationColumnFamilyHandle;
private ColumnFamilyHandle defaultColumnFamilyHandle;
private WriteOptions forceSyncWriteOptions;
private WriteOptions noSyncWriteOptions;
private RocksDBMetronome(Builder builder) {
statDumpSeconds = builder.statDumpSeconds;
parallelThreads = builder.parallelThreads;
maxWriteBufferNumber = builder.maxWriteBufferNumber;
minWriteBufferNumberToMerge = builder.minWriteBufferNumberToMerge;
writeBufferSize = builder.writeBufferSize;
maxTotalWalSize = builder.getMaxTotalWalSize();
delayedWriteRate = builder.delayedWriteRate;
level0SlowdownWritesTrigger = builder.level0SlowdownWritesTrigger;
level0StopWritesTrigger = builder.level0StopWritesTrigger;
maxBackgroundFlushes = builder.maxBackgroundFlushes;
maxBackgroundCompactions = builder.maxBackgroundCompactions;
syncMillis = builder.syncMillis;
syncWarningNanos = builder.syncWarningNanos;
storagePath = builder.storagePath;
adviseRandomOnOpen = builder.adviseRandomOnOpen;
createIfMissing = builder.createIfMissing;
createMissingColumnFamilies = builder.createMissingColumnFamilies;
useFsync = builder.useFsync;
columnFamilyNames = builder.columnFamilyNames;
columnFamilyHandles = new HashMap<>(columnFamilyNames.size());
periodicSyncEnabled = builder.periodicSyncEnabled;
syncExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setDaemon(true);
return thread;
});
}
/**
* Initialize the metronome
*
* @throws IOException if there is an issue with the underlying database
*/
public void initialize() throws IOException {
final String rocksSharedLibDir = System.getenv("ROCKSDB_SHAREDLIB_DIR");
final String javaTmpDir = System.getProperty("java.io.tmpdir");
final String libDir = !StringUtils.isBlank(rocksSharedLibDir) ? rocksSharedLibDir : javaTmpDir;
try {
Files.createDirectories(Paths.get(libDir));
} catch (IOException e) {
throw new IOException("Unable to load the RocksDB shared library into directory: " + libDir, e);
}
// delete any previous librocksdbjni.so files
final File[] rocksSos = Paths.get(libDir).toFile().listFiles((dir, name) -> name.startsWith("librocksdbjni") && name.endsWith(".so"));
if (rocksSos != null) {
for (File rocksSo : rocksSos) {
if (!rocksSo.delete()) {
logger.warn("Could not delete existing librocksdbjni*.so file {}", rocksSo);
}
}
}
try {
RocksDB.loadLibrary();
} catch (Throwable t) {
if (System.getProperty("os.name").startsWith("Windows")) {
logger.error("The RocksDBMetronome will only work on Windows if you have Visual C++ runtime libraries for Visual Studio 2015 installed. " +
"If the DLLs required to support RocksDB cannot be found, then NiFi will not start!");
}
throw t;
}
Files.createDirectories(storagePath);
forceSyncWriteOptions = new WriteOptions()
.setDisableWAL(false)
.setSync(true);
noSyncWriteOptions = new WriteOptions()
.setDisableWAL(false)
.setSync(false);
dbWriteLock.lock();
try (final DBOptions dbOptions = new DBOptions()
.setAccessHintOnCompactionStart(AccessHint.SEQUENTIAL)
.setAdviseRandomOnOpen(adviseRandomOnOpen)
.setAllowMmapWrites(false) // required to be false for RocksDB.syncWal() to work
.setCreateIfMissing(createIfMissing)
.setCreateMissingColumnFamilies(createMissingColumnFamilies)
.setDelayedWriteRate(delayedWriteRate)
.setIncreaseParallelism(parallelThreads)
.setLogger(getRocksLogger())
.setMaxBackgroundCompactions(maxBackgroundCompactions)
.setMaxBackgroundFlushes(maxBackgroundFlushes)
.setMaxTotalWalSize(maxTotalWalSize)
.setStatsDumpPeriodSec(statDumpSeconds)
.setUseFsync(useFsync)
;
final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()
.setCompressionType(CompressionType.LZ4_COMPRESSION)
.setLevel0SlowdownWritesTrigger(level0SlowdownWritesTrigger)
.setLevel0StopWritesTrigger(level0StopWritesTrigger)
.setMaxWriteBufferNumber(maxWriteBufferNumber)
.setMinWriteBufferNumberToMerge(minWriteBufferNumberToMerge)
.setWriteBufferSize(writeBufferSize)
) {
// create family descriptors
List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(columnFamilyNames.size());
for (byte[] name : columnFamilyNames) {
familyDescriptors.add(new ColumnFamilyDescriptor(name, cfOptions));
}
List<ColumnFamilyHandle> columnFamilyList = new ArrayList<>(columnFamilyNames.size());
rocksDB = RocksDB.open(dbOptions, storagePath.toString(), familyDescriptors, columnFamilyList);
// create the map of names to handles
columnFamilyHandles.put(DEFAULT_FAMILY, rocksDB.getDefaultColumnFamily());
for (ColumnFamilyHandle cf : columnFamilyList) {
columnFamilyHandles.put(new String(cf.getName(), StandardCharsets.UTF_8), cf);
}
// set specific special handles
defaultColumnFamilyHandle = rocksDB.getDefaultColumnFamily();
configurationColumnFamilyHandle = columnFamilyHandles.get(CONFIGURATION_FAMILY);
} catch (RocksDBException e) {
throw new IOException(e);
} finally {
dbWriteLock.unlock();
}
if (periodicSyncEnabled) {
syncExecutor.scheduleWithFixedDelay(this::doSync, syncMillis, syncMillis, TimeUnit.MILLISECONDS);
}
logger.info("Initialized RocksDB Repository at {}", storagePath);
}
/**
* This method checks the state of the database to ensure it is available for use.
* <p>
* NOTE: This *must* be called holding the dbReadLock
*
* @throws IllegalStateException if the database is closed or not yet initialized
*/
private void checkDbState() throws IllegalStateException {
if (rocksDB == null) {
if (closed) {
throw new IllegalStateException("RocksDBMetronome is closed");
}
throw new IllegalStateException("RocksDBMetronome has not been initialized");
}
}
/**
* Return an iterator over the specified column family. The iterator is initially invalid (caller must call one of the Seek methods on the iterator before using it).
* <p>
* Caller should close the iterator when it is no longer needed. The returned iterator should be closed before this db is closed.
*
* @param columnFamilyHandle specifies the column family for the iterator
* @return an iterator over the specified column family
*/
public RocksIterator getIterator(final ColumnFamilyHandle columnFamilyHandle) {
dbReadLock.lock();
try {
checkDbState();
return rocksDB.newIterator(columnFamilyHandle);
} finally {
dbReadLock.unlock();
}
}
/**
* Get the value for the provided key in the specified column family
*
* @param columnFamilyHandle the column family from which to get the value
* @param key the key of the value to retrieve
* @return the value for the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public byte[] get(final ColumnFamilyHandle columnFamilyHandle, final byte[] key) throws RocksDBException {
dbReadLock.lock();
try {
checkDbState();
return rocksDB.get(columnFamilyHandle, key);
} finally {
dbReadLock.unlock();
}
}
/**
* Put the key / value pair into the database in the specified column family
*
* @param columnFamilyHandle the column family into which to put the value
* @param writeOptions specification of options for write operations
* @param key the key to be inserted
* @param value the value to be associated with the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void put(final ColumnFamilyHandle columnFamilyHandle, WriteOptions writeOptions, final byte[] key, final byte[] value) throws RocksDBException {
dbReadLock.lock();
try {
checkDbState();
rocksDB.put(columnFamilyHandle, writeOptions, key, value);
} finally {
dbReadLock.unlock();
}
}
/**
* Delete the key / value pair from the specified column family
*
* @param columnFamilyHandle the column family from which to delete the value
* @param writeOptions specification of options for write operations
* @param key the key to be deleted
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final WriteOptions writeOptions) throws RocksDBException {
dbReadLock.lock();
try {
checkDbState();
rocksDB.delete(columnFamilyHandle, writeOptions, key);
} finally {
dbReadLock.unlock();
}
}
/**
* Flushes the WAL and syncs to disk
*
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void forceSync() throws RocksDBException {
dbReadLock.lock();
try {
checkDbState();
rocksDB.syncWal();
} finally {
dbReadLock.unlock();
}
}
/**
* Get the handle for the specified column family
*
* @param familyName the name of the column family
* @return the handle
*/
public ColumnFamilyHandle getColumnFamilyHandle(String familyName) {
return columnFamilyHandles.get(familyName);
}
/**
* Put the key / value pair into the configuration column family and sync the wal
*
* @param key the key to be inserted
* @param value the value to be associated with the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void putConfiguration(final byte[] key, final byte[] value) throws RocksDBException {
put(configurationColumnFamilyHandle, forceSyncWriteOptions, key, value);
}
/**
* Put the key / value pair into the database in the specified column family without syncing the wal
*
* @param columnFamilyHandle the column family into which to put the value
* @param key the key to be inserted
* @param value the value to be associated with the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value) throws RocksDBException {
put(columnFamilyHandle, noSyncWriteOptions, key, value);
}
/**
* Put the key / value pair into the database in the specified column family, optionally syncing the wal
*
* @param columnFamilyHandle the column family into which to put the value
* @param key the key to be inserted
* @param value the value to be associated with the specified key
* @param forceSync if true, sync the wal
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value, final boolean forceSync) throws RocksDBException {
put(columnFamilyHandle, getWriteOptions(forceSync), key, value);
}
/**
* Put the key / value pair into the database in the default column family, optionally syncing the wal
*
* @param key the key to be inserted
* @param value the value to be associated with the specified key
* @param forceSync if true, sync the wal
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void put(final byte[] key, final byte[] value, final boolean forceSync) throws RocksDBException {
put(defaultColumnFamilyHandle, getWriteOptions(forceSync), key, value);
}
/**
* Put the key / value pair into the database in the default column family, without syncing the wal
*
* @param key the key to be inserted
* @param value the value to be associated with the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void put(final byte[] key, final byte[] value) throws RocksDBException {
put(defaultColumnFamilyHandle, noSyncWriteOptions, key, value);
}
/**
* Get the value for the provided key in the default column family
*
* @param key the key of the value to retrieve
* @return the value for the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public byte[] get(final byte[] key) throws RocksDBException {
return get(defaultColumnFamilyHandle, key);
}
/**
* Get the value for the provided key in the configuration column family
*
* @param key the key of the value to retrieve
* @return the value for the specified key
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public byte[] getConfiguration(final byte[] key) throws RocksDBException {
return get(configurationColumnFamilyHandle, key);
}
/**
* Delete the key / value pair from the default column family without syncing the wal
*
* @param key the key to be deleted
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void delete(byte[] key) throws RocksDBException {
delete(defaultColumnFamilyHandle, key, noSyncWriteOptions);
}
/**
* Delete the key / value pair from the default column family, optionally syncing the wal
*
* @param key the key to be deleted
* @param forceSync if true, sync the wal
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void delete(byte[] key, final boolean forceSync) throws RocksDBException {
delete(defaultColumnFamilyHandle, key, getWriteOptions(forceSync));
}
/**
* Delete the key / value pair from the specified column family without syncing the wal
*
* @param columnFamilyHandle the column family from which to delete the value
* @param key the key to be deleted
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void delete(final ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
delete(columnFamilyHandle, key, noSyncWriteOptions);
}
/**
* Delete the key / value pair from the specified column family, optionally syncing the wal
*
* @param columnFamilyHandle the column family from which to delete the value
* @param key the key to be deleted
* @param forceSync if true, sync the wal
* @throws RocksDBException thrown if there is an error in the underlying library.
*/
public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final boolean forceSync) throws RocksDBException {
delete(columnFamilyHandle, key, getWriteOptions(forceSync));
}
private WriteOptions getWriteOptions(boolean forceSync) {
return forceSync ? forceSyncWriteOptions : noSyncWriteOptions;
}
/**
* Return an iterator over the default column family. The iterator is initially invalid (caller must call one of the Seek methods on the iterator before using it).
* <p>
* Caller should close the iterator when it is no longer needed. The returned iterator should be closed before this db is closed.
*
* @return an iterator over the default column family
*/
public RocksIterator getIterator() {
return getIterator(defaultColumnFamilyHandle);
}
/**
* Get the current value of the sync counter. This can be used with waitForSync() to verify that operations have been synced to disk
*
* @return the current value of the sync counter
*/
public int getSyncCounterValue() {
return syncCounter.get();
}
/**
* Close the Metronome and the RocksDB Objects
*
* @throws IOException if there are issues in closing any of the underlying RocksDB objects
*/
@Override
public void close() throws IOException {
logger.info("Closing RocksDBMetronome");
dbWriteLock.lock();
try {
logger.info("Shutting down RocksDBMetronome sync executor");
syncExecutor.shutdownNow();
try {
logger.info("Pausing RocksDB background work");
rocksDB.pauseBackgroundWork();
} catch (RocksDBException e) {
logger.warn("Unable to pause background work before close.", e);
}
final AtomicReference<Exception> exceptionReference = new AtomicReference<>();
logger.info("Closing RocksDB configurations");
safeClose(forceSyncWriteOptions, exceptionReference);
safeClose(noSyncWriteOptions, exceptionReference);
// close the column family handles first, then the db last
for (ColumnFamilyHandle cfh : columnFamilyHandles.values()) {
safeClose(cfh, exceptionReference);
}
logger.info("Closing RocksDB database");
safeClose(rocksDB, exceptionReference);
rocksDB = null;
closed = true;
if (exceptionReference.get() != null) {
throw new IOException(exceptionReference.get());
}
} finally {
dbWriteLock.unlock();
}
}
/**
* @param autoCloseable An {@link AutoCloseable} to be closed
* @param exceptionReference A reference to contain any encountered {@link Exception}
*/
private void safeClose(final AutoCloseable autoCloseable, final AtomicReference<Exception> exceptionReference) {
if (autoCloseable != null) {
try {
autoCloseable.close();
} catch (Exception e) {
exceptionReference.set(e);
}
}
}
/**
* @return The capacity of the store
* @throws IOException if encountered
*/
public long getStorageCapacity() throws IOException {
return Files.getFileStore(storagePath).getTotalSpace();
}
/**
* @return The usable space of the store
* @throws IOException if encountered
*/
public long getUsableStorageSpace() throws IOException {
return Files.getFileStore(storagePath).getUsableSpace();
}
/**
* This method is scheduled by the syncExecutor. It runs at the specified interval, forcing the RocksDB WAL to disk.
* <p>
* If the sync is successful, it notifies threads that had been waiting for their records to be persisted using
* syncCondition and the syncCounter, which is incremented to indicate success
*/
void doSync() {
syncLock.lock();
try {
// if we're interrupted, return
if (Thread.currentThread().isInterrupted()) {
return;
}
forceSync();
syncCounter.incrementAndGet(); // it's OK if it rolls over... we're just going to check that the value changed
syncCondition.signalAll();
} catch (final IllegalArgumentException e) {
logger.error("Unable to sync, likely because the repository is out of space.", e);
} catch (final Throwable t) {
logger.error("Unable to sync", t);
} finally {
syncLock.unlock();
}
}
/**
* This method blocks until the next time the WAL is forced to disk, ensuring that all records written before this point have been persisted.
*/
public void waitForSync() throws InterruptedException {
final int counterValue = syncCounter.get();
waitForSync(counterValue);
}
/**
* This method blocks until the WAL has been forced to disk, ensuring that all records written before the point specified by the counterValue have been persisted.
*
* @param counterValue The value of the counter at the time of a write we must persist
* @throws InterruptedException if the thread is interrupted
*/
public void waitForSync(final int counterValue) throws InterruptedException {
if (counterValue != syncCounter.get()) {
return; // if the counter has already changed, we don't need to wait (or grab the lock) because the records we're concerned with have already been persisted
}
long waitTimeRemaining = syncWarningNanos;
syncLock.lock();
try {
while (counterValue == syncCounter.get()) { // wait until the counter changes (indicating sync occurred)
waitTimeRemaining = syncCondition.awaitNanos(waitTimeRemaining);
if (waitTimeRemaining <= 0L) { // this means the wait timed out
// only log a warning every syncWarningNanos... don't spam the logs
final long now = System.nanoTime();
final long lastWarning = lastSyncWarningNanos.get();
if (now - lastWarning > syncWarningNanos
&& lastSyncWarningNanos.compareAndSet(lastWarning, now)) {
logger.warn("Failed to sync within {} seconds... system configuration may need to be adjusted", TimeUnit.NANOSECONDS.toSeconds(syncWarningNanos));
}
// reset waiting time
waitTimeRemaining = syncWarningNanos;
}
}
} finally {
syncLock.unlock();
}
}
/**
* Returns a representation of a long as a byte array
*
* @param value a long to convert
* @return a byte[] representation
*/
public static byte[] getBytes(long value) {
byte[] bytes = new byte[8];
writeLong(value, bytes);
return bytes;
}
/**
* Writes a representation of a long to the specified byte array
*
* @param l a long to convert
* @param bytes an array to store the byte representation
*/
public static void writeLong(long l, byte[] bytes) {
bytes[0] = (byte) (l >>> 56);
bytes[1] = (byte) (l >>> 48);
bytes[2] = (byte) (l >>> 40);
bytes[3] = (byte) (l >>> 32);
bytes[4] = (byte) (l >>> 24);
bytes[5] = (byte) (l >>> 16);
bytes[6] = (byte) (l >>> 8);
bytes[7] = (byte) (l);
}
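// Worked example (illustrative): the encoding above is big-endian, so
// writeLong(0x0102030405060708L, bytes) yields {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08},
// and readLong(bytes) below inverts it, returning 0x0102030405060708L.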
/**
* Creates a long from its byte array representation
* @param bytes to convert to a long
* @return a long
* @throws IOException if the given byte array is of the wrong size
*/
public static long readLong(final byte[] bytes) throws IOException {
if (bytes.length != 8) {
throw new IOException("wrong number of bytes to convert to long (must be 8)");
}
return (((long) (bytes[0]) << 56) +
((long) (bytes[1] & 255) << 48) +
((long) (bytes[2] & 255) << 40) +
((long) (bytes[3] & 255) << 32) +
((long) (bytes[4] & 255) << 24) +
((long) (bytes[5] & 255) << 16) +
((long) (bytes[6] & 255) << 8) +
((long) (bytes[7] & 255)));
}
/**
* @return the storage path of the db
*/
public Path getStoragePath() {
return storagePath;
}
/**
* @return A RocksDB logger capturing all logging output from RocksDB
*/
private org.rocksdb.Logger getRocksLogger() {
try (Options options = new Options()
// make RocksDB give us everything, and we'll decide what we want to log in our wrapper
.setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)) {
return new LogWrapper(options);
}
}
/**
* An Extension of org.rocksdb.Logger that wraps the slf4j Logger
*/
private class LogWrapper extends org.rocksdb.Logger {
LogWrapper(Options options) {
super(options);
}
@Override
protected void log(final InfoLogLevel infoLogLevel, final String logMsg) {
switch (infoLogLevel) {
case ERROR_LEVEL:
case FATAL_LEVEL:
logger.error(logMsg);
break;
case WARN_LEVEL:
logger.warn(logMsg);
break;
case INFO_LEVEL:
case DEBUG_LEVEL:
logger.debug(logMsg);
break;
case HEADER_LEVEL:
default:
logger.info(logMsg);
break;
}
}
}
public static class Builder {
int parallelThreads = 8;
int maxWriteBufferNumber = 4;
int minWriteBufferNumberToMerge = 1;
long writeBufferSize = (long) DataUnit.MB.toB(256);
long delayedWriteRate = (long) DataUnit.MB.toB(16);
int level0SlowdownWritesTrigger = 20;
int level0StopWritesTrigger = 40;
int maxBackgroundFlushes = 1;
int maxBackgroundCompactions = 1;
int statDumpSeconds = 600;
long syncMillis = 10;
long syncWarningNanos = TimeUnit.SECONDS.toNanos(30);
Path storagePath;
boolean adviseRandomOnOpen = false;
boolean createIfMissing = true;
boolean createMissingColumnFamilies = true;
boolean useFsync = true;
boolean periodicSyncEnabled = true;
final Set<byte[]> columnFamilyNames = new HashSet<>();
public RocksDBMetronome build() {
if (storagePath == null) {
throw new IllegalStateException("Cannot create RocksDBMetronome because storagePath is not set");
}
// add default column families
columnFamilyNames.add(RocksDB.DEFAULT_COLUMN_FAMILY);
columnFamilyNames.add(CONFIGURATION_FAMILY.getBytes(StandardCharsets.UTF_8));
return new RocksDBMetronome(this);
}
public Builder addColumnFamily(String name) {
this.columnFamilyNames.add(name.getBytes(StandardCharsets.UTF_8));
return this;
}
public Builder setStoragePath(Path storagePath) {
this.storagePath = storagePath;
return this;
}
public Builder setParallelThreads(int parallelThreads) {
this.parallelThreads = parallelThreads;
return this;
}
public Builder setMaxWriteBufferNumber(int maxWriteBufferNumber) {
this.maxWriteBufferNumber = maxWriteBufferNumber;
return this;
}
public Builder setMinWriteBufferNumberToMerge(int minWriteBufferNumberToMerge) {
this.minWriteBufferNumberToMerge = minWriteBufferNumberToMerge;
return this;
}
public Builder setWriteBufferSize(long writeBufferSize) {
this.writeBufferSize = writeBufferSize;
return this;
}
public Builder setDelayedWriteRate(long delayedWriteRate) {
this.delayedWriteRate = delayedWriteRate;
return this;
}
public Builder setLevel0SlowdownWritesTrigger(int level0SlowdownWritesTrigger) {
this.level0SlowdownWritesTrigger = level0SlowdownWritesTrigger;
return this;
}
public Builder setLevel0StopWritesTrigger(int level0StopWritesTrigger) {
this.level0StopWritesTrigger = level0StopWritesTrigger;
return this;
}
public Builder setMaxBackgroundFlushes(int maxBackgroundFlushes) {
this.maxBackgroundFlushes = maxBackgroundFlushes;
return this;
}
public Builder setMaxBackgroundCompactions(int maxBackgroundCompactions) {
this.maxBackgroundCompactions = maxBackgroundCompactions;
return this;
}
public Builder setStatDumpSeconds(int statDumpSeconds) {
this.statDumpSeconds = statDumpSeconds;
return this;
}
public Builder setSyncMillis(long syncMillis) {
this.syncMillis = syncMillis;
return this;
}
public Builder setSyncWarningNanos(long syncWarningNanos) {
this.syncWarningNanos = syncWarningNanos;
return this;
}
public Builder setAdviseRandomOnOpen(boolean adviseRandomOnOpen) {
this.adviseRandomOnOpen = adviseRandomOnOpen;
return this;
}
public Builder setCreateMissingColumnFamilies(boolean createMissingColumnFamilies) {
this.createMissingColumnFamilies = createMissingColumnFamilies;
return this;
}
public Builder setCreateIfMissing(boolean createIfMissing) {
this.createIfMissing = createIfMissing;
return this;
}
public Builder setUseFsync(boolean useFsync) {
this.useFsync = useFsync;
return this;
}
public Builder setPeriodicSyncEnabled(boolean periodicSyncEnabled) {
this.periodicSyncEnabled = periodicSyncEnabled;
return this;
}
long getMaxTotalWalSize() {
return writeBufferSize * maxWriteBufferNumber;
}
}
}
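For orientation, a minimal usage sketch of the metronome above. This is illustrative only: the storage path and payload are invented, and the sketch assumes it sits in the same org.apache.nifi.controller.repository package, since the class is package-private. It uses only methods defined in the file above.

package org.apache.nifi.controller.repository;

import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;

class RocksDBMetronomeUsageSketch {
    public static void main(String[] args) throws Exception {
        // Build against an illustrative storage path; build() fails fast if the path is unset
        try (RocksDBMetronome metronome = new RocksDBMetronome.Builder()
                .setStoragePath(Paths.get("target/metronome-sketch"))
                .setSyncMillis(10) // tick every 10 ms, matching the documented repository default
                .build()) {
            metronome.initialize(); // loads the native library and opens the database

            // Write without forcing a sync; the WAL is flushed on the next metronome tick
            metronome.put("flowfile-key".getBytes(StandardCharsets.UTF_8),
                    "flowfile-record".getBytes(StandardCharsets.UTF_8));

            // Block until the next tick has forced the WAL to disk
            metronome.waitForSync();
        }
    }
}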

View File

@@ -1,16 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.controller.repository.RocksDBFlowFileRepository

View File

@@ -1,802 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.swap.StandardSwapContents;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@DisabledOnOs(OS.WINDOWS)
public class TestRocksDBFlowFileRepository {
private static final Logger logger = LoggerFactory.getLogger(TestRocksDBFlowFileRepository.class);
private final Map<String, String> additionalProperties = new HashMap<>();
private String nifiPropertiesPath;
@TempDir
public Path temporaryFolder;
@BeforeEach
public void before(TestInfo testInfo) throws IOException {
additionalProperties.put(NiFiProperties.FLOWFILE_REPOSITORY_DIRECTORY, temporaryFolder.toString());
Path properties = Files.createFile(temporaryFolder.resolve(testInfo.getDisplayName() + ".properties"));
Files.copy(Paths.get("src/test/resources/conf/nifi.properties"), properties, StandardCopyOption.REPLACE_EXISTING);
nifiPropertiesPath = properties.toString();
logger.info("Running test: {}", testInfo.getDisplayName());
}
@Test
public void testNormalizeSwapLocation() {
assertEquals("/", RocksDBFlowFileRepository.normalizeSwapLocation("/"));
assertEquals("", RocksDBFlowFileRepository.normalizeSwapLocation(""));
assertNull(RocksDBFlowFileRepository.normalizeSwapLocation(null));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("test.txt"));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/test.txt"));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/tmp/test.txt"));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("//test.txt"));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/path/to/other/file/repository/test.txt"));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("test.txt/"));
assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/path/to/test.txt/"));
assertEquals("test", WriteAheadFlowFileRepository.normalizeSwapLocation(WriteAheadFlowFileRepository.normalizeSwapLocation("/path/to/test.txt/")));
}
@Test
public void testSwapLocationsRestored() throws IOException {
final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
repo.initialize(new StandardResourceClaimManager());
final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
repo.loadFlowFiles(queueProvider);
final Connection connection = Mockito.mock(Connection.class);
when(connection.getIdentifier()).thenReturn("1234");
final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
when(queue.getIdentifier()).thenReturn("1234");
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);
StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
ffBuilder.id(1L);
ffBuilder.size(0L);
final FlowFileRecord flowFileRecord = ffBuilder.build();
final List<RepositoryRecord> records = new ArrayList<>();
final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "swap123");
record.setDestination(queue);
records.add(record);
repo.updateRepository(records);
repo.close();
// restore
final RocksDBFlowFileRepository repo2 = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
repo2.initialize(new StandardResourceClaimManager());
repo2.loadFlowFiles(queueProvider);
assertTrue(repo2.isValidSwapLocationSuffix("swap123"));
assertFalse(repo2.isValidSwapLocationSuffix("other"));
repo2.close();
}
@Test
public void testSwapLocationsUpdatedOnRepoUpdate() throws IOException {
final Path path = Paths.get("target/test-swap-repo");
if (Files.exists(path)) {
FileUtils.deleteFile(path.toFile(), true);
}
final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
repo.initialize(new StandardResourceClaimManager());
final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
repo.loadFlowFiles(queueProvider);
final Connection connection = Mockito.mock(Connection.class);
when(connection.getIdentifier()).thenReturn("1234");
final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
when(queue.getIdentifier()).thenReturn("1234");
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);
StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
ffBuilder.id(1L);
ffBuilder.size(0L);
final FlowFileRecord flowFileRecord = ffBuilder.build();
final List<RepositoryRecord> records = new ArrayList<>();
final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "/tmp/swap123");
record.setDestination(queue);
records.add(record);
assertFalse(repo.isValidSwapLocationSuffix("swap123"));
repo.updateRepository(records);
assertTrue(repo.isValidSwapLocationSuffix("swap123"));
repo.close();
}
@Test
public void testResourceClaimsIncremented() throws IOException {
final ResourceClaimManager claimManager = new StandardResourceClaimManager();
final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
final Connection connection = Mockito.mock(Connection.class);
when(connection.getIdentifier()).thenReturn("1234");
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
final FlowFileSwapManager swapMgr = new TestRocksDBFlowFileRepository.MockFlowFileSwapManager();
final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);
final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false, false);
final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L);
final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false, false);
final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L);
// Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then,
// indicate that a FlowFile was swapped out. We should then be able to recover these FlowFiles and the
// resource claims' counts should be updated for both the swapped out FlowFile and the non-swapped out FlowFile
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(claimManager);
repo.loadFlowFiles(queueProvider);
// Create a Repository Record that indicates that a FlowFile was created
final FlowFileRecord flowFile1 = new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
.contentClaim(claim1)
.build();
final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue);
rec1.setWorking(flowFile1, false);
rec1.setDestination(queue);
// Create a Record that we can swap out
final FlowFileRecord flowFile2 = new StandardFlowFileRecord.Builder()
.id(2L)
.addAttribute("uuid", "11111111-1111-1111-1111-111111111112")
.contentClaim(claim2)
.build();
final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue);
rec2.setWorking(flowFile2, false);
rec2.setDestination(queue);
final List<RepositoryRecord> records = new ArrayList<>();
records.add(rec1);
records.add(rec2);
repo.updateRepository(records);
final String swapLocation = swapMgr.swapOut(Collections.singletonList(flowFile2), queue, null);
repo.swapFlowFilesOut(Collections.singletonList(flowFile2), queue, swapLocation);
}
final ResourceClaimManager recoveryClaimManager = new StandardResourceClaimManager();
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(recoveryClaimManager);
final long largestId = repo.loadFlowFiles(queueProvider);
// largest ID known is 1 because this doesn't take into account the FlowFiles that have been swapped out
assertEquals(1, largestId);
}
// resource claim 1 will have a single claimant count while resource claim 2 will have no claimant counts
// because resource claim 2 is referenced only by flowfiles that are swapped out.
assertEquals(1, recoveryClaimManager.getClaimantCount(resourceClaim1));
assertEquals(0, recoveryClaimManager.getClaimantCount(resourceClaim2));
final SwapSummary summary = queue.recoverSwappedFlowFiles();
assertNotNull(summary);
assertEquals(2, summary.getMaxFlowFileId().intValue());
assertEquals(new QueueSize(1, 0L), summary.getQueueSize());
final List<ResourceClaim> swappedOutClaims = summary.getResourceClaims();
assertNotNull(swappedOutClaims);
assertEquals(1, swappedOutClaims.size());
assertEquals(claim2.getResourceClaim(), swappedOutClaims.get(0));
}
@Test
public void testRestartWithOneRecord() throws IOException {
final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
repo.initialize(new StandardResourceClaimManager());
final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
repo.loadFlowFiles(queueProvider);
final List<FlowFileRecord> flowFileCollection = new ArrayList<>();
final Connection connection = Mockito.mock(Connection.class);
when(connection.getIdentifier()).thenReturn("1234");
final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
when(queue.getIdentifier()).thenReturn("1234");
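// capture each FlowFile the repository restores to the queue so the recovered state can be asserted after restart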
doAnswer((Answer<Object>) invocation -> {
flowFileCollection.add((FlowFileRecord) invocation.getArguments()[0]);
return null;
}).when(queue).put(any(FlowFileRecord.class));
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);
StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
ffBuilder.id(1L);
ffBuilder.addAttribute("abc", "xyz");
ffBuilder.size(0L);
final FlowFileRecord flowFileRecord = ffBuilder.build();
final List<RepositoryRecord> records = new ArrayList<>();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(flowFileRecord, false);
record.setDestination(connection.getFlowFileQueue());
records.add(record);
repo.updateRepository(records);
// update to add new attribute
ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord).addAttribute("hello", "world");
final FlowFileRecord flowFileRecord2 = ffBuilder.build();
record.setWorking(flowFileRecord2, false);
repo.updateRepository(records);
// update the size but leave attributes unchanged
ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord2).size(40L);
final FlowFileRecord flowFileRecord3 = ffBuilder.build();
record.setWorking(flowFileRecord3, false);
repo.updateRepository(records);
repo.close();
// restore
final RocksDBFlowFileRepository repo2 = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
repo2.initialize(new StandardResourceClaimManager());
repo2.loadFlowFiles(queueProvider);
assertEquals(1, flowFileCollection.size());
final FlowFileRecord flowFile = flowFileCollection.get(0);
assertEquals(1L, flowFile.getId());
assertEquals("xyz", flowFile.getAttribute("abc"));
assertEquals(40L, flowFile.getSize());
assertEquals("world", flowFile.getAttribute("hello"));
repo2.close();
}
@Test
public void testDoNotRemoveOrphans() throws Exception {
final TestQueue testQueue = new TestQueue();
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
repo.updateRepository(testQueue.getRepositoryRecord(new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("abc", "xyz")
.size(0L)
.build()
));
}
// restore (& confirm present)
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(1, repo.getInMemoryFlowFiles());
}
// restore with empty queue provider (should throw exception)
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(new TestQueueProvider());
fail();
} catch (IOException expected) {
assertTrue(expected.getMessage().contains("Found FlowFile in repository without a corresponding queue"));
}
}
@Test
public void testRemoveOrphans() throws Exception {
final TestQueue testQueue = new TestQueue();
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.REMOVE_ORPHANED_FLOWFILES.propertyName, "true");
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
repo.updateRepository(testQueue.getRepositoryRecord(new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("abc", "xyz")
.size(0L)
.build()
));
}
// restore (& confirm present)
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(1, repo.getInMemoryFlowFiles());
}
// restore with empty queue provider (orphans should be removed rather than causing an exception)
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(new TestQueueProvider());
assertEquals(0, repo.getInMemoryFlowFiles());
}
}
@Test
public void testKnownVersion() throws Exception {
final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties);
// create db with known version
try (RocksDBMetronome db = new RocksDBMetronome.Builder().setStoragePath(RocksDBFlowFileRepository.getFlowFileRepoPath(niFiProperties)).build()) {
db.initialize();
db.putConfiguration(RocksDBFlowFileRepository.REPOSITORY_VERSION_KEY, RocksDBFlowFileRepository.VERSION_ONE_BYTES);
}
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
}
}
@Test
public void testUnknownVersion() throws Exception {
final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties);
// create db with an unknown version
try (RocksDBMetronome db = new RocksDBMetronome.Builder().setStoragePath(RocksDBFlowFileRepository.getFlowFileRepoPath(niFiProperties)).build()) {
db.initialize();
db.putConfiguration(RocksDBFlowFileRepository.REPOSITORY_VERSION_KEY, "UNKNOWN".getBytes(StandardCharsets.UTF_8));
}
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
assertThrows(IllegalStateException.class, () -> repo.initialize(new StandardResourceClaimManager()));
}
}
@Test
public void testRecoveryMode() throws Exception {
int totalFlowFiles = 50;
final TestQueue testQueue = new TestQueue();
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
// add records to the repo
for (int i = 1; i <= totalFlowFiles; i++) {
repo.updateRepository(testQueue.getRepositoryRecord(new StandardFlowFileRecord.Builder()
.id(i)
.addAttribute("abc", "xyz")
.size(0L)
.build()
));
}
assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
}
// restore in recovery mode with varying limits
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, "true");
for (int recoveryLimit = 0; recoveryLimit < totalFlowFiles; recoveryLimit += 10) {
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(recoveryLimit));
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(recoveryLimit, repo.getInMemoryFlowFiles());
}
}
// restore in recovery mode with limit equal to available files
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(totalFlowFiles));
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
}
// restore in recovery mode with limit higher than available files
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(Integer.MAX_VALUE));
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
}
// restore in normal mode
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, "false");
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(0));
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
}
}
@Test
public void testRecoveryModeWithContinuedLoading() throws Exception {
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.CLAIM_CLEANUP_PERIOD.propertyName, "24 hours"); // "disable" the cleanup thread so recovery can be forced manually below
int totalFlowFiles = 50;
int recoveryLimit = 10;
final TestQueue testQueue = new TestQueue();
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
// add records to the repo
for (int i = 1; i <= totalFlowFiles; i++) {
repo.updateRepository(testQueue.getRepositoryRecord(new StandardFlowFileRecord.Builder()
.id(i)
.addAttribute("abc", "xyz")
.size(0L)
.build()
));
}
assertEquals(totalFlowFiles, repo.getInMemoryFlowFiles());
}
// restore in recovery mode
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, "true");
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(recoveryLimit));
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(recoveryLimit, repo.getInMemoryFlowFiles());
assertEquals(totalFlowFiles - recoveryLimit, repo.getRecordsToRestoreCount());
long flowFilesRecovered = repo.getInMemoryFlowFiles();
for (int i = 0; i < 4; i++) {
testQueue.deleteQueuedFlowFiles(repo);
assertEquals(0, repo.getInMemoryFlowFiles());
repo.doRecovery();
assertEquals(recoveryLimit, repo.getInMemoryFlowFiles());
flowFilesRecovered += repo.getInMemoryFlowFiles();
assertEquals((recoveryLimit * (i + 2)), flowFilesRecovered);
assertEquals(totalFlowFiles - flowFilesRecovered, repo.getRecordsToRestoreCount());
}
// should have restored all files
assertEquals(0, repo.getRecordsToRestoreCount());
assertEquals(recoveryLimit, repo.getInMemoryFlowFiles());
// delete last files
testQueue.deleteQueuedFlowFiles(repo);
assertEquals(0, repo.getRecordsToRestoreCount());
assertEquals(0, repo.getInMemoryFlowFiles());
repo.doRecovery();
// should have nothing left
assertEquals(0, repo.getRecordsToRestoreCount());
assertEquals(0, repo.getInMemoryFlowFiles());
}
// restore in normal mode
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, "false");
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, Integer.toString(1));
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
assertEquals(0, repo.getRecordsToRestoreCount());
assertEquals(0, repo.getInMemoryFlowFiles());
}
}
@Test
public void testStallStop() throws IOException {
final TestQueue testQueue = new TestQueue();
// set stall & stop properties
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.ENABLE_STALL_STOP.propertyName, "true");
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.STALL_FLOWFILE_COUNT.propertyName, "2");
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.STOP_FLOWFILE_COUNT.propertyName, "3");
// take heap usage out of the calculation
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.STALL_HEAP_USAGE_PERCENT.propertyName, "100%");
additionalProperties.put(RocksDBFlowFileRepository.RocksDbProperty.STOP_HEAP_USAGE_PERCENT.propertyName, "100%");
try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
repo.initialize(new StandardResourceClaimManager());
repo.loadFlowFiles(testQueue.provider);
StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
ffBuilder.addAttribute("abc", "xyz");
ffBuilder.size(0L);
List<RepositoryRecord> record1 = testQueue.getRepositoryRecord(ffBuilder.id(1).build());
List<RepositoryRecord> record2 = testQueue.getRepositoryRecord(ffBuilder.id(2).build());
List<RepositoryRecord> record3 = testQueue.getRepositoryRecord(ffBuilder.id(3).build());
// CREATE one... should incur no penalty
repo.updateRepository(record1);
repo.updateStallStop();
assertFalse(repo.stallNewFlowFiles);
assertFalse(repo.stopNewFlowFiles);
// CREATE another... should stall
repo.updateRepository(record2);
repo.updateStallStop();
assertTrue(repo.stallNewFlowFiles);
assertFalse(repo.stopNewFlowFiles);
// CREATE another... should stop
repo.updateRepository(record3);
repo.updateStallStop();
assertTrue(repo.stallNewFlowFiles);
assertTrue(repo.stopNewFlowFiles);
// DELETE one... should be stalled but not stopped
((StandardRepositoryRecord) record1.get(0)).markForDelete();
repo.updateRepository(record1);
repo.updateStallStop();
assertTrue(repo.stallNewFlowFiles);
assertFalse(repo.stopNewFlowFiles);
// DELETE another... shouldn't be stalled or stopped
((StandardRepositoryRecord) record2.get(0)).markForDelete();
repo.updateRepository(record2);
repo.updateStallStop();
assertFalse(repo.stallNewFlowFiles);
assertFalse(repo.stopNewFlowFiles);
}
}
private class TestQueue {
private final TestQueueProvider provider;
private final Collection<FlowFileRecord> queuedFlowFiles;
private final Connection connection;
TestQueue() {
provider = new TestQueueProvider();
queuedFlowFiles = new ConcurrentSkipListSet<>(); // potentially accessed from multiple threads
final FlowFileQueue queue = new StandardFlowFileQueue("1234", null, null, null, null, null, null, null, 0, "0 sec", 0, "0 B") {
@Override
public void put(final FlowFileRecord file) {
queuedFlowFiles.add(file);
}
};
connection = Mockito.mock(Connection.class);
when(connection.getIdentifier()).thenReturn(queue.getIdentifier());
when(connection.getFlowFileQueue()).thenReturn(queue);
provider.addConnection(connection);
}
void deleteQueuedFlowFiles(RocksDBFlowFileRepository repo) throws IOException {
Collection<RepositoryRecord> recordsToDelete = queuedFlowFiles.stream().map((Function<FlowFileRecord, RepositoryRecord>) flowFileRecord -> {
StandardRepositoryRecord record = new StandardRepositoryRecord(null, flowFileRecord);
record.markForDelete();
return record;
}).collect(Collectors.toSet());
repo.updateRepository(recordsToDelete);
queuedFlowFiles.clear();
}
private List<RepositoryRecord> getRepositoryRecord(final FlowFileRecord flowFileRecord) {
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(flowFileRecord, false);
record.setDestination(connection.getFlowFileQueue());
return Collections.singletonList(record);
}
}
private static class TestQueueProvider implements QueueProvider {
private List<Connection> connectionList = new ArrayList<>();
void addConnection(final Connection connection) {
this.connectionList.add(connection);
}
@Override
public Collection<FlowFileQueue> getAllQueues() {
final List<FlowFileQueue> queueList = new ArrayList<>();
for (final Connection conn : connectionList) {
queueList.add(conn.getFlowFileQueue());
}
return queueList;
}
}
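// In-memory swap manager: keeps "swapped" FlowFiles in nested maps (queue -> swap location -> records) so swap bookkeeping can be exercised without disk I/O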
private static class MockFlowFileSwapManager implements FlowFileSwapManager {
private final Map<FlowFileQueue, Map<String, List<FlowFileRecord>>> swappedRecords = new HashMap<>();
@Override
public void initialize(SwapManagerInitializationContext initializationContext) {
}
@Override
public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue, final String partitionName) {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.computeIfAbsent(flowFileQueue, k -> new HashMap<>());
final String location = UUID.randomUUID().toString();
swapMap.put(location, new ArrayList<>(flowFiles));
return location;
}
@Override
public SwapContents peek(String swapLocation, FlowFileQueue flowFileQueue) {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
if (swapMap == null) {
return null;
}
final List<FlowFileRecord> flowFiles = swapMap.get(swapLocation);
final SwapSummary summary = getSwapSummary(swapLocation);
return new StandardSwapContents(summary, flowFiles);
}
@Override
public SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
if (swapMap == null) {
return null;
}
final List<FlowFileRecord> flowFiles = swapMap.remove(swapLocation);
final SwapSummary summary = getSwapSummary(swapLocation);
return new StandardSwapContents(summary, flowFiles);
}
@Override
public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue, final String partitionName) {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
if (swapMap == null) {
return null;
}
return new ArrayList<>(swapMap.keySet());
}
@Override
public SwapSummary getSwapSummary(String swapLocation) {
List<FlowFileRecord> records = null;
for (final Map<String, List<FlowFileRecord>> swapMap : swappedRecords.values()) {
records = swapMap.get(swapLocation);
if (records != null) {
break;
}
}
if (records == null) {
return null;
}
final List<ResourceClaim> resourceClaims = new ArrayList<>();
long size = 0L;
Long maxId = null;
for (final FlowFileRecord flowFile : records) {
size += flowFile.getSize();
if (maxId == null || flowFile.getId() > maxId) {
maxId = flowFile.getId();
}
final ContentClaim contentClaim = flowFile.getContentClaim();
if (contentClaim != null) {
final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
resourceClaims.add(resourceClaim);
}
}
return new StandardSwapSummary(new QueueSize(records.size(), size), maxId, resourceClaims, 0L, 0L);
}
@Override
public void purge() {
this.swappedRecords.clear();
}
@Override
public String getQueueIdentifier(final String swapLocation) {
return null;
}
@Override
public Set<String> getSwappedPartitionNames(FlowFileQueue queue) {
return Collections.emptySet();
}
@Override
public String changePartitionName(String swapLocation, String newPartitionName) {
return swapLocation;
}
}
}


@@ -1,335 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.io.TempDir;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksIterator;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
@DisabledOnOs(OS.WINDOWS)
public class TestRocksDBMetronome {
private static final byte[] KEY = "key".getBytes(StandardCharsets.UTF_8);
private static final byte[] VALUE = "value".getBytes(StandardCharsets.UTF_8);
private static final byte[] KEY_2 = "key 2".getBytes(StandardCharsets.UTF_8);
private static final byte[] VALUE_2 = "value 2".getBytes(StandardCharsets.UTF_8);
private ExecutorService executor;
@BeforeEach
public void before() {
executor = Executors.newSingleThreadExecutor();
}
@AfterEach
public void after() {
executor.shutdownNow();
}
@Test
public void testReadWriteLong() throws Exception {
Random random = new Random();
byte[] key = new byte[8];
for (long i = 0; i < 10; i++) {
{
RocksDBMetronome.writeLong(i, key);
assertEquals(i, RocksDBMetronome.readLong(key));
}
{
long testValue = Long.MIN_VALUE + i;
RocksDBMetronome.writeLong(testValue, key);
assertEquals(testValue, RocksDBMetronome.readLong(key));
}
{
long testValue = Long.MAX_VALUE - i;
RocksDBMetronome.writeLong(testValue, key);
assertEquals(testValue, RocksDBMetronome.readLong(key));
}
{
long testValue = random.nextLong();
RocksDBMetronome.writeLong(testValue, key);
assertEquals(testValue, RocksDBMetronome.readLong(key));
}
}
}
private Path newFolder(Path parent) {
File newFolder = parent.resolve("temp-" + System.currentTimeMillis()).toFile();
newFolder.mkdirs();
return newFolder.toPath();
}
@Test
public void testPutGetDelete(@TempDir Path temporaryFolder) throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(newFolder(temporaryFolder))
.build()) {
db.initialize();
assertNull(db.get(KEY));
// test default (no sync)
db.put(KEY, VALUE);
assertArrayEquals(VALUE, db.get(KEY));
db.delete(KEY);
assertNull(db.get(KEY));
// test with "force sync"
db.put(KEY, VALUE, true);
assertArrayEquals(VALUE, db.get(KEY));
db.delete(KEY, true);
assertNull(db.get(KEY));
}
}
@Test
public void testPutGetConfiguration(@TempDir Path temporaryFolder) throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(newFolder(temporaryFolder))
.build()) {
db.initialize();
assertNull(db.getConfiguration(KEY));
db.putConfiguration(KEY, VALUE);
assertArrayEquals(VALUE, db.getConfiguration(KEY));
db.delete(db.getColumnFamilyHandle(RocksDBMetronome.CONFIGURATION_FAMILY), KEY);
assertNull(db.getConfiguration(KEY));
}
}
@Test
public void testPutBeforeInit(@TempDir Path temporaryFolder) throws Exception {
assertThrows(IllegalStateException.class, () -> {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(newFolder(temporaryFolder))
.build()) {
db.put(KEY, VALUE);
}
});
}
@Test
public void testPutClosed(@TempDir Path temporaryFolder) {
assertThrows(IllegalStateException.class, () -> {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(newFolder(temporaryFolder))
.build()) {
db.initialize();
db.close();
db.put(KEY_2, VALUE_2);
}
});
}
@Test
public void testColumnFamilies(@TempDir Path temporaryFolder) throws Exception {
String secondFamilyName = "second family";
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(newFolder(temporaryFolder))
.addColumnFamily(secondFamilyName)
.build()) {
db.initialize();
ColumnFamilyHandle secondFamily = db.getColumnFamilyHandle(secondFamilyName);
// assert nothing present
assertNull(db.get(KEY));
assertNull(db.get(KEY_2));
assertNull(db.get(secondFamily, KEY));
assertNull(db.get(secondFamily, KEY_2));
// add values
db.put(KEY, VALUE);
db.put(secondFamily, KEY_2, VALUE_2);
// assert values present in correct family
assertArrayEquals(VALUE, db.get(KEY));
assertNull(db.get(KEY_2));
assertArrayEquals(VALUE_2, db.get(secondFamily, KEY_2));
assertNull(db.get(secondFamily, KEY));
// delete from the "wrong" family
db.delete(KEY_2);
db.delete(secondFamily, KEY);
// assert values *still* present in correct family
assertArrayEquals(VALUE, db.get(KEY));
assertNull(db.get(KEY_2));
assertArrayEquals(VALUE_2, db.get(secondFamily, KEY_2));
assertNull(db.get(secondFamily, KEY));
// delete from the "right" family
db.delete(KEY);
db.delete(secondFamily, KEY_2);
// assert values removed
assertNull(db.get(KEY));
assertNull(db.get(KEY_2));
assertNull(db.get(secondFamily, KEY));
assertNull(db.get(secondFamily, KEY_2));
}
}
@Test
public void testIterator(@TempDir Path temporaryFolder) throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(newFolder(temporaryFolder))
.build()) {
db.initialize();
db.put(KEY, VALUE);
db.put(KEY_2, VALUE_2);
RocksIterator iterator = db.getIterator();
iterator.seekToFirst();
Map<String, byte[]> recovered = new HashMap<>();
while (iterator.isValid()) {
recovered.put(new String(iterator.key(), StandardCharsets.UTF_8), iterator.value());
iterator.next();
}
assertEquals(2, recovered.size());
assertArrayEquals(VALUE, recovered.get(new String(KEY, StandardCharsets.UTF_8)));
assertArrayEquals(VALUE_2, recovered.get(new String(KEY_2, StandardCharsets.UTF_8)));
}
}
@Test
public void testCounterIncrement(@TempDir Path temporaryFolder) throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(newFolder(temporaryFolder))
.setSyncMillis(Long.MAX_VALUE) // effectively disable the auto-sync
.build()) {
db.initialize();
// get initial counter value
int counterValue = db.getSyncCounterValue();
// do the sync (which would normally happen via the db's internal executor)
db.doSync();
// assert counter value incremented
assertEquals(counterValue + 1, db.getSyncCounterValue());
}
}
@Test
@Timeout(unit = TimeUnit.MILLISECONDS, value = 10_000)
public void testWaitForSync(@TempDir Path temporaryFolder) throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(newFolder(temporaryFolder))
.setSyncMillis(Long.MAX_VALUE) // effectively disable the auto-sync
.build()) {
db.initialize();
Future<Boolean> future = executor.submit(() -> {
db.waitForSync();
return true;
});
// the future should still be blocked waiting for sync to happen
assertFalse(future.isDone());
// give the future time to wake up and complete
while (!future.isDone()) {
// TESTING NOTE: this is inside a loop to address a minor *testing* race condition where our first doSync() could happen before the future runs,
// meaning waitForSync() would be left waiting on another doSync() that never comes...
// do the sync (which would normally happen via the db's internal executor)
db.doSync();
Thread.sleep(25);
}
// the future should no longer be blocked
assertTrue(future.isDone());
}
}
@Test
@Timeout(unit = TimeUnit.MILLISECONDS, value = 10_000)
public void testWaitForSyncWithValue(@TempDir Path temporaryFolder) throws Exception {
try (RocksDBMetronome db = new RocksDBMetronome.Builder()
.setStoragePath(newFolder(temporaryFolder))
.setSyncMillis(Long.MAX_VALUE) // effectively disable the auto-sync
.build()) {
db.initialize();
int syncCounterValue = db.getSyncCounterValue();
// "wait" for one before current counter value... should not block
db.waitForSync(syncCounterValue - 1);
// wait for current value... should block (because auto-sync isn't happening)
assertBlocks(db, syncCounterValue);
// do the sync (which would normally happen via the db's internal executor)
db.doSync();
// "wait" for initial value... should now not block
db.waitForSync(syncCounterValue);
// wait for current value again... should block (because auto-sync isn't happening)
assertBlocks(db, db.getSyncCounterValue());
}
}
private void assertBlocks(RocksDBMetronome db, int counterValue) {
Future<Boolean> future = getWaitForSyncFuture(db, counterValue);
assertThrows(TimeoutException.class, () -> future.get(1, TimeUnit.SECONDS));
assertFalse(future.isDone());
future.cancel(true);
}
private Future<Boolean> getWaitForSyncFuture(RocksDBMetronome db, int counterValue) {
return executor.submit(() -> {
db.waitForSync(counterValue);
return true;
});
}
}
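testWaitForSync and testWaitForSyncWithValue above pin down the counter contract: waitForSync(n) blocks until a doSync() completes after counter value n was observed, while values already passed return immediately. Below is a minimal sketch of the durability handshake this enables, assuming the Builder's default periodic sync is left running (the tests disable it with setSyncMillis(Long.MAX_VALUE)); the class name and path are placeholders:

import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;

import org.apache.nifi.controller.repository.RocksDBMetronome;

public class DurableWriteSketch {
    public static void main(String[] args) throws Exception {
        try (RocksDBMetronome db = new RocksDBMetronome.Builder()
                .setStoragePath(Paths.get("target/durable-write-sketch")) // placeholder path
                .build()) {
            db.initialize();
            db.put("k".getBytes(StandardCharsets.UTF_8), "v".getBytes(StandardCharsets.UTF_8));
            // Observe the counter after the write; waitForSync(n) returns once a sync
            // completes after n was observed, so the write is on disk when it returns.
            int observed = db.getSyncCounterValue();
            db.waitForSync(observed);
        }
    }
}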


@@ -1,128 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.flow.configuration.file=./target/flow.xml.gz
nifi.flow.configuration.archive.dir=./target/archive/
nifi.flowcontroller.autoResumeState=true
nifi.flowcontroller.graceful.shutdown.period=10 sec
nifi.flowservice.writedelay.interval=2 sec
nifi.administrative.yield.duration=30 sec
nifi.reporting.task.configuration.file=./target/reporting-tasks.xml
nifi.controller.service.configuration.file=./target/controller-services.xml
nifi.templates.directory=./target/templates
nifi.ui.banner.text=UI Banner Text
nifi.ui.autorefresh.interval=30 sec
nifi.nar.library.directory=./target/lib
nifi.nar.working.directory=./target/work/nar/
# H2 Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo
nifi.flowfile.repository.partitions=1
nifi.flowfile.repository.checkpoint.interval=2 mins
nifi.queue.swap.threshold=20000
nifi.swap.storage.directory=./target/test-repo/swap
nifi.swap.in.period=5 sec
nifi.swap.in.threads=1
nifi.swap.out.period=5 sec
nifi.swap.out.threads=4
# Content Repository
nifi.content.claim.max.appendable.size=1 MB
nifi.content.claim.max.flow.files=100
nifi.content.repository.directory.default=./target/content_repository
nifi.content.repository.archive.enabled=true
nifi.content.repository.archive.max.usage.percentage=90%
# Provenance Repository Properties
nifi.provenance.repository.storage.directory=./target/provenance_repository
nifi.provenance.repository.max.storage.time=24 hours
nifi.provenance.repository.max.storage.size=1 GB
nifi.provenance.repository.rollover.time=30 secs
nifi.provenance.repository.rollover.size=100 MB
# Site to Site properties
nifi.remote.input.socket.port=9990
nifi.remote.input.secure=true
# web properties #
nifi.web.war.directory=./target/lib
nifi.web.http.host=
nifi.web.http.port=8080
nifi.web.https.host=
nifi.web.https.port=
nifi.web.jetty.working.directory=./target/work/jetty
# security properties #
nifi.sensitive.props.key=key
nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL
nifi.security.keystore=
nifi.security.keystoreType=
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=
nifi.security.truststoreType=
nifi.security.truststorePasswd=
nifi.security.user.authorizer=
# cluster common properties (cluster manager and nodes must have same values) #
nifi.cluster.protocol.heartbeat.interval=5 sec
nifi.cluster.protocol.is.secure=false
nifi.cluster.protocol.socket.timeout=30 sec
nifi.cluster.protocol.connection.handshake.timeout=45 sec
# if multicast is used, then nifi.cluster.protocol.multicast.xxx properties must be configured #
nifi.cluster.protocol.use.multicast=false
nifi.cluster.protocol.multicast.address=
nifi.cluster.protocol.multicast.port=
nifi.cluster.protocol.multicast.service.broadcast.delay=500 ms
nifi.cluster.protocol.multicast.service.locator.attempts=3
nifi.cluster.protocol.multicast.service.locator.attempts.delay=1 sec
# cluster node properties (only configure for cluster nodes) #
nifi.cluster.is.node=false
nifi.cluster.node.address=
nifi.cluster.node.protocol.port=
nifi.cluster.node.protocol.threads=2
# if multicast is not used, nifi.cluster.node.unicast.xxx must have same values as nifi.cluster.manager.xxx #
nifi.cluster.node.unicast.manager.address=
nifi.cluster.node.unicast.manager.protocol.port=
nifi.cluster.node.unicast.manager.authority.provider.port=
# cluster manager properties (only configure for cluster manager) #
nifi.cluster.is.manager=false
nifi.cluster.manager.address=
nifi.cluster.manager.protocol.port=
nifi.cluster.manager.authority.provider.port=
nifi.cluster.manager.authority.provider.threads=10
nifi.cluster.manager.node.firewall.file=
nifi.cluster.manager.node.event.history.size=10
nifi.cluster.manager.node.api.connection.timeout=30 sec
nifi.cluster.manager.node.api.read.timeout=30 sec
nifi.cluster.manager.node.api.request.threads=10
nifi.cluster.manager.flow.retrieval.delay=5 sec
nifi.cluster.manager.protocol.threads=10
nifi.cluster.manager.safemode.duration=0 sec
# analytics properties #
nifi.analytics.predict.interval=3 mins
nifi.analytics.connection.model.implementation=org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares
nifi.analytics.connection.model.score.name=rSquared
nifi.analytics.connection.model.score.threshold=.9


@@ -1,32 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-nar-bundles</artifactId>
<groupId>org.apache.nifi</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-rocksdb-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-rocksdb-nar</module>
<module>nifi-rocksdb-repository</module>
</modules>
</project>


@@ -109,7 +109,6 @@
 <module>nifi-geohash-bundle</module>
 <module>nifi-snowflake-bundle</module>
 <module>nifi-salesforce-bundle</module>
-<module>nifi-rocksdb-bundle</module>
 <module>nifi-zendesk-bundle</module>
 <module>nifi-hubspot-bundle</module>
 <module>nifi-dropbox-bundle</module>