diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 08cf0ceec2..a9d6ae21bd 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -3435,7 +3435,7 @@ There are currently three implementations of the FlowFile Repository, which are
|====
|*Property*|*Description*
-|`nifi.flowfile.repository.implementation`|The FlowFile Repository implementation. The default value is `org.apache.nifi.controller.repository.WriteAheadFlowFileRepository`. The other current options are `org.apache.nifi.controller.repository.VolatileFlowFileRepository` and `org.apache.nifi.controller.repository.RocksDBFlowFileRepository`.
+|`nifi.flowfile.repository.implementation`|The FlowFile Repository implementation. The default value is `org.apache.nifi.controller.repository.WriteAheadFlowFileRepository`. The other current option is `org.apache.nifi.controller.repository.VolatileFlowFileRepository`.
|====
NOTE: Switching repository implementations should only be done with caution, and only on an instance with zero queued FlowFiles.
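
For reference, the repository implementation and its storage location are set in `nifi.properties`; a minimal sketch using the default values documented above:

[source,properties]
----
# Default FlowFile Repository implementation and directory
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
nifi.flowfile.repository.directory=./flowfile_repository
----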
@@ -3481,109 +3481,6 @@ NOTE: Unlike the encrypted content and provenance repositories, the repository i
This implementation stores FlowFiles in memory instead of on disk. It *will* result in data loss in the event of power/machine failure or a restart of NiFi. To use this implementation, set `nifi.flowfile.repository.implementation` to `org.apache.nifi.controller.repository.VolatileFlowFileRepository`.
-
-=== RocksDB FlowFile Repository
-
-WARNING: The `RocksDBFlowFileRepository` is deprecated in favor of the `WriteAheadFlowFileRepository`. It will be removed in future major versions of Apache NiFi. To use it in NiFi 1.x, you must download nifi-rocksdb-nar from the Apache Nexus Repository and place it in your NiFi lib directory.
-
-This implementation makes use of the RocksDB key-value store. It uses periodic synchronization to ensure that no created or received data is lost (as long as `nifi.flowfile.repository.rocksdb.accept.data.loss` is set to `false`). In the event of a failure (e.g. power loss), _work_ done on FlowFiles through the system (i.e. routing and transformation) may still be lost. Specifically, the record of these actions may be lost, reverting the affected FlowFiles to a previous, valid state. From there, they will resume their path through the flow as normal. This guarantee comes at the expense of a delay on operations that add new data to the system. This delay is configurable (as `nifi.flowfile.repository.rocksdb.sync.period`), and can be tuned to the individual system.
-
-The configuration parameters for this repository fall into two categories: "NiFi-centric" and "RocksDB-centric". The NiFi-centric settings have to do with the operations of the FlowFile Repository and its interaction with NiFi. The RocksDB-centric settings directly correlate to settings on the underlying RocksDB repo. More information on these settings can be found in the RocksDB documentation: https://github.com/facebook/rocksdb/wiki/RocksJava-Basics.
-
-NOTE: Windows users will need to ensure "Microsoft Visual C++ 2015 Redistributable" is installed for this repository to work. See the following link for more details: https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#maven-windows.
-
-
-To use this implementation, set `nifi.flowfile.repository.implementation` to `org.apache.nifi.controller.repository.RocksDBFlowFileRepository`.
-
-*NiFi-centric Configuration Properties*:
-|====
-|*Property*|*Description*
-|`nifi.flowfile.repository.directory`|The location of the FlowFile Repository.
-The default value is `./flowfile_repository`.
-|`nifi.flowfile.repository.rocksdb.sync.warning.period`
-|How often to log warnings if unable to sync.
-The default value is 30 seconds.
-|`nifi.flowfile.repository.rocksdb.claim.cleanup.period`
-|How often to mark content claims destructible (so they can be removed from the content repo).
-The default value is 30 seconds.
-|`nifi.flowfile.repository.rocksdb.deserialization.threads`
-|How many threads to use on startup restoring the FlowFile state.
-The default value is 16.
-|`nifi.flowfile.repository.rocksdb.deserialization.buffer.size`
-|Size of the buffer to use on startup restoring the FlowFile state.
-The default value is 1000.
-|`nifi.flowfile.repository.rocksdb.sync.period`
-|Frequency at which to force a sync to disk. This is the maximum period a data creation operation may block if `nifi.flowfile.repository.rocksdb.accept.data.loss` is `false`.
-The default value is 10 milliseconds.
-|`nifi.flowfile.repository.rocksdb.accept.data.loss`
-|Whether to accept the loss of received / created data. Setting this to `true` increases throughput if loss of data is acceptable.
-The default value is false.
-|`nifi.flowfile.repository.rocksdb.enable.stall.stop`
-|Whether to enable the stall / stop of writes to the repository based on configured limits. Enabling this feature allows the system to protect itself by restricting (delaying or denying) operations that increase the total FlowFile count on the node to prevent the system from being overwhelmed.
-The default value is false.
-|`nifi.flowfile.repository.rocksdb.stall.period`
-|The period of time to stall when the specified criteria are encountered.
-The default value is 100 milliseconds.
-|`nifi.flowfile.repository.rocksdb.stall.flowfile.count`
-|The FlowFile count at which to begin stalling writes to the repo.
-The default value is 800000.
-|`nifi.flowfile.repository.rocksdb.stall.heap.usage.percent`
-|The heap usage at which to begin stalling writes to the repo.
-The default value is 95%.
-|`nifi.flowfile.repository.rocksdb.stop.flowfile.count`
-|The FlowFile count at which to begin stopping the creation of new FlowFiles.
-The default value is 1100000.
-|`nifi.flowfile.repository.rocksdb.stop.heap.usage.percent`
-|The heap usage at which to begin stopping the creation of new FlowFiles.
-The default value is 99.9%.
-|`nifi.flowfile.repository.rocksdb.remove.orphaned.flowfiles.on.startup`
-|Whether to allow the repository to remove FlowFiles it cannot identify on startup. As this is often the result of a configuration or synchronization error, it is disabled by default. This should only be enabled if you are absolutely certain you want to lose the data in question.
-The default value is false.
-|`nifi.flowfile.repository.rocksdb.enable.recovery.mode`
-|Whether to enable "recovery mode". This limits the number of FlowFiles loaded into the graph at a time, while not actually removing any FlowFiles (or content) from the system. This allows for the recovery of a system that is encountering OutOfMemory errors or similar on startup. This should not be enabled unless necessary to recover a system, and should be disabled as soon as that has been accomplished.
-
-*WARNING:* While in recovery mode, *do not* make modifications to the graph. Changes to the graph may result in the inability to restore further FlowFiles from the repository.
-The default value is false.
-|`nifi.flowfile.repository.rocksdb.recovery.mode.flowfile.count`
-|The number of FlowFiles to load into the graph when in "recovery mode". As FlowFiles leave the system, additional FlowFiles will be loaded up to this limit. This setting does not prevent FlowFiles from coming into the system via normal means.
-The default value is 5000.
-|====
-
-*RocksDB-centric Configuration Properties:*
-|====
-|*Property*|*Description*
-|`nifi.flowfile.repository.rocksdb.parallel.threads`
-|The number of threads to use for flush and compaction. A good value is the number of cores. See RocksDB `DBOptions.setIncreaseParallelism()` for more information.
-The default value is 8.
-|`nifi.flowfile.repository.rocksdb.max.write.buffer.number`
-|The maximum number of write buffers that are built up in memory. See RocksDB `ColumnFamilyOptions.setMaxWriteBufferNumber()` / `max_write_buffer_number` for more information.
-The default value is 4.
-|`nifi.flowfile.repository.rocksdb.write.buffer.size`
-|The amount of data to build up in memory before converting to a sorted on-disk file. Larger values increase performance, especially during bulk loads. Up to `max_write_buffer_number` write buffers may be held in memory at the same time, so you may wish to adjust this parameter to control memory usage. See RocksDB `ColumnFamilyOptions.setWriteBufferSize()` / `write_buffer_size` for more information.
-The default value is 256 MB.
-|`nifi.flowfile.repository.rocksdb.level.0.slowdown.writes.trigger`
-|A soft limit on the number of level-0 files. Writes are slowed at this point. A value less than 0 means no write slowdown will be triggered by the number of files in level-0. See RocksDB `ColumnFamilyOptions.setLevel0SlowdownWritesTrigger()` / `level0_slowdown_writes_trigger` for more information.
-The default value is 20.
-|`nifi.flowfile.repository.rocksdb.level.0.stop.writes.trigger`
-|The maximum number of level-0 files. Writes will be stopped at this point. See RocksDB `ColumnFamilyOptions.setLevel0StopWritesTrigger()` / `level0_stop_writes_trigger` for more information.
-The default value is 40.
-|`nifi.flowfile.repository.rocksdb.delayed.write.bytes.per.second`
-|The limited write rate to the DB if slowdown is triggered. RocksDB may slow writes down further if compaction falls behind. See RocksDB `DBOptions.setDelayedWriteRate()` for more information.
-The default value is 16 MB.
-|`nifi.flowfile.repository.rocksdb.max.background.flushes`
-|Specifies the maximum number of concurrent background flush jobs. See RocksDB `DBOptions.setMaxBackgroundFlushes()` / `max_background_flushes` for more information.
-The default value is 1.
-|`nifi.flowfile.repository.rocksdb.max.background.compactions`
-|Specifies the maximum number of concurrent background compaction jobs. See RocksDB `DBOptions.setMaxBackgroundCompactions()` / `max_background_compactions` for more information.
-The default value is 1.
-|`nifi.flowfile.repository.rocksdb.min.write.buffer.number.to.merge`
-|The minimum number of write buffers to merge together before writing to storage. See RocksDB `ColumnFamilyOptions.setMinWriteBufferNumberToMerge()` / `min_write_buffer_number_to_merge` for more information.
-The default value is 1.
-|`nifi.flowfile.repository.rocksdb.stat.dump.period`
-|The period at which to dump rocksdb.stats to the log. See RocksDB `DBOptions.setStatsDumpPeriodSec()` / `stats_dump_period_sec` for more information.
-The default value is 600 sec.
-|====
-
=== Swap Management
NiFi keeps FlowFile information in memory (the JVM)
diff --git a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-nar/pom.xml b/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-nar/pom.xml
deleted file mode 100644
index 73da55935d..0000000000
--- a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-nar/pom.xml
+++ /dev/null
@@ -1,45 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>nifi-rocksdb-bundle</artifactId>
-        <groupId>org.apache.nifi</groupId>
-        <version>2.0.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>nifi-rocksdb-nar</artifactId>
-    <packaging>nar</packaging>
-
-    <properties>
-        <maven.javadoc.skip>true</maven.javadoc.skip>
-        <source.skip>true</source.skip>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-framework-nar</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-            <type>nar</type>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-rocksdb-repository</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-        </dependency>
-    </dependencies>
-</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-nar/src/main/resources/LICENSE b/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-nar/src/main/resources/LICENSE
deleted file mode 100644
index 1f84df0623..0000000000
--- a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-nar/src/main/resources/LICENSE
+++ /dev/null
@@ -1,236 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
-
-nifi-rocksdb-nar includes subcomponents with separate copyright notices and
-license terms. Your use of these subcomponents is subject to the terms
-and conditions of the following licenses:
-
- The binary distribution of this product bundles the 'ASM' library which is available under a BSD style license.
-
- Copyright (c) 2000-2005 INRIA, France Telecom
- All rights reserved.
-
- Redistribution and use in source and binary forms, with or without
- modification, are permitted provided that the following conditions
- are met:
- 1. Redistributions of source code must retain the above copyright
- notice, this list of conditions and the following disclaimer.
- 2. Redistributions in binary form must reproduce the above copyright
- notice, this list of conditions and the following disclaimer in the
- documentation and/or other materials provided with the distribution.
- 3. Neither the name of the copyright holders nor the names of its
- contributors may be used to endorse or promote products derived from
- this software without specific prior written permission.
-
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
- THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-nar/src/main/resources/NOTICE b/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-nar/src/main/resources/NOTICE
deleted file mode 100644
index 98774e17f9..0000000000
--- a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-nar/src/main/resources/NOTICE
+++ /dev/null
@@ -1,15 +0,0 @@
-nifi-rocksdb-nar
-Copyright 2014-2023 The Apache Software Foundation
-
-This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
-
-******************
-Apache Software License v2
-******************
-
-The following binary components are provided under the Apache Software License v2
-
- (ASLv2) RocksDB JNI
- The following NOTICE information applies:
- Copyright (c) 2011-present, Facebook, Inc.
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/pom.xml b/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/pom.xml
deleted file mode 100644
index 7979db854b..0000000000
--- a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/pom.xml
+++ /dev/null
@@ -1,56 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>nifi-rocksdb-bundle</artifactId>
-        <groupId>org.apache.nifi</groupId>
-        <version>2.0.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>nifi-rocksdb-repository</artifactId>
-    <packaging>jar</packaging>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-api</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-properties</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-framework-core</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-deprecation-log</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.rocksdb</groupId>
-            <artifactId>rocksdbjni</artifactId>
-            <version>6.29.5</version>
-        </dependency>
-    </dependencies>
-</project>
diff --git a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java b/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java
deleted file mode 100644
index ede2429152..0000000000
--- a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java
+++ /dev/null
@@ -1,1273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.deprecation.log.DeprecationLogger;
-import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.SerDe;
-import org.wali.UpdateType;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-/**
- *
- * Implements FlowFile Repository using RocksDB as the backing store.
- *
- */
-@Deprecated
-public class RocksDBFlowFileRepository implements FlowFileRepository {
-
- private static final Logger logger = LoggerFactory.getLogger(RocksDBFlowFileRepository.class);
- private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(RocksDBFlowFileRepository.class);
-
- private static final String FLOWFILE_PROPERTY_PREFIX = "nifi.flowfile.repository.";
- private static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = FLOWFILE_PROPERTY_PREFIX + "directory";
-
- private static final byte[] SWAP_LOCATION_SUFFIX_KEY = "swap.location.sufixes".getBytes(StandardCharsets.UTF_8);
- private static final byte[] SERIALIZATION_ENCODING_KEY = "serial.encoding".getBytes(StandardCharsets.UTF_8);
- private static final byte[] SERIALIZATION_HEADER_KEY = "serial.header".getBytes(StandardCharsets.UTF_8);
- static final byte[] REPOSITORY_VERSION_KEY = "repository.version".getBytes(StandardCharsets.UTF_8);
- static final byte[] VERSION_ONE_BYTES = "1.0".getBytes(StandardCharsets.UTF_8);
- private static final IllegalStateException NO_NEW_FLOWFILES = new IllegalStateException("Repository is not currently accepting new FlowFiles");
- private static final Runtime runtime = Runtime.getRuntime();
- private static final NumberFormat percentFormat = NumberFormat.getPercentInstance();
-
- private final Map<String, FlowFileQueue> queueMap = new HashMap<>();
-
- /**
- * Each property is defined by its name in the file and its default value
- */
- enum RocksDbProperty {
- //FlowFileRepo Configuration Parameters
- SYNC_WARNING_PERIOD("rocksdb.sync.warning.period", "30 seconds"),
- CLAIM_CLEANUP_PERIOD("rocksdb.claim.cleanup.period", "30 seconds"),
- DESERIALIZATION_THREADS("rocksdb.deserialization.threads", "16"),
- DESERIALIZATION_BUFFER_SIZE("rocksdb.deserialization.buffer.size", "1000"),
- SYNC_PERIOD("rocksdb.sync.period", "10 milliseconds"),
- ACCEPT_DATA_LOSS("rocksdb.accept.data.loss", "false"),
- ENABLE_STALL_STOP("rocksdb.enable.stall.stop", "false"),
- STALL_PERIOD("rocksdb.stall.period", "100 milliseconds"),
- STALL_FLOWFILE_COUNT("rocksdb.stall.flowfile.count", "800000"),
- STALL_HEAP_USAGE_PERCENT("rocksdb.stall.heap.usage.percent", "95%"),
- STOP_FLOWFILE_COUNT("rocksdb.stop.flowfile.count", "1100000"),
- STOP_HEAP_USAGE_PERCENT("rocksdb.stop.heap.usage.percent", "99.9%"),
- REMOVE_ORPHANED_FLOWFILES("rocksdb.remove.orphaned.flowfiles.on.startup", "false"),
- ENABLE_RECOVERY_MODE("rocksdb.enable.recovery.mode", "false"),
- RECOVERY_MODE_FLOWFILE_LIMIT("rocksdb.recovery.mode.flowfile.count", "5000"),
-
- //RocksDB Configuration Parameters
- DB_PARALLEL_THREADS("rocksdb.parallel.threads", "8"),
- MAX_WRITE_BUFFER_NUMBER("rocksdb.max.write.buffer.number", "4"),
- WRITE_BUFFER_SIZE("rocksdb.write.buffer.size", "256 MB"),
- LEVEL_O_SLOWDOWN_WRITES_TRIGGER("rocksdb.level.0.slowdown.writes.trigger", "20"),
- LEVEL_O_STOP_WRITES_TRIGGER("rocksdb.level.0.stop.writes.trigger", "40"),
- DELAYED_WRITE_RATE("rocksdb.delayed.write.bytes.per.second", "16 MB"),
- MAX_BACKGROUND_FLUSHES("rocksdb.max.background.flushes", "1"),
- MAX_BACKGROUND_COMPACTIONS("rocksdb.max.background.compactions", "1"),
- MIN_WRITE_BUFFER_NUMBER_TO_MERGE("rocksdb.min.write.buffer.number.to.merge", "1"),
- STAT_DUMP_PERIOD("rocksdb.stat.dump.period", "600 sec"),
- ;
-
- final String propertyName;
- final String defaultValue;
-
- RocksDbProperty(String propertyName, String defaultValue) {
- this.propertyName = FLOWFILE_PROPERTY_PREFIX + propertyName;
- this.defaultValue = defaultValue;
- }
-
- /**
- * @param niFiProperties The Properties file
- * @param timeUnit The desired time unit
- * @return The property Value in the desired units
- */
- long getTimeValue(NiFiProperties niFiProperties, TimeUnit timeUnit) {
- String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue);
- long timeValue = 0L;
- try {
- timeValue = Math.round(FormatUtils.getPreciseTimeDuration(propertyValue, timeUnit));
- } catch (IllegalArgumentException e) {
- this.generateIllegalArgumentException(propertyValue, e);
- }
- return timeValue;
- }
-
- /**
- * @param niFiProperties The Properties file
- * @return The property value as a boolean
- */
- boolean getBooleanValue(NiFiProperties niFiProperties) {
- String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue);
- return Boolean.parseBoolean(propertyValue);
- }
-
- /**
- * @param niFiProperties The Properties file
- * @return The property value as an int
- */
- int getIntValue(NiFiProperties niFiProperties) {
- String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue);
- int returnValue = 0;
- try {
- returnValue = Integer.parseInt(propertyValue);
- } catch (NumberFormatException e) {
- this.generateIllegalArgumentException(propertyValue, e);
- }
- return returnValue;
- }
-
- /**
- * @param niFiProperties The Properties file
- * @return The property value as a number of bytes
- */
- long getByteCountValue(NiFiProperties niFiProperties) {
- long returnValue = 0L;
- String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue);
- try {
- double writeBufferDouble = DataUnit.parseDataSize(propertyValue, DataUnit.B);
- returnValue = (long) (writeBufferDouble < Long.MAX_VALUE ? writeBufferDouble : Long.MAX_VALUE);
- } catch (IllegalArgumentException e) {
- this.generateIllegalArgumentException(propertyValue, e);
- }
- return returnValue;
- }
-
- /**
- * @param niFiProperties The Properties file
- * @return The property value as a percent
- */
- double getPercentValue(NiFiProperties niFiProperties) {
- String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue).replace('%', ' ');
- double returnValue = 0.0D;
- try {
- returnValue = Double.parseDouble(propertyValue) / 100D;
- if (returnValue > 1.0D) {
- this.generateIllegalArgumentException(propertyValue, null);
- }
- } catch (NumberFormatException e) {
- this.generateIllegalArgumentException(propertyValue, e);
- }
- return returnValue;
- }
-
- /**
- * @param niFiProperties The Properties file
- * @return The property value as a long
- */
- long getLongValue(NiFiProperties niFiProperties) {
- String propertyValue = niFiProperties.getProperty(this.propertyName, this.defaultValue);
- long returnValue = 0L;
- try {
- returnValue = Long.parseLong(propertyValue);
- } catch (NumberFormatException e) {
- this.generateIllegalArgumentException(propertyValue, e);
- }
- return returnValue;
- }
-
- void generateIllegalArgumentException(String badValue, Throwable t) {
- throw new IllegalArgumentException("The NiFi Property: [" + this.propertyName + "] with value: [" + badValue + "] is not valid", t);
- }
-
- }
-
-
- private final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L);
- private final int deserializationThreads;
- private final int deserializationBufferSize;
- private final long claimCleanupMillis;
- private final ScheduledExecutorService housekeepingExecutor;
- private final AtomicReference<Collection<ResourceClaim>> claimsAwaitingDestruction = new AtomicReference<>(new ArrayList<>());
- private final RocksDBMetronome db;
- private ResourceClaimManager claimManager;
- private RepositoryRecordSerdeFactory serdeFactory;
- private SerDe<SerializedRepositoryRecord> serializer;
- private String serializationEncodingName;
- private byte[] serializationHeader;
-
- private final boolean acceptDataLoss;
- private final boolean enableStallStop;
- private final boolean removeOrphanedFlowFiles;
- private final boolean enableRecoveryMode;
- private final long recoveryModeFlowFileLimit;
- private final AtomicReference<SerDe<SerializedRepositoryRecord>> recordDeserializer = new AtomicReference<>();
- private final List<byte[]> recordsToRestore = Collections.synchronizedList(new LinkedList<>());
-
- private final ReentrantLock stallStopLock = new ReentrantLock();
- private final AtomicLong inMemoryFlowFiles = new AtomicLong(0L);
- volatile boolean stallNewFlowFiles = false;
- volatile boolean stopNewFlowFiles = false;
- private final long stallMillis;
- private final long stallCount;
- private final long stopCount;
- private final double stallPercentage;
- private final double stopPercentage;
-
- private final Set<String> swapLocationSuffixes = new HashSet<>(); // guarded by synchronizing on object itself
-
- /**
- * default no args constructor for service loading only.
- */
- public RocksDBFlowFileRepository() {
- deserializationThreads = 0;
- deserializationBufferSize = 0;
- claimCleanupMillis = 0;
- housekeepingExecutor = null;
- db = null;
- acceptDataLoss = false;
- enableStallStop = false;
- removeOrphanedFlowFiles = false;
- stallMillis = 0;
- stallCount = 0;
- stopCount = 0;
- stallPercentage = 0;
- stopPercentage = 0;
- enableRecoveryMode = false;
- recoveryModeFlowFileLimit = 0;
- }
-
- public RocksDBFlowFileRepository(final NiFiProperties niFiProperties) {
- deprecationLogger.warn("{} should be replaced with WriteAheadFlowFileRepository for [{}] in nifi.properties",
- getClass().getSimpleName(),
- NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION
- );
-
- deserializationThreads = RocksDbProperty.DESERIALIZATION_THREADS.getIntValue(niFiProperties);
- deserializationBufferSize = RocksDbProperty.DESERIALIZATION_BUFFER_SIZE.getIntValue(niFiProperties);
-
- claimCleanupMillis = RocksDbProperty.CLAIM_CLEANUP_PERIOD.getTimeValue(niFiProperties, TimeUnit.MILLISECONDS);
- housekeepingExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
- Thread thread = Executors.defaultThreadFactory().newThread(r);
- thread.setDaemon(true);
- return thread;
- });
-
- acceptDataLoss = RocksDbProperty.ACCEPT_DATA_LOSS.getBooleanValue(niFiProperties);
- enableStallStop = RocksDbProperty.ENABLE_STALL_STOP.getBooleanValue(niFiProperties);
-
- removeOrphanedFlowFiles = RocksDbProperty.REMOVE_ORPHANED_FLOWFILES.getBooleanValue(niFiProperties);
- if (removeOrphanedFlowFiles) {
- logger.warn("The property \"{}\" is currently set to \"true\". " +
- "This can potentially lead to data loss, and should only be set if you are absolutely certain it is necessary. " +
- "Even then, it should be removed as soon as possible.",
- RocksDbProperty.REMOVE_ORPHANED_FLOWFILES.propertyName);
- }
- stallMillis = RocksDbProperty.STALL_PERIOD.getTimeValue(niFiProperties, TimeUnit.MILLISECONDS);
- stallCount = RocksDbProperty.STALL_FLOWFILE_COUNT.getLongValue(niFiProperties);
- stopCount = RocksDbProperty.STOP_FLOWFILE_COUNT.getLongValue(niFiProperties);
- stallPercentage = RocksDbProperty.STALL_HEAP_USAGE_PERCENT.getPercentValue(niFiProperties);
- stopPercentage = RocksDbProperty.STOP_HEAP_USAGE_PERCENT.getPercentValue(niFiProperties);
-
- enableRecoveryMode = RocksDbProperty.ENABLE_RECOVERY_MODE.getBooleanValue(niFiProperties);
- recoveryModeFlowFileLimit = RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.getLongValue(niFiProperties);
- if (enableRecoveryMode) {
- logger.warn("The property \"{}\" is currently set to \"true\" and \"{}\" is set to \"{}\". " +
- "This means that only {} FlowFiles will be loaded in to memory from the FlowFile repo at a time, " +
- "allowing for recovery of a system encountering OutOfMemory errors (or similar). " +
- "This setting should be reset to \"false\" as soon as recovery is complete.",
- RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName, recoveryModeFlowFileLimit, recoveryModeFlowFileLimit);
- }
- db = new RocksDBMetronome.Builder()
- .setStatDumpSeconds((int) (Math.min(RocksDbProperty.STAT_DUMP_PERIOD.getTimeValue(niFiProperties, TimeUnit.SECONDS), Integer.MAX_VALUE)))
- .setParallelThreads(RocksDbProperty.DB_PARALLEL_THREADS.getIntValue(niFiProperties))
- .setMaxWriteBufferNumber(RocksDbProperty.MAX_WRITE_BUFFER_NUMBER.getIntValue(niFiProperties))
- .setMinWriteBufferNumberToMerge(RocksDbProperty.MIN_WRITE_BUFFER_NUMBER_TO_MERGE.getIntValue(niFiProperties))
- .setWriteBufferSize(RocksDbProperty.WRITE_BUFFER_SIZE.getByteCountValue(niFiProperties))
- .setDelayedWriteRate(RocksDbProperty.DELAYED_WRITE_RATE.getByteCountValue(niFiProperties))
- .setLevel0SlowdownWritesTrigger(RocksDbProperty.LEVEL_O_SLOWDOWN_WRITES_TRIGGER.getIntValue(niFiProperties))
- .setLevel0StopWritesTrigger(RocksDbProperty.LEVEL_O_STOP_WRITES_TRIGGER.getIntValue(niFiProperties))
- .setMaxBackgroundFlushes(RocksDbProperty.MAX_BACKGROUND_FLUSHES.getIntValue(niFiProperties))
- .setMaxBackgroundCompactions(RocksDbProperty.MAX_BACKGROUND_COMPACTIONS.getIntValue(niFiProperties))
- .setSyncMillis(RocksDbProperty.SYNC_PERIOD.getTimeValue(niFiProperties, TimeUnit.MILLISECONDS))
- .setSyncWarningNanos(RocksDbProperty.SYNC_WARNING_PERIOD.getTimeValue(niFiProperties, TimeUnit.NANOSECONDS))
- .setStoragePath(getFlowFileRepoPath(niFiProperties))
- .setAdviseRandomOnOpen(false)
- .setCreateMissingColumnFamilies(true)
- .setCreateIfMissing(true)
- .setPeriodicSyncEnabled(!acceptDataLoss)
- .build();
- }
-
- /**
- * @param niFiProperties The Properties file
- * @return The path of the repo
- */
- static Path getFlowFileRepoPath(NiFiProperties niFiProperties) {
- for (final String propertyName : niFiProperties.getPropertyKeys()) {
- if (propertyName.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) {
- final String dirName = niFiProperties.getProperty(propertyName);
- return Paths.get(dirName);
- }
- }
- return null;
- }
-
- /**
- * @return The name of the File Store
- */
- @Override
- public String getFileStoreName() {
- try {
- return Files.getFileStore(db.getStoragePath()).name();
- } catch (IOException e) {
- return null;
- }
- }
-
- @Override
- public void initialize(final ResourceClaimManager claimManager) throws IOException {
- this.db.initialize();
- this.claimManager = claimManager;
- this.serdeFactory = new StandardRepositoryRecordSerdeFactory(claimManager);
-
- try {
- byte[] versionBytes = db.getConfiguration(REPOSITORY_VERSION_KEY);
- if (versionBytes == null) {
- db.putConfiguration(REPOSITORY_VERSION_KEY, VERSION_ONE_BYTES);
- } else if (!Arrays.equals(versionBytes, VERSION_ONE_BYTES)) {
- throw new IllegalStateException("Unknown repository version: " + new String(versionBytes, StandardCharsets.UTF_8));
- }
-
- byte[] serializationEncodingBytes = db.getConfiguration(SERIALIZATION_ENCODING_KEY);
-
- if (serializationEncodingBytes == null) {
- serializer = serdeFactory.createSerDe(null);
- serializationEncodingName = serializer.getClass().getName();
- db.putConfiguration(SERIALIZATION_ENCODING_KEY, serializationEncodingName.getBytes(StandardCharsets.UTF_8));
- } else {
- serializationEncodingName = new String(serializationEncodingBytes, StandardCharsets.UTF_8);
- serializer = serdeFactory.createSerDe(serializationEncodingName);
- }
-
- serializationHeader = db.getConfiguration(SERIALIZATION_HEADER_KEY);
-
- if (serializationHeader == null) {
- try (
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)
- ) {
- serializer.writeHeader(dataOutputStream);
- serializationHeader = byteArrayOutputStream.toByteArray();
- db.putConfiguration(SERIALIZATION_HEADER_KEY, serializationHeader);
- }
- }
-
-
- byte[] swapLocationSuffixBytes = db.getConfiguration(SWAP_LOCATION_SUFFIX_KEY);
- if (swapLocationSuffixBytes != null) {
- try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(swapLocationSuffixBytes);
- ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
- Object o = objectInputStream.readObject();
- if (o instanceof Collection) {
- ((Collection<?>) o).forEach(obj -> swapLocationSuffixes.add(obj.toString()));
- }
- }
- }
- } catch (RocksDBException | ClassNotFoundException e) {
- throw new IOException(e);
- }
-
- housekeepingExecutor.scheduleWithFixedDelay(this::doHousekeeping, 0, claimCleanupMillis, TimeUnit.MILLISECONDS);
-
- logger.info("Initialized FlowFile Repository at {}", db.getStoragePath());
- }
-
- @Override
- public void close() throws IOException {
- if (housekeepingExecutor != null) {
- housekeepingExecutor.shutdownNow();
- }
-
- if (db != null) {
- db.close();
- }
- }
-
- /**
- * This method is scheduled by the housekeepingExecutor at the specified interval.
- * Catches Throwable so as not to suppress future executions of the ScheduledExecutorService
- */
- private void doHousekeeping() {
- try {
- doClaimCleanup();
- updateStallStop();
- doRecovery();
- } catch (Throwable t) {
- // catching Throwable so as not to suppress subsequent executions
- logger.error("Encountered problem during housekeeping", t);
- }
- }
-
- /**
- * Marks as destructible any claims that were held by records which have now been deleted.
- */
- private void doClaimCleanup() {
- Collection<ResourceClaim> claimsToDestroy;
- synchronized (claimsAwaitingDestruction) {
- if (Thread.currentThread().isInterrupted()) {
- return;
- }
- claimsToDestroy = claimsAwaitingDestruction.getAndSet(new ArrayList<>());
- }
- if (claimsToDestroy != null) {
- Collection<ResourceClaim> uniqueClaimsToDestroy = new HashSet<>(claimsToDestroy);
-
- try {
- if (!acceptDataLoss) {
- db.waitForSync();
- } else {
- db.forceSync();
- }
- } catch (InterruptedException | RocksDBException e) {
- synchronized (claimsAwaitingDestruction) {
- // if there was an exception, put back the claims we were attempting to destroy
- claimsAwaitingDestruction.get().addAll(uniqueClaimsToDestroy);
- return;
- }
- }
- for (final ResourceClaim claim : uniqueClaimsToDestroy) {
- claimManager.markDestructable(claim);
- }
- }
- }
-
- /**
- * Updates the stalled and stopped status of the repository
- */
- void updateStallStop() {
- // if stall.stop logic is not enabled, return
- if (!enableStallStop) return;
-
- if (stallStopLock.tryLock()) {
- try {
- final long inMemoryFlowFiles = getInMemoryFlowFiles();
-
- if (inMemoryFlowFiles >= stopCount) {
- stopNewFlowFiles = true;
- stallNewFlowFiles = true;
- logger.warn("Halting new FlowFiles because maximum FlowFile count ({}) has been exceeded. Current count: {}",
- new Object[]{stopCount, inMemoryFlowFiles});
- return;
- }
-
- // calculate usage percentage
- final double freeMemory = runtime.freeMemory();
- final double maxMemory = runtime.maxMemory();
- final double usedPercentage = 1.0d - (freeMemory / maxMemory);
-
- if (usedPercentage >= stopPercentage) {
- stopNewFlowFiles = true;
- stallNewFlowFiles = true;
- logger.warn("Halting new FlowFiles because maximum heap usage percentage ({}) has been exceeded. Current usage: {}",
- new Object[]{percentFormat.format(stopPercentage), percentFormat.format(usedPercentage)});
- return;
- }
-
- if (inMemoryFlowFiles >= stallCount) {
- stopNewFlowFiles = false;
- stallNewFlowFiles = true;
- logger.warn("Stalling new FlowFiles because FlowFile count stall threshold ({}) has been exceeded. Current count: {}",
- new Object[]{stallCount, inMemoryFlowFiles});
- return;
- }
-
- if (usedPercentage >= stallPercentage) {
- stopNewFlowFiles = false;
- stallNewFlowFiles = true;
- logger.warn("Stalling new FlowFiles because heap usage percentage threshold ({}) has been exceeded. Current count: {}",
- new Object[]{percentFormat.format(stallPercentage), percentFormat.format(usedPercentage)});
- return;
- }
-
- if (stopNewFlowFiles || stallNewFlowFiles) {
- logger.info("Resuming acceptance of new FlowFiles");
- stopNewFlowFiles = false;
- stallNewFlowFiles = false;
- }
- } finally {
- stallStopLock.unlock();
- }
- }
- }
-
- /**
- * If in recovery mode, restore more FlowFile if under the configured limit
- */
- synchronized void doRecovery() {
-
- // if we are not in recovery mode, return
- if (!enableRecoveryMode) return;
-
- SerDe<SerializedRepositoryRecord> deserializer = recordDeserializer.get();
- if (deserializer == null) {
- return; // initial load hasn't completed
- }
-
- if (recordsToRestore.isEmpty()) {
- logger.warn("Recovery has been completed. " +
- "The property \"{}\" is currently set to \"true\", but should be reset to \"false\" as soon as possible.",
- RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName);
- return;
- }
-
- logger.warn("The property \"{}\" is currently set to \"true\" and \"{}\" is set to \"{}\". " +
- "This means that only {} FlowFiles will be loaded into memory from the FlowFile repo at a time, " +
- "allowing for recovery of a system encountering OutOfMemory errors (or similar). " +
- "This setting should be reset to \"false\" as soon as recovery is complete. " +
- "There are {} records remaining to be recovered.",
- RocksDbProperty.ENABLE_RECOVERY_MODE.propertyName, RocksDbProperty.RECOVERY_MODE_FLOWFILE_LIMIT.propertyName,
- recoveryModeFlowFileLimit, recoveryModeFlowFileLimit, getRecordsToRestoreCount());
-
- while (!recordsToRestore.isEmpty() && inMemoryFlowFiles.get() < recoveryModeFlowFileLimit) {
- try {
- byte[] key = recordsToRestore.get(0);
- byte[] recordBytes = db.get(key);
- if (recordBytes != null) {
- try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(recordBytes);
- DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
- SerializedRepositoryRecord record = deserializer.deserializeRecord(dataInputStream, deserializer.getVersion());
- final FlowFileRecord flowFile = record.getFlowFileRecord();
-
- final FlowFileQueue queue = queueMap.get(record.getQueueIdentifier());
- if (queue != null) {
- queue.put(flowFile);
- inMemoryFlowFiles.incrementAndGet();
- }
- }
- }
- recordsToRestore.remove(0);
- } catch (IOException | RocksDBException e) {
- logger.warn("Encountered exception during recovery", e);
- }
- }
- }
-
- long getInMemoryFlowFiles() {
- return inMemoryFlowFiles.get();
- }
-
- long getRecordsToRestoreCount() {
- return recordsToRestore.size();
- }
-
- /**
- * Updates the FlowFile repository with the given RepositoryRecords
- *
- * @param records the records to update the repository with
- * @throws IOException if update fails or a required sync is interrupted
- */
- @Override
- public void updateRepository(final Collection<RepositoryRecord> records) throws IOException {
-
- // verify records are valid
- int netIncrease = countAndValidateRecords(records);
-
- final boolean causeIncrease = netIncrease > 0;
- if (causeIncrease && stopNewFlowFiles) {
- updateStallStop();
- throw NO_NEW_FLOWFILES;
- }
-
- //update the db with the new records
- int syncCounterValue = updateRocksDB(records);
- inMemoryFlowFiles.addAndGet(netIncrease);
-
- try {
- // if we created data, but are at a threshold, delay and allow other data to try to get through
- if (causeIncrease && (stallNewFlowFiles || stopNewFlowFiles)) {
- Thread.sleep(stallMillis);
- updateStallStop();
- }
-
- // if we got a record indicating data creation, wait for it to be synced to disk before proceeding
- if (!acceptDataLoss && syncCounterValue > 0) {
- db.waitForSync(syncCounterValue);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
-
- // determine any content claims that can be destroyed
- determineDestructibleClaims(records);
- }
-
- /**
- * Check that each record has the required elements
- *
- * @param records to be validated
- * @return the change in in-memory FlowFile count represented by this Collection
- */
- private int countAndValidateRecords(Collection<RepositoryRecord> records) {
- int inMemoryDelta = 0;
- for (RepositoryRecord record : records) {
- validateRecord(record);
- if (record.getType() == RepositoryRecordType.CREATE || record.getType() == RepositoryRecordType.SWAP_IN) {
- inMemoryDelta++;
- } else if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.SWAP_OUT) {
- inMemoryDelta--;
- }
- }
- return inMemoryDelta;
- }
-
- /**
- * Check that a record has the required elements
- *
- * @param record to be validated
- */
- private void validateRecord(RepositoryRecord record) {
- if (record.getType() != RepositoryRecordType.DELETE
- && record.getType() != RepositoryRecordType.CONTENTMISSING
- && record.getType() != RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS
- && record.getType() != RepositoryRecordType.SWAP_OUT
- && record.getDestination() == null) {
- throw new IllegalArgumentException("Record " + record + " has no destination and Type is " + record.getType());
- }
- }
-
- /**
- * Update the records in the RocksDB database
- *
- * @param records to be persisted
- * @return the value of the sync counter immediately after the last update which requires a sync
- */
- private int updateRocksDB(Collection<RepositoryRecord> records) throws IOException {
-
- // Partition records by UpdateType.
- // We do this because we want to ensure that records creating data are persisted first.
- // Additionally, remove records of type 'CLEANUP_TRANSIENT_CLAIMS' so they aren't sent to the db
-
- final Map<UpdateType, List<RepositoryRecord>> partitionedRecords = new HashMap<>();
- for (RepositoryRecord repositoryRecord : records) {
- if (repositoryRecord.getType() == RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS) {
- continue;
- }
- final UpdateType updateType = serdeFactory.getUpdateType(new LiveSerializedRepositoryRecord(repositoryRecord));
- partitionedRecords.computeIfAbsent(updateType, ut -> new ArrayList<>()).add(repositoryRecord);
- }
-
- int counterValue;
-
- try {
- // handle CREATE records
- putAll(partitionedRecords.get(UpdateType.CREATE));
-
- // handle SWAP_OUT records
- List<RepositoryRecord> swapOutRecords = partitionedRecords.get(UpdateType.SWAP_OUT);
- if (swapOutRecords != null) {
- for (final RepositoryRecord record : swapOutRecords) {
- final SerializedRepositoryRecord serializedRecord = new LiveSerializedRepositoryRecord(record);
- final String newLocation = serdeFactory.getLocation(serializedRecord);
- final Long recordIdentifier = serdeFactory.getRecordIdentifier(serializedRecord);
-
- if (newLocation == null) {
- logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but " +
- "no indicator of where the Record is to be Swapped Out to; these records may be " +
- "lost when the repository is restored!");
- } else {
- delete(recordIdentifier);
- }
- }
- }
-
- // handle SWAP_IN records
- List<RepositoryRecord> swapInRecords = partitionedRecords.get(UpdateType.SWAP_IN);
- if (swapInRecords != null) {
- for (final RepositoryRecord record : swapInRecords) {
- final SerializedRepositoryRecord serialized = new LiveSerializedRepositoryRecord(record);
-
- final String newLocation = serdeFactory.getLocation(serialized);
- if (newLocation == null) {
- final Long recordIdentifier = serdeFactory.getRecordIdentifier(serialized);
- logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but " +
- "no indicator of where the Record is to be Swapped In from; these records may be " +
- "duplicated when the repository is restored!");
- }
- put(record);
- }
- }
-
- // if a sync is required, get the current value of the sync counter
- counterValue = syncRequired(partitionedRecords) ? db.getSyncCounterValue() : -1;
-
- // handle UPDATE records
- putAll(partitionedRecords.get(UpdateType.UPDATE));
-
- // handle DELETE records
- deleteAll(partitionedRecords.get(UpdateType.DELETE));
-
- } catch (RocksDBException e) {
- throw new IOException(e);
- }
-
- return counterValue;
- }
-
- private boolean syncRequired(Map<UpdateType, List<RepositoryRecord>> recordMap) {
- for (UpdateType updateType : recordMap.keySet()) {
- if (updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_OUT || updateType == UpdateType.SWAP_IN) {
- return true;
- }
- }
- return false;
- }
-
- private void deleteAll(List<RepositoryRecord> repositoryRecords) throws RocksDBException {
- if (repositoryRecords != null) {
- for (final RepositoryRecord record : repositoryRecords) {
- final SerializedRepositoryRecord serialized = new LiveSerializedRepositoryRecord(record);
- final Long id = serdeFactory.getRecordIdentifier(serialized);
- delete(id);
- }
- }
- }
-
- private void delete(Long recordId) throws RocksDBException {
- byte[] key = RocksDBMetronome.getBytes(recordId);
- db.delete(key);
- }
-
- private void putAll(List<RepositoryRecord> repositoryRecords) throws IOException, RocksDBException {
- if (repositoryRecords != null) {
- for (final RepositoryRecord record : repositoryRecords) {
- put(record);
- }
- }
- }
-
- private void put(RepositoryRecord record) throws IOException, RocksDBException {
- final Long recordIdentifier = serdeFactory.getRecordIdentifier(new LiveSerializedRepositoryRecord(record));
- byte[] key = RocksDBMetronome.getBytes(recordIdentifier);
- final byte[] serializedRecord = serialize(record);
- db.put(key, serializedRecord);
- }
-
- private byte[] serialize(RepositoryRecord record) throws IOException {
- try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
- serializer.serializeRecord(new LiveSerializedRepositoryRecord(record), dataOutputStream);
- return byteArrayOutputStream.toByteArray();
- }
- }
-
- private void determineDestructibleClaims(Collection<RepositoryRecord> records) throws IOException {
-
- final Set<ResourceClaim> claimsToAdd = new HashSet<>();
- final Set<String> swapLocationsAdded = new HashSet<>();
- final Set<String> swapLocationsRemoved = new HashSet<>();
- for (final RepositoryRecord record : records) {
- updateClaimCounts(record);
-
- if (record.getType() == RepositoryRecordType.DELETE) {
- // For any DELETE record that we have, if claim is destructible, mark it so
- if (isDestructible(record.getCurrentClaim())) {
- claimsToAdd.add(record.getCurrentClaim().getResourceClaim());
- }
-
- // If the original claim is different than the current claim and the original claim is destructible, mark it so
- if (shouldDestroyOriginal(record)) {
- claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
- }
- } else if (record.getType() == RepositoryRecordType.UPDATE) {
- // if we have an update, and the original is no longer needed, mark original as destructible
- if (shouldDestroyOriginal(record)) {
- claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
- }
- } else if (record.getType() == RepositoryRecordType.SWAP_OUT) {
- final String swapLocation = record.getSwapLocation();
- final String normalizedSwapLocation = normalizeSwapLocation(swapLocation);
- swapLocationsAdded.add(normalizedSwapLocation);
- swapLocationsRemoved.remove(normalizedSwapLocation);
- } else if (record.getType() == RepositoryRecordType.SWAP_IN) {
- final String swapLocation = record.getSwapLocation();
- final String normalizedSwapLocation = normalizeSwapLocation(swapLocation);
- swapLocationsRemoved.add(normalizedSwapLocation);
- swapLocationsAdded.remove(normalizedSwapLocation);
- }
-
- final List<ContentClaim> transientClaims = record.getTransientClaims();
- if (transientClaims != null) {
- for (final ContentClaim transientClaim : transientClaims) {
- if (isDestructible(transientClaim)) {
- claimsToAdd.add(transientClaim.getResourceClaim());
- }
- }
- }
- }
-
- // If we have swapped files in or out, we need to ensure that we update our swapLocationSuffixes.
- if (!swapLocationsAdded.isEmpty() || !swapLocationsRemoved.isEmpty()) {
- synchronized (swapLocationSuffixes) {
- removeNormalizedSwapLocations(swapLocationsRemoved);
- addNormalizedSwapLocations(swapLocationsAdded);
- }
- }
-
- // add any new claims that can be destroyed to the list
- if (!claimsToAdd.isEmpty()) {
- synchronized (claimsAwaitingDestruction) {
- claimsAwaitingDestruction.get().addAll(claimsToAdd);
- }
- }
- }
-
-
- private void updateClaimCounts(final RepositoryRecord record) {
- final ContentClaim currentClaim = record.getCurrentClaim();
- final ContentClaim originalClaim = record.getOriginalClaim();
- final boolean claimChanged = !Objects.equals(currentClaim, originalClaim);
-
- if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.CONTENTMISSING) {
- decrementClaimCount(currentClaim);
- }
-
- if (claimChanged) {
- // records which have been updated - remove original if exists
- decrementClaimCount(originalClaim);
- }
- }
-
- private void decrementClaimCount(final ContentClaim claim) {
- if (claim == null) {
- return;
- }
-
- claimManager.decrementClaimantCount(claim.getResourceClaim());
- }
-
-
- /**
- * @param claim to be evaluated
- * @return true if the claim can be destroyed
- */
- private boolean isDestructible(final ContentClaim claim) {
- if (claim == null) {
- return false;
- }
-
- final ResourceClaim resourceClaim = claim.getResourceClaim();
- if (resourceClaim == null) {
- return false;
- }
-
- return !resourceClaim.isInUse();
- }
-
- /**
- * @param record to be evaluated
- * @return true if the original claim can be destroyed
- */
- private boolean shouldDestroyOriginal(RepositoryRecord record) {
- final ContentClaim originalClaim = record.getOriginalClaim();
- return isDestructible(originalClaim) && !originalClaim.equals(record.getCurrentClaim());
- }
-
- @Override
- public boolean isVolatile() {
- return false;
- }
-
- @Override
- public long getStorageCapacity() throws IOException {
- return db.getStorageCapacity();
- }
-
- @Override
- public long getUsableStorageSpace() throws IOException {
- return db.getUsableStorageSpace();
- }
-
-
- @Override
- public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) {
- String normalizedSwapLocation = normalizeSwapLocation(swapLocationSuffix);
- synchronized (swapLocationSuffixes) {
- return swapLocationSuffixes.contains(normalizedSwapLocation);
- }
- }
-
-
- static String normalizeSwapLocation(final String swapLocation) {
- if (swapLocation == null) {
- return null;
- }
-
- final String normalizedPath = swapLocation.replace("\\", "/");
- final String withoutTrailing = (normalizedPath.endsWith("/") && normalizedPath.length() > 1) ? normalizedPath.substring(0, normalizedPath.length() - 1) : normalizedPath;
- final String pathRemoved = getLocationSuffix(withoutTrailing);
-
- return StringUtils.substringBefore(pathRemoved, ".");
- }
-
- private static String getLocationSuffix(final String swapLocation) {
- final int lastIndex = swapLocation.lastIndexOf("/");
- if (lastIndex < 0 || lastIndex >= swapLocation.length() - 1) {
- return swapLocation;
- }
-
- return swapLocation.substring(lastIndex + 1);
- }
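
Taken together, the two helpers reduce a raw swap path to a bare suffix: directory components, a trailing slash, and everything after the first dot are all stripped. A few illustrative inputs and outputs, matching the unit test near the end of this patch:

    normalizeSwapLocation(null);                 // -> null
    normalizeSwapLocation("/");                  // -> "/"
    normalizeSwapLocation("test.txt");           // -> "test"
    normalizeSwapLocation("/tmp/test.txt");      // -> "test"
    normalizeSwapLocation("/path/to/test.txt/"); // -> "test"
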
-
- @Override
- public void swapFlowFilesOut(final List<FlowFileRecord> swappedOut, final FlowFileQueue queue,
- final String swapLocation) throws IOException {
- final List<RepositoryRecord> repoRecords = new ArrayList<>();
- if (swappedOut == null || swappedOut.isEmpty()) {
- return;
- }
-
- for (final FlowFileRecord swapRecord : swappedOut) {
- final RepositoryRecord repoRecord = new StandardRepositoryRecord(queue, swapRecord, swapLocation);
- repoRecords.add(repoRecord);
- }
-
- updateRepository(repoRecords);
- addRawSwapLocation(swapLocation);
- logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", swappedOut.size(), queue, swapLocation);
- }
-
- @Override
- public void swapFlowFilesIn(final String swapLocation, final List<FlowFileRecord> swapRecords,
- final FlowFileQueue queue) throws IOException {
- final List<RepositoryRecord> repoRecords = new ArrayList<>();
-
- for (final FlowFileRecord swapRecord : swapRecords) {
- final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(queue, swapRecord);
- repoRecord.setSwapLocation(swapLocation); // set the swap file to indicate that it's being swapped in.
- repoRecord.setDestination(queue);
-
- repoRecords.add(repoRecord);
- }
-
- updateRepository(repoRecords);
- removeRawSwapLocation(swapLocation);
- logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue});
- }
-
-
- @Override
- public long loadFlowFiles(final QueueProvider queueProvider) throws IOException {
-
- final long startTime = System.nanoTime();
-
- queueMap.clear();
- for (final FlowFileQueue queue : queueProvider.getAllQueues()) {
- queueMap.put(queue.getIdentifier(), queue);
- }
-
- final ExecutorService recordDeserializationExecutor = Executors.newFixedThreadPool(deserializationThreads, r -> {
- Thread thread = Executors.defaultThreadFactory().newThread(r);
- thread.setDaemon(true);
- return thread;
- });
-
- // Create a queue which will hold the bytes of the records that have been read from disk and are awaiting deserialization
- final BlockingQueue<byte[]> recordBytesQueue = new ArrayBlockingQueue<>(deserializationBufferSize);
-
- final AtomicBoolean doneReading = new AtomicBoolean(false);
- final List<Future<Long>> futures = new ArrayList<>(deserializationThreads);
-
- RepositoryRecordSerdeFactory factory = new StandardRepositoryRecordSerdeFactory(claimManager);
-
- final AtomicInteger numFlowFilesMissingQueue = new AtomicInteger(0);
- final AtomicInteger recordCount = new AtomicInteger(0);
- final AtomicInteger recoveryModeRecordCount = new AtomicInteger(0);
-
- for (int i = 0; i < deserializationThreads; i++) {
- futures.add(recordDeserializationExecutor.submit(() -> {
- long localMaxId = 0;
- int localRecordCount = 0;
- final Set<String> localRecoveredSwapLocations = new HashSet<>();
-
- // Create deserializer in each thread
- final SerDe<SerializedRepositoryRecord> localDeserializer = factory.createSerDe(serializationEncodingName);
- try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serializationHeader);
- DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
- localDeserializer.readHeader(dataInputStream);
- }
-
- while (!doneReading.get() || !recordBytesQueue.isEmpty()) {
- byte[] value = recordBytesQueue.poll(100, TimeUnit.MILLISECONDS);
- if (value != null) {
- try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(value);
- DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
- SerializedRepositoryRecord record = localDeserializer.deserializeRecord(dataInputStream, localDeserializer.getVersion());
-
- localRecordCount++;
-
- // increment the count for the record
- final ContentClaim claim = record.getContentClaim();
- if (claim != null) {
- claimManager.incrementClaimantCount(claim.getResourceClaim());
- }
-
- final long recordId = record.getFlowFileRecord().getId();
- if (recordId > localMaxId) {
- localMaxId = recordId;
- }
-
- if (record.getType().equals(RepositoryRecordType.SWAP_OUT)) {
- localRecoveredSwapLocations.add(normalizeSwapLocation(record.getSwapLocation()));
- }
-
- final FlowFileRecord flowFile = record.getFlowFileRecord();
- final FlowFileQueue queue = queueMap.get(record.getQueueIdentifier());
- if (queue == null) {
- if (!removeOrphanedFlowFiles) {
- throw new IOException("Found FlowFile in repository without a corresponding queue. " +
- "This may indicate an issue syncing the flow.xml in a cluster. " +
- "To resolve this issue you should restore the flow.xml. " +
- "Alternatively, if removing data is acceptable, you can add the following to nifi.properties: \n\n" +
- "\t\t" + RocksDbProperty.REMOVE_ORPHANED_FLOWFILES.propertyName + "=true\n\n" +
- "...once this has allowed you to restart nifi, you should remove it from nifi.properties to prevent inadvertent future data loss.");
- }
-
- numFlowFilesMissingQueue.incrementAndGet();
- try {
- final Long recordIdentifier = factory.getRecordIdentifier(record);
- byte[] key = RocksDBMetronome.getBytes(recordIdentifier);
- db.delete(key);
- } catch (RocksDBException e) {
- logger.warn("Could not clean up repository", e);
- }
- } else {
- // verify we're supposed to enqueue the FlowFile
- if (!enableRecoveryMode) {
- queue.put(flowFile);
- } else if (recoveryModeRecordCount.incrementAndGet() <= recoveryModeFlowFileLimit) {
- queue.put(flowFile);
- } else {
- final Long recordIdentifier = factory.getRecordIdentifier(record);
- byte[] key = RocksDBMetronome.getBytes(recordIdentifier);
- recordsToRestore.add(key);
- }
- }
- }
- }
- }
- recordCount.addAndGet(localRecordCount);
- addNormalizedSwapLocations(localRecoveredSwapLocations);
- return localMaxId;
-
- }));
- }
-
- long maxId = 0;
- RocksIterator rocksIterator = db.getIterator();
- rocksIterator.seekToFirst();
- long counter = 0;
- long totalRecords = 0;
- try {
- while (rocksIterator.isValid()) {
- if (recordBytesQueue.offer(rocksIterator.value(), 10, TimeUnit.SECONDS)) {
- rocksIterator.next();
- if (++counter == 5_000) { // periodically report progress
- totalRecords += counter;
- counter = 0;
- logger.info("Read {} records from disk", totalRecords);
- }
- } else {
- // couldn't add to the queue in a timely fashion... make sure there are no exceptions from the consumers
- for (Future<Long> f : futures) {
- // the only way it could be done at this point is through an exception
- // (because we haven't yet set doneReading = true)
- if (f.isDone()) {
- f.get(); // this will throw the exception
- }
- }
- logger.warn("Failed to add record bytes to queue. Will keep trying...");
- }
- }
-
- doneReading.set(true);
- totalRecords += counter;
- logger.info("Finished reading from rocksDB. Read {} records from disk", totalRecords);
-
- for (Future<Long> f : futures) {
- long futureMax = f.get(); // will wait for completion (or exception)
- if (futureMax > maxId) {
- maxId = futureMax;
- }
- }
-
- logger.info("Finished deserializing {} records", recordCount.get());
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- } catch (ExecutionException e) {
- throw new IOException(e);
- } finally {
- recordDeserializationExecutor.shutdownNow();
- }
-
- // Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will
- // return the appropriate number.
- flowFileSequenceGenerator.set(maxId + 1);
-
- int flowFilesInQueues = recordCount.get() - numFlowFilesMissingQueue.get();
- inMemoryFlowFiles.set(!enableRecoveryMode ? flowFilesInQueues
- : Math.min(flowFilesInQueues, recoveryModeFlowFileLimit));
- logger.info("Successfully restored {} FlowFiles in {} milliseconds using {} threads",
- getInMemoryFlowFiles(),
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime),
- deserializationThreads);
- if (logger.isDebugEnabled()) {
- synchronized (this.swapLocationSuffixes) {
- logger.debug("Recovered {} Swap Files: {}", swapLocationSuffixes.size(), swapLocationSuffixes);
- }
- }
- if (numFlowFilesMissingQueue.get() > 0) {
- logger.warn("On recovery, found {} FlowFiles whose queue no longer exists. These FlowFiles have been dropped.", numFlowFilesMissingQueue);
- }
-
- final SerDe<SerializedRepositoryRecord> deserializer = factory.createSerDe(serializationEncodingName);
-
- try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serializationHeader);
- DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
- deserializer.readHeader(dataInputStream);
- }
-
- if (enableRecoveryMode) {
- recordDeserializer.set(deserializer);
- }
-
- return maxId;
- }
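
The recovery loop above is a bounded producer/consumer hand-off: a single thread iterates the on-disk records while the deserialization pool drains the queue, so neither side can run unboundedly ahead of the other. Stripped of the repository specifics, the shape is roughly as follows (a self-contained sketch, not the repository code):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    class BoundedRecoverySketch {
        public static void main(String[] args) throws Exception {
            final BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(1_000);
            final AtomicBoolean doneReading = new AtomicBoolean(false);
            final ExecutorService pool = Executors.newFixedThreadPool(4);

            for (int i = 0; i < 4; i++) {
                pool.submit(() -> {
                    // consumers poll with a timeout so they can observe doneReading
                    while (!doneReading.get() || !buffer.isEmpty()) {
                        final byte[] value = buffer.poll(100, TimeUnit.MILLISECONDS);
                        if (value != null) {
                            // deserialize the record and route it to its queue here
                        }
                    }
                    return null;
                });
            }

            for (int i = 0; i < 10_000; i++) {
                // a timed offer keeps a stalled consumer from blocking the reader forever;
                // the real code checks the consumer futures for failures when this returns false
                while (!buffer.offer(new byte[8], 10, TimeUnit.SECONDS)) {
                    // retry
                }
            }
            doneReading.set(true);
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
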
-
- @Override
- public Set<String> findQueuesWithFlowFiles(final FlowFileSwapManager flowFileSwapManager) throws IOException {
- return null;
- }
-
- private void addRawSwapLocation(String rawSwapLocation) throws IOException {
- addRawSwapLocations(Collections.singleton(rawSwapLocation));
- }
-
- private void addRawSwapLocations(Collection<String> rawSwapLocations) throws IOException {
- addNormalizedSwapLocations(rawSwapLocations.stream().map(RocksDBFlowFileRepository::normalizeSwapLocation).collect(Collectors.toSet()));
- }
-
- private void addNormalizedSwapLocations(Collection<String> normalizedSwapLocations) throws IOException {
- synchronized (this.swapLocationSuffixes) {
- this.swapLocationSuffixes.addAll(normalizedSwapLocations);
- persistSwapLocationSuffixes();
- }
- }
-
- private void removeRawSwapLocation(String rawSwapLocation) throws IOException {
- removeRawSwapLocations(Collections.singleton(rawSwapLocation));
- }
-
- private void removeRawSwapLocations(Collection<String> rawSwapLocations) throws IOException {
- removeNormalizedSwapLocations(rawSwapLocations.stream().map(RocksDBFlowFileRepository::normalizeSwapLocation).collect(Collectors.toSet()));
- }
-
- private void removeNormalizedSwapLocations(Collection<String> normalizedSwapLocations) throws IOException {
- synchronized (this.swapLocationSuffixes) {
- this.swapLocationSuffixes.removeAll(normalizedSwapLocations);
- persistSwapLocationSuffixes();
- }
- }
-
- private void persistSwapLocationSuffixes() throws IOException {
- try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)
- ) {
- objectOutputStream.writeObject(swapLocationSuffixes);
- db.putConfiguration(SWAP_LOCATION_SUFFIX_KEY, byteArrayOutputStream.toByteArray());
- } catch (RocksDBException e) {
- throw new IOException(e);
- }
- }
-
-
- @Override
- public void updateMaxFlowFileIdentifier(final long maxId) {
- while (true) {
- final long currentId = flowFileSequenceGenerator.get();
- if (currentId >= maxId) {
- return;
- }
-
- final boolean updated = flowFileSequenceGenerator.compareAndSet(currentId, maxId);
- if (updated) {
- return;
- }
- }
- }
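
The loop above is the standard compare-and-set pattern for monotonically advancing an atomic value to a maximum; on JDK 8 and later the same effect is available in a single call, assuming the same `AtomicLong` field:

    flowFileSequenceGenerator.accumulateAndGet(maxId, Math::max);
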
-
- @Override
- public long getNextFlowFileSequence() {
- return flowFileSequenceGenerator.getAndIncrement();
- }
-
- @Override
- public long getMaxFlowFileIdentifier() {
- // flowFileSequenceGenerator is 1 more than the MAX so that we can call #getAndIncrement on the AtomicLong
- return flowFileSequenceGenerator.get() - 1;
- }
-}
diff --git a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/main/java/org/apache/nifi/controller/repository/RocksDBMetronome.java b/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/main/java/org/apache/nifi/controller/repository/RocksDBMetronome.java
deleted file mode 100644
index 2c992eb67e..0000000000
--- a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/main/java/org/apache/nifi/controller/repository/RocksDBMetronome.java
+++ /dev/null
@@ -1,891 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.util.StringUtils;
-import org.rocksdb.AccessHint;
-import org.rocksdb.ColumnFamilyDescriptor;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.CompressionType;
-import org.rocksdb.DBOptions;
-import org.rocksdb.InfoLogLevel;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-
-/**
- * Rather than forcing a sync to disk every time it is given a record, this RocksDB helper class
- * persists all waiting records at a regular interval (using a ScheduledExecutorService),
- * the way a metronome ticks.
- */
-
-class RocksDBMetronome implements Closeable {
-
- private static final Logger logger = LoggerFactory.getLogger(RocksDBMetronome.class);
-
- static final String CONFIGURATION_FAMILY = "configuration.column.family";
- static final String DEFAULT_FAMILY = "default";
-
- private final AtomicLong lastSyncWarningNanos = new AtomicLong(0L);
- private final int parallelThreads;
- private final int maxWriteBufferNumber;
- private final int minWriteBufferNumberToMerge;
- private final long writeBufferSize;
- private final long maxTotalWalSize;
- private final long delayedWriteRate;
- private final int level0SlowdownWritesTrigger;
- private final int level0StopWritesTrigger;
- private final int maxBackgroundFlushes;
- private final int maxBackgroundCompactions;
- private final int statDumpSeconds;
- private final long syncMillis;
- private final long syncWarningNanos;
- private final Path storagePath;
- private final boolean adviseRandomOnOpen;
- private final boolean createIfMissing;
- private final boolean createMissingColumnFamilies;
- private final boolean useFsync;
-
- private final Set<byte[]> columnFamilyNames;
- private final Map<String, ColumnFamilyHandle> columnFamilyHandles;
-
- private final boolean periodicSyncEnabled;
- private final ScheduledExecutorService syncExecutor;
- private final ReentrantLock syncLock = new ReentrantLock();
- private final Condition syncCondition = syncLock.newCondition();
- private final AtomicInteger syncCounter = new AtomicInteger(0);
-
- private volatile RocksDB rocksDB = null;
- private final ReentrantReadWriteLock dbReadWriteLock = new ReentrantReadWriteLock();
- private final ReentrantReadWriteLock.ReadLock dbReadLock = dbReadWriteLock.readLock();
- private final ReentrantReadWriteLock.WriteLock dbWriteLock = dbReadWriteLock.writeLock();
- private volatile boolean closed = false;
-
- private ColumnFamilyHandle configurationColumnFamilyHandle;
- private ColumnFamilyHandle defaultColumnFamilyHandle;
- private WriteOptions forceSyncWriteOptions;
- private WriteOptions noSyncWriteOptions;
-
- private RocksDBMetronome(Builder builder) {
- statDumpSeconds = builder.statDumpSeconds;
- parallelThreads = builder.parallelThreads;
- maxWriteBufferNumber = builder.maxWriteBufferNumber;
- minWriteBufferNumberToMerge = builder.minWriteBufferNumberToMerge;
- writeBufferSize = builder.writeBufferSize;
- maxTotalWalSize = builder.getMaxTotalWalSize();
- delayedWriteRate = builder.delayedWriteRate;
- level0SlowdownWritesTrigger = builder.level0SlowdownWritesTrigger;
- level0StopWritesTrigger = builder.level0StopWritesTrigger;
- maxBackgroundFlushes = builder.maxBackgroundFlushes;
- maxBackgroundCompactions = builder.maxBackgroundCompactions;
- syncMillis = builder.syncMillis;
- syncWarningNanos = builder.syncWarningNanos;
- storagePath = builder.storagePath;
- adviseRandomOnOpen = builder.adviseRandomOnOpen;
- createIfMissing = builder.createIfMissing;
- createMissingColumnFamilies = builder.createMissingColumnFamilies;
- useFsync = builder.useFsync;
- columnFamilyNames = builder.columnFamilyNames;
- columnFamilyHandles = new HashMap<>(columnFamilyNames.size());
-
- periodicSyncEnabled = builder.periodicSyncEnabled;
- syncExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
- Thread thread = Executors.defaultThreadFactory().newThread(r);
- thread.setDaemon(true);
- return thread;
- });
- }
-
- /**
- * Initialize the metronome
- *
- * @throws IOException if there is an issue with the underlying database
- */
- public void initialize() throws IOException {
-
- final String rocksSharedLibDir = System.getenv("ROCKSDB_SHAREDLIB_DIR");
- final String javaTmpDir = System.getProperty("java.io.tmpdir");
- final String libDir = !StringUtils.isBlank(rocksSharedLibDir) ? rocksSharedLibDir : javaTmpDir;
- try {
- Files.createDirectories(Paths.get(libDir));
- } catch (IOException e) {
- throw new IOException("Unable to load the RocksDB shared library into directory: " + libDir, e);
- }
-
- // delete any previous librocksdbjni.so files
- final File[] rocksSos = Paths.get(libDir).toFile().listFiles((dir, name) -> name.startsWith("librocksdbjni") && name.endsWith(".so"));
- if (rocksSos != null) {
- for (File rocksSo : rocksSos) {
- if (!rocksSo.delete()) {
- logger.warn("Could not delete existing librocksdbjni*.so file {}", rocksSo);
- }
- }
- }
-
- try {
- RocksDB.loadLibrary();
- } catch (Throwable t) {
- if (System.getProperty("os.name").startsWith("Windows")) {
- logger.error("The RocksDBMetronome will only work on Windows if you have Visual C++ runtime libraries for Visual Studio 2015 installed. " +
- "If the DLLs required to support RocksDB cannot be found, then NiFi will not start!");
- }
- throw t;
- }
-
- Files.createDirectories(storagePath);
-
- forceSyncWriteOptions = new WriteOptions()
- .setDisableWAL(false)
- .setSync(true);
-
- noSyncWriteOptions = new WriteOptions()
- .setDisableWAL(false)
- .setSync(false);
-
-
- dbWriteLock.lock();
- try (final DBOptions dbOptions = new DBOptions()
- .setAccessHintOnCompactionStart(AccessHint.SEQUENTIAL)
- .setAdviseRandomOnOpen(adviseRandomOnOpen)
- .setAllowMmapWrites(false) // required to be false for RocksDB.syncWal() to work
- .setCreateIfMissing(createIfMissing)
- .setCreateMissingColumnFamilies(createMissingColumnFamilies)
- .setDelayedWriteRate(delayedWriteRate)
- .setIncreaseParallelism(parallelThreads)
- .setLogger(getRocksLogger())
- .setMaxBackgroundCompactions(maxBackgroundCompactions)
- .setMaxBackgroundFlushes(maxBackgroundFlushes)
- .setMaxTotalWalSize(maxTotalWalSize)
- .setStatsDumpPeriodSec(statDumpSeconds)
- .setUseFsync(useFsync)
- ;
-
- final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()
- .setCompressionType(CompressionType.LZ4_COMPRESSION)
- .setLevel0SlowdownWritesTrigger(level0SlowdownWritesTrigger)
- .setLevel0StopWritesTrigger(level0StopWritesTrigger)
- .setMaxWriteBufferNumber(maxWriteBufferNumber)
- .setMinWriteBufferNumberToMerge(minWriteBufferNumberToMerge)
- .setWriteBufferSize(writeBufferSize)
- ) {
-
- // create family descriptors
- List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(columnFamilyNames.size());
- for (byte[] name : columnFamilyNames) {
- familyDescriptors.add(new ColumnFamilyDescriptor(name, cfOptions));
- }
-
- List<ColumnFamilyHandle> columnFamilyList = new ArrayList<>(columnFamilyNames.size());
-
- rocksDB = RocksDB.open(dbOptions, storagePath.toString(), familyDescriptors, columnFamilyList);
-
- // create the map of names to handles
- columnFamilyHandles.put(DEFAULT_FAMILY, rocksDB.getDefaultColumnFamily());
- for (ColumnFamilyHandle cf : columnFamilyList) {
- columnFamilyHandles.put(new String(cf.getName(), StandardCharsets.UTF_8), cf);
- }
-
- // set specific special handles
- defaultColumnFamilyHandle = rocksDB.getDefaultColumnFamily();
- configurationColumnFamilyHandle = columnFamilyHandles.get(CONFIGURATION_FAMILY);
-
- } catch (RocksDBException e) {
- throw new IOException(e);
- } finally {
- dbWriteLock.unlock();
- }
-
- if (periodicSyncEnabled) {
- syncExecutor.scheduleWithFixedDelay(this::doSync, syncMillis, syncMillis, TimeUnit.MILLISECONDS);
- }
-
- logger.info("Initialized RocksDB Repository at {}", storagePath);
- }
-
-
- /**
- * This method checks the state of the database to ensure it is available for use.
- *
- * NOTE: This *must* be called holding the dbReadLock
- *
- * @throws IllegalStateException if the database is closed or not yet initialized
- */
- private void checkDbState() throws IllegalStateException {
- if (rocksDB == null) {
- if (closed) {
- throw new IllegalStateException("RocksDBMetronome is closed");
- }
- throw new IllegalStateException("RocksDBMetronome has not been initialized");
- }
-
- }
-
- /**
- * Return an iterator over the specified column family. The iterator is initially invalid (caller must call one of the Seek methods on the iterator before using it).
- *
- * Caller should close the iterator when it is no longer needed. The returned iterator should be closed before this db is closed.
- *
- * @param columnFamilyHandle specifies the column family for the iterator
- * @return an iterator over the specified column family
- */
- public RocksIterator getIterator(final ColumnFamilyHandle columnFamilyHandle) {
- dbReadLock.lock();
- try {
- checkDbState();
- return rocksDB.newIterator(columnFamilyHandle);
- } finally {
- dbReadLock.unlock();
- }
- }
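
Per the javadoc contract, a returned iterator must be positioned before use and closed before the database is. A typical scan, assuming a RocksJava version in which `RocksIterator` is `AutoCloseable`:

    try (RocksIterator iterator = metronome.getIterator(columnFamilyHandle)) {
        iterator.seekToFirst(); // position before first use
        while (iterator.isValid()) {
            byte[] key = iterator.key();
            byte[] value = iterator.value();
            // process the key/value pair here
            iterator.next();
        }
    } // closed before the database itself is closed
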
-
- /**
- * Get the value for the provided key in the specified column family
- *
- * @param columnFamilyHandle the column family from which to get the value
- * @param key the key of the value to retrieve
- * @return the value for the specified key
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public byte[] get(final ColumnFamilyHandle columnFamilyHandle, final byte[] key) throws RocksDBException {
- dbReadLock.lock();
- try {
- checkDbState();
- return rocksDB.get(columnFamilyHandle, key);
- } finally {
- dbReadLock.unlock();
- }
- }
-
- /**
- * Put the key / value pair into the database in the specified column family
- *
- * @param columnFamilyHandle the column family into which to put the value
- * @param writeOptions specification of options for write operations
- * @param key the key to be inserted
- * @param value the value to be associated with the specified key
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public void put(final ColumnFamilyHandle columnFamilyHandle, WriteOptions writeOptions, final byte[] key, final byte[] value) throws RocksDBException {
- dbReadLock.lock();
- try {
- checkDbState();
- rocksDB.put(columnFamilyHandle, writeOptions, key, value);
- } finally {
- dbReadLock.unlock();
- }
- }
-
- /**
- * Delete the key / value pair from the specified column family
- *
- * @param columnFamilyHandle the column family from which to delete the value
- * @param writeOptions specification of options for write operations
- * @param key the key to be deleted
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final WriteOptions writeOptions) throws RocksDBException {
- dbReadLock.lock();
- try {
- checkDbState();
- rocksDB.delete(columnFamilyHandle, writeOptions, key);
- } finally {
- dbReadLock.unlock();
- }
- }
-
- /**
- * Flushes the WAL and syncs to disk
- *
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public void forceSync() throws RocksDBException {
- dbReadLock.lock();
- try {
- checkDbState();
- rocksDB.syncWal();
- } finally {
- dbReadLock.unlock();
- }
- }
-
- /**
- * Get the handle for the specified column family
- *
- * @param familyName the name of the column family
- * @return the handle
- */
- public ColumnFamilyHandle getColumnFamilyHandle(String familyName) {
- return columnFamilyHandles.get(familyName);
- }
-
- /**
- * Put the key / value pair into the configuration column family and sync the wal
- *
- * @param key the key to be inserted
- * @param value the value to be associated with the specified key
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public void putConfiguration(final byte[] key, final byte[] value) throws RocksDBException {
- put(configurationColumnFamilyHandle, forceSyncWriteOptions, key, value);
- }
-
- /**
- * Put the key / value pair into the database in the specified column family without syncing the wal
- *
- * @param columnFamilyHandle the column family into which to put the value
- * @param key the key to be inserted
- * @param value the value to be associated with the specified key
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value) throws RocksDBException {
- put(columnFamilyHandle, noSyncWriteOptions, key, value);
- }
-
- /**
- * Put the key / value pair into the database in the specified column family, optionally syncing the wal
- *
- * @param columnFamilyHandle the column family into which to put the value
- * @param key the key to be inserted
- * @param value the value to be associated with the specified key
- * @param forceSync if true, sync the wal
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value, final boolean forceSync) throws RocksDBException {
- put(columnFamilyHandle, getWriteOptions(forceSync), key, value);
- }
-
- /**
- * Put the key / value pair into the database in the default column family, optionally syncing the wal
- *
- * @param key the key to be inserted
- * @param value the value to be associated with the specified key
- * @param forceSync if true, sync the wal
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public void put(final byte[] key, final byte[] value, final boolean forceSync) throws RocksDBException {
- put(defaultColumnFamilyHandle, getWriteOptions(forceSync), key, value);
- }
-
- /**
- * Put the key / value pair into the database in the default column family, without syncing the wal
- *
- * @param key the key to be inserted
- * @param value the value to be associated with the specified key
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public void put(final byte[] key, final byte[] value) throws RocksDBException {
- put(defaultColumnFamilyHandle, noSyncWriteOptions, key, value);
- }
-
- /**
- * Get the value for the provided key in the default column family
- *
- * @param key the key of the value to retrieve
- * @return the value for the specified key
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public byte[] get(final byte[] key) throws RocksDBException {
- return get(defaultColumnFamilyHandle, key);
- }
-
- /**
- * Get the value for the provided key in the configuration column family
- *
- * @param key the key of the value to retrieve
- * @return the value for the specified key
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public byte[] getConfiguration(final byte[] key) throws RocksDBException {
- return get(configurationColumnFamilyHandle, key);
- }
-
-
- /**
- * Delete the key / value pair from the default column family without syncing the wal
- *
- * @param key the key to be deleted
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public void delete(byte[] key) throws RocksDBException {
- delete(defaultColumnFamilyHandle, key, noSyncWriteOptions);
- }
-
- /**
- * Delete the key / value pair from the default column family, optionally syncing the wal
- *
- * @param key the key to be deleted
- * @param forceSync if true, sync the wal
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public void delete(byte[] key, final boolean forceSync) throws RocksDBException {
- delete(defaultColumnFamilyHandle, key, getWriteOptions(forceSync));
- }
-
- /**
- * Delete the key / value pair from the specified column family without syncing the wal
- *
- * @param columnFamilyHandle the column family from which to delete the value
- * @param key the key to be deleted
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public void delete(final ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
- delete(columnFamilyHandle, key, noSyncWriteOptions);
- }
-
- /**
- * Delete the key / value pair from the specified column family, optionally syncing the wal
- *
- * @param columnFamilyHandle the column family from which to delete the value
- * @param key the key to be deleted
- * @param forceSync if true, sync the wal
- * @throws RocksDBException thrown if there is an error in the underlying library.
- */
- public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final boolean forceSync) throws RocksDBException {
- delete(columnFamilyHandle, key, getWriteOptions(forceSync));
- }
-
- private WriteOptions getWriteOptions(boolean forceSync) {
- return forceSync ? forceSyncWriteOptions : noSyncWriteOptions;
- }
-
- /**
- * Return an iterator over the default column family. The iterator is initially invalid (caller must call one of the Seek methods on the iterator before using it).
- *
- * Caller should close the iterator when it is no longer needed. The returned iterator should be closed before this db is closed.
- *
- * @return an iterator over the default column family
- */
- public RocksIterator getIterator() {
- return getIterator(defaultColumnFamilyHandle);
- }
-
- /**
- * Get the current value of the sync counter. This can be used with waitForSync() to verify that operations have been synced to disk
- *
- * @return the current value of the sync counter
- */
- public int getSyncCounterValue() {
- return syncCounter.get();
- }
-
- /**
- * Close the Metronome and the RocksDB Objects
- *
- * @throws IOException if there are issues in closing any of the underlying RocksDB objects
- */
- @Override
- public void close() throws IOException {
-
- logger.info("Closing RocksDBMetronome");
-
- dbWriteLock.lock();
- try {
- logger.info("Shutting down RocksDBMetronome sync executor");
- syncExecutor.shutdownNow();
-
- try {
- logger.info("Pausing RocksDB background work");
- rocksDB.pauseBackgroundWork();
- } catch (RocksDBException e) {
- logger.warn("Unable to pause background work before close.", e);
- }
-
- final AtomicReference<Exception> exceptionReference = new AtomicReference<>();
-
- logger.info("Closing RocksDB configurations");
-
- safeClose(forceSyncWriteOptions, exceptionReference);
- safeClose(noSyncWriteOptions, exceptionReference);
-
- // close the column family handles first, then the db last
- for (ColumnFamilyHandle cfh : columnFamilyHandles.values()) {
- safeClose(cfh, exceptionReference);
- }
-
- logger.info("Closing RocksDB database");
- safeClose(rocksDB, exceptionReference);
- rocksDB = null;
- closed = true;
-
- if (exceptionReference.get() != null) {
- throw new IOException(exceptionReference.get());
- }
- } finally {
- dbWriteLock.unlock();
- }
- }
-
- /**
- * @param autoCloseable An {@link AutoCloseable} to be closed
- * @param exceptionReference A reference to contain any encountered {@link Exception}
- */
- private void safeClose(final AutoCloseable autoCloseable, final AtomicReference<Exception> exceptionReference) {
- if (autoCloseable != null) {
- try {
- autoCloseable.close();
- } catch (Exception e) {
- exceptionReference.set(e);
- }
- }
- }
-
- /**
- * @return The capacity of the store
- * @throws IOException if encountered
- */
- public long getStorageCapacity() throws IOException {
- return Files.getFileStore(storagePath).getTotalSpace();
- }
-
- /**
- * @return The usable space of the store
- * @throws IOException if encountered
- */
- public long getUsableStorageSpace() throws IOException {
- return Files.getFileStore(storagePath).getUsableSpace();
- }
-
- /**
- * This method is scheduled by the syncExecutor. It runs at the specified interval, forcing the RocksDB WAL to disk.
- *
- * If the sync is successful, it notifies threads that had been waiting for their records to be persisted using
- * syncCondition and the syncCounter, which is incremented to indicate success
- */
- void doSync() {
- syncLock.lock();
- try {
- // if we're interrupted, return
- if (Thread.currentThread().isInterrupted()) {
- return;
- }
- forceSync();
- syncCounter.incrementAndGet(); // it's OK if it rolls over... we're just going to check that the value changed
- syncCondition.signalAll();
- } catch (final IllegalArgumentException e) {
- logger.error("Unable to sync, likely because the repository is out of space.", e);
- } catch (final Throwable t) {
- logger.error("Unable to sync", t);
- } finally {
- syncLock.unlock();
- }
- }
-
- /**
- * This method blocks until the next time the WAL is forced to disk, ensuring that all records written before this point have been persisted.
- */
- public void waitForSync() throws InterruptedException {
- final int counterValue = syncCounter.get();
- waitForSync(counterValue);
- }
-
- /**
- * This method blocks until the WAL has been forced to disk, ensuring that all records written before the point specified by the counterValue have been persisted.
- *
- * @param counterValue The value of the counter at the time of a write we must persist
- * @throws InterruptedException if the thread is interrupted
- */
- public void waitForSync(final int counterValue) throws InterruptedException {
- if (counterValue != syncCounter.get()) {
- return; // if the counter has already changed, we don't need to wait (or grab the lock) because the records we're concerned with have already been persisted
- }
- long waitTimeRemaining = syncWarningNanos;
- syncLock.lock();
- try {
- while (counterValue == syncCounter.get()) { // wait until the counter changes (indicating sync occurred)
- waitTimeRemaining = syncCondition.awaitNanos(waitTimeRemaining);
- if (waitTimeRemaining <= 0L) { // this means the wait timed out
-
- // only log a warning every syncWarningNanos... don't spam the logs
- final long now = System.nanoTime();
- final long lastWarning = lastSyncWarningNanos.get();
- if (now - lastWarning > syncWarningNanos
- && lastSyncWarningNanos.compareAndSet(lastWarning, now)) {
- logger.warn("Failed to sync within {} seconds... system configuration may need to be adjusted", TimeUnit.NANOSECONDS.toSeconds(syncWarningNanos));
- }
-
- // reset waiting time
- waitTimeRemaining = syncWarningNanos;
- }
- }
- } finally {
- syncLock.unlock();
- }
- }
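
Together, `doSync()` and `waitForSync(int)` give writers a flush-counter handshake: write without syncing, sample the counter, then block until it moves. Sampling after the write errs on the safe side (at worst one extra interval of waiting). A sketch using only methods defined in this class; `putDurably` is a hypothetical wrapper:

    void putDurably(RocksDBMetronome metronome, byte[] key, byte[] value)
            throws RocksDBException, InterruptedException {
        metronome.put(key, value);                                // non-syncing write
        final int counterValue = metronome.getSyncCounterValue(); // sample after the write...
        metronome.waitForSync(counterValue);                      // ...then wait for the counter to move
    }
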
-
- /**
- * Returns a representation of a long as a byte array
- *
- * @param value a long to convert
- * @return a byte[] representation
- */
- public static byte[] getBytes(long value) {
- byte[] bytes = new byte[8];
- writeLong(value, bytes);
- return bytes;
- }
-
- /**
- * Writes a representation of a long to the specified byte array
- *
- * @param l a long to convert
- * @param bytes an array to store the byte representation
- */
- public static void writeLong(long l, byte[] bytes) {
- bytes[0] = (byte) (l >>> 56);
- bytes[1] = (byte) (l >>> 48);
- bytes[2] = (byte) (l >>> 40);
- bytes[3] = (byte) (l >>> 32);
- bytes[4] = (byte) (l >>> 24);
- bytes[5] = (byte) (l >>> 16);
- bytes[6] = (byte) (l >>> 8);
- bytes[7] = (byte) (l);
- }
-
- /**
- * Creates a long from its byte array representation
- * @param bytes to convert to a long
- * @return a long
- * @throws IOException if the given byte array is of the wrong size
- */
- public static long readLong(final byte[] bytes) throws IOException {
- if (bytes.length != 8) {
- throw new IOException("wrong number of bytes to convert to long (must be 8)");
- }
- return (((long) (bytes[0]) << 56) +
- ((long) (bytes[1] & 255) << 48) +
- ((long) (bytes[2] & 255) << 40) +
- ((long) (bytes[3] & 255) << 32) +
- ((long) (bytes[4] & 255) << 24) +
- ((long) (bytes[5] & 255) << 16) +
- ((long) (bytes[6] & 255) << 8) +
- ((long) (bytes[7] & 255)));
- }
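
The encoding is big-endian (most significant byte first), so non-negative keys sort numerically under RocksDB's default bytewise comparator. A quick round trip through the two helpers above:

    byte[] bytes = RocksDBMetronome.getBytes(1L);  // {0, 0, 0, 0, 0, 0, 0, 1}
    long value = RocksDBMetronome.readLong(bytes); // 1L again; throws IOException unless length == 8
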
-
- /**
- * @return the storage path of the db
- */
- public Path getStoragePath() {
- return storagePath;
- }
-
- /**
- * @return A RocksDB logger capturing all logging output from RocksDB
- */
- private org.rocksdb.Logger getRocksLogger() {
- try (Options options = new Options()
- // make RocksDB give us everything, and we'll decide what we want to log in our wrapper
- .setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)) {
- return new LogWrapper(options);
- }
- }
-
- /**
- * An Extension of org.rocksdb.Logger that wraps the slf4j Logger
- */
- private class LogWrapper extends org.rocksdb.Logger {
-
- LogWrapper(Options options) {
- super(options);
- }
-
- @Override
- protected void log(final InfoLogLevel infoLogLevel, final String logMsg) {
- switch (infoLogLevel) {
- case ERROR_LEVEL:
- case FATAL_LEVEL:
- logger.error(logMsg);
- break;
- case WARN_LEVEL:
- logger.warn(logMsg);
- break;
- case INFO_LEVEL:
- case DEBUG_LEVEL:
- logger.debug(logMsg);
- break;
- case HEADER_LEVEL:
- default:
- logger.info(logMsg);
- break;
- }
- }
- }
-
- public static class Builder {
-
- int parallelThreads = 8;
- int maxWriteBufferNumber = 4;
- int minWriteBufferNumberToMerge = 1;
- long writeBufferSize = (long) DataUnit.MB.toB(256);
- long delayedWriteRate = (long) DataUnit.MB.toB(16);
- int level0SlowdownWritesTrigger = 20;
- int level0StopWritesTrigger = 40;
- int maxBackgroundFlushes = 1;
- int maxBackgroundCompactions = 1;
- int statDumpSeconds = 600;
- long syncMillis = 10;
- long syncWarningNanos = TimeUnit.SECONDS.toNanos(30);
- Path storagePath;
- boolean adviseRandomOnOpen = false;
- boolean createIfMissing = true;
- boolean createMissingColumnFamilies = true;
- boolean useFsync = true;
- boolean periodicSyncEnabled = true;
- final Set<byte[]> columnFamilyNames = new HashSet<>();
-
- public RocksDBMetronome build() {
- if (storagePath == null) {
- throw new IllegalStateException("Cannot create RocksDBMetronome because storagePath is not set");
- }
-
- // add default column families
- columnFamilyNames.add(RocksDB.DEFAULT_COLUMN_FAMILY);
- columnFamilyNames.add(CONFIGURATION_FAMILY.getBytes(StandardCharsets.UTF_8));
-
- return new RocksDBMetronome(this);
- }
-
- public Builder addColumnFamily(String name) {
- this.columnFamilyNames.add(name.getBytes(StandardCharsets.UTF_8));
- return this;
- }
-
- public Builder setStoragePath(Path storagePath) {
- this.storagePath = storagePath;
- return this;
- }
-
- public Builder setParallelThreads(int parallelThreads) {
- this.parallelThreads = parallelThreads;
- return this;
- }
-
- public Builder setMaxWriteBufferNumber(int maxWriteBufferNumber) {
- this.maxWriteBufferNumber = maxWriteBufferNumber;
- return this;
- }
-
- public Builder setMinWriteBufferNumberToMerge(int minWriteBufferNumberToMerge) {
- this.minWriteBufferNumberToMerge = minWriteBufferNumberToMerge;
- return this;
- }
-
- public Builder setWriteBufferSize(long writeBufferSize) {
- this.writeBufferSize = writeBufferSize;
- return this;
- }
-
- public Builder setDelayedWriteRate(long delayedWriteRate) {
- this.delayedWriteRate = delayedWriteRate;
- return this;
- }
-
- public Builder setLevel0SlowdownWritesTrigger(int level0SlowdownWritesTrigger) {
- this.level0SlowdownWritesTrigger = level0SlowdownWritesTrigger;
- return this;
- }
-
- public Builder setLevel0StopWritesTrigger(int level0StopWritesTrigger) {
- this.level0StopWritesTrigger = level0StopWritesTrigger;
- return this;
- }
-
- public Builder setMaxBackgroundFlushes(int maxBackgroundFlushes) {
- this.maxBackgroundFlushes = maxBackgroundFlushes;
- return this;
- }
-
- public Builder setMaxBackgroundCompactions(int maxBackgroundCompactions) {
- this.maxBackgroundCompactions = maxBackgroundCompactions;
- return this;
- }
-
- public Builder setStatDumpSeconds(int statDumpSeconds) {
- this.statDumpSeconds = statDumpSeconds;
- return this;
- }
-
- public Builder setSyncMillis(long syncMillis) {
- this.syncMillis = syncMillis;
- return this;
- }
-
- public Builder setSyncWarningNanos(long syncWarningNanos) {
- this.syncWarningNanos = syncWarningNanos;
- return this;
- }
-
-
- public Builder setAdviseRandomOnOpen(boolean adviseRandomOnOpen) {
- this.adviseRandomOnOpen = adviseRandomOnOpen;
- return this;
- }
-
- public Builder setCreateMissingColumnFamilies(boolean createMissingColumnFamilies) {
- this.createMissingColumnFamilies = createMissingColumnFamilies;
- return this;
- }
-
- public Builder setCreateIfMissing(boolean createIfMissing) {
- this.createIfMissing = createIfMissing;
- return this;
- }
-
- public Builder setUseFsync(boolean useFsync) {
- this.useFsync = useFsync;
- return this;
- }
-
- public Builder setPeriodicSyncEnabled(boolean periodicSyncEnabled) {
- this.periodicSyncEnabled = periodicSyncEnabled;
- return this;
- }
-
- long getMaxTotalWalSize() {
- return writeBufferSize * maxWriteBufferNumber;
- }
- }
-}
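
For reference, constructing and starting a Metronome from this builder takes roughly the following shape; the storage path, the only required setting, is illustrative:

    RocksDBMetronome metronome = new RocksDBMetronome.Builder()
            .setStoragePath(Paths.get("/var/lib/nifi/flowfile_repository"))
            .setSyncMillis(10)        // periodic WAL sync interval
            .setPeriodicSyncEnabled(true)
            .build();                 // throws IllegalStateException if storagePath is unset
    metronome.initialize();           // opens RocksDB and schedules the sync task
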
diff --git a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileRepository b/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileRepository
deleted file mode 100644
index b5cf7b599a..0000000000
--- a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileRepository
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.controller.repository.RocksDBFlowFileRepository
-
diff --git a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java b/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java
deleted file mode 100644
index af32f0371f..0000000000
--- a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java
+++ /dev/null
@@ -1,802 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import org.apache.nifi.connectable.Connectable;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.queue.NopConnectionEventListener;
-import org.apache.nifi.controller.queue.QueueSize;
-import org.apache.nifi.controller.queue.StandardFlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
-import org.apache.nifi.controller.swap.StandardSwapContents;
-import org.apache.nifi.controller.swap.StandardSwapSummary;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.api.condition.DisabledOnOs;
-import org.junit.jupiter.api.condition.OS;
-import org.junit.jupiter.api.io.TempDir;
-import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.when;
-
-@DisabledOnOs(OS.WINDOWS)
-public class TestRocksDBFlowFileRepository {
-
- private static final Logger logger = LoggerFactory.getLogger(TestRocksDBFlowFileRepository.class);
-
- private final Map<String, String> additionalProperties = new HashMap<>();
- private String nifiPropertiesPath;
-
- @TempDir
- public Path temporaryFolder;
-
- @BeforeEach
- public void before(TestInfo testInfo) throws IOException {
- additionalProperties.put(NiFiProperties.FLOWFILE_REPOSITORY_DIRECTORY, temporaryFolder.toString());
- Path properties = Files.createFile(temporaryFolder.resolve(testInfo.getDisplayName() + ".properties"));
- Files.copy(Paths.get("src/test/resources/conf/nifi.properties"), properties, StandardCopyOption.REPLACE_EXISTING);
- nifiPropertiesPath = properties.toString();
-
- logger.info("Running test: {}", testInfo.getDisplayName());
- }
-
- @Test
- public void testNormalizeSwapLocation() {
- assertEquals("/", RocksDBFlowFileRepository.normalizeSwapLocation("/"));
- assertEquals("", RocksDBFlowFileRepository.normalizeSwapLocation(""));
- assertNull(RocksDBFlowFileRepository.normalizeSwapLocation(null));
- assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("test.txt"));
- assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/test.txt"));
- assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/tmp/test.txt"));
- assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("//test.txt"));
- assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/path/to/other/file/repository/test.txt"));
- assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("test.txt/"));
- assertEquals("test", RocksDBFlowFileRepository.normalizeSwapLocation("/path/to/test.txt/"));
- assertEquals("test", WriteAheadFlowFileRepository.normalizeSwapLocation(WriteAheadFlowFileRepository.normalizeSwapLocation("/path/to/test.txt/")));
- }
-
- @Test
- public void testSwapLocationsRestored() throws IOException {
- final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
- repo.initialize(new StandardResourceClaimManager());
-
- final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
- repo.loadFlowFiles(queueProvider);
-
- final Connection connection = Mockito.mock(Connection.class);
- when(connection.getIdentifier()).thenReturn("1234");
-
- final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
- when(queue.getIdentifier()).thenReturn("1234");
- when(connection.getFlowFileQueue()).thenReturn(queue);
-
- queueProvider.addConnection(connection);
-
- StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
- ffBuilder.id(1L);
- ffBuilder.size(0L);
- final FlowFileRecord flowFileRecord = ffBuilder.build();
-
- final List<RepositoryRecord> records = new ArrayList<>();
- final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "swap123");
- record.setDestination(queue);
- records.add(record);
-
- repo.updateRepository(records);
- repo.close();
-
- // restore
- final RocksDBFlowFileRepository repo2 = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
- repo2.initialize(new StandardResourceClaimManager());
- repo2.loadFlowFiles(queueProvider);
- assertTrue(repo2.isValidSwapLocationSuffix("swap123"));
- assertFalse(repo2.isValidSwapLocationSuffix("other"));
- repo2.close();
- }
-
- @Test
- public void testSwapLocationsUpdatedOnRepoUpdate() throws IOException {
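- // A swap location should become valid as soon as a record referencing it is
- // written via updateRepository(), without requiring a restart.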
- final Path path = Paths.get("target/test-swap-repo");
- if (Files.exists(path)) {
- FileUtils.deleteFile(path.toFile(), true);
- }
-
- final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
- repo.initialize(new StandardResourceClaimManager());
-
- final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
- repo.loadFlowFiles(queueProvider);
-
- final Connection connection = Mockito.mock(Connection.class);
- when(connection.getIdentifier()).thenReturn("1234");
-
- final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
- when(queue.getIdentifier()).thenReturn("1234");
- when(connection.getFlowFileQueue()).thenReturn(queue);
-
- queueProvider.addConnection(connection);
-
- StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
- ffBuilder.id(1L);
- ffBuilder.size(0L);
- final FlowFileRecord flowFileRecord = ffBuilder.build();
-
- final List<RepositoryRecord> records = new ArrayList<>();
- final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "/tmp/swap123");
- record.setDestination(queue);
- records.add(record);
-
- assertFalse(repo.isValidSwapLocationSuffix("swap123"));
- repo.updateRepository(records);
- assertTrue(repo.isValidSwapLocationSuffix("swap123"));
- repo.close();
- }
-
- @Test
- public void testResourceClaimsIncremented() throws IOException {
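- // On recovery, claimant counts should be restored for FlowFiles still in the repository,
- // but not for FlowFiles that were swapped out; their claims are reported by the swap summary instead.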
- final ResourceClaimManager claimManager = new StandardResourceClaimManager();
-
- final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
- final Connection connection = Mockito.mock(Connection.class);
- when(connection.getIdentifier()).thenReturn("1234");
- when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
-
- final FlowFileSwapManager swapMgr = new TestRocksDBFlowFileRepository.MockFlowFileSwapManager();
- final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
-
- when(connection.getFlowFileQueue()).thenReturn(queue);
- queueProvider.addConnection(connection);
-
- final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false, false);
- final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L);
-
- final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false, false);
- final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L);
-
- // Create a FlowFile repo and update it once with a FlowFile that points to one resource claim. Then,
- // indicate that a FlowFile was swapped out. We should then be able to recover these FlowFiles, and the
- // resource claims' counts should be updated for both the swapped-out FlowFile and the non-swapped-out FlowFile.
- try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
- repo.initialize(claimManager);
- repo.loadFlowFiles(queueProvider);
-
- // Create a Repository Record that indicates that a FlowFile was created
- final FlowFileRecord flowFile1 = new StandardFlowFileRecord.Builder()
- .id(1L)
- .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
- .contentClaim(claim1)
- .build();
- final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue);
- rec1.setWorking(flowFile1, false);
- rec1.setDestination(queue);
-
- // Create a Record that we can swap out
- final FlowFileRecord flowFile2 = new StandardFlowFileRecord.Builder()
- .id(2L)
- .addAttribute("uuid", "11111111-1111-1111-1111-111111111112")
- .contentClaim(claim2)
- .build();
-
- final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue);
- rec2.setWorking(flowFile2, false);
- rec2.setDestination(queue);
-
- final List<RepositoryRecord> records = new ArrayList<>();
- records.add(rec1);
- records.add(rec2);
- repo.updateRepository(records);
-
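- // Swap out the second FlowFile and record its swap location in the repository before closing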
- final String swapLocation = swapMgr.swapOut(Collections.singletonList(flowFile2), queue, null);
- repo.swapFlowFilesOut(Collections.singletonList(flowFile2), queue, swapLocation);
- }
-
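- // Reopen the repository with a fresh ResourceClaimManager to exercise recovery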
- final ResourceClaimManager recoveryClaimManager = new StandardResourceClaimManager();
- try (final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties))) {
- repo.initialize(recoveryClaimManager);
- final long largestId = repo.loadFlowFiles(queueProvider);
-
- // The largest known ID is 1 because loadFlowFiles() does not take swapped-out FlowFiles into account
- assertEquals(1, largestId);
- }
-
- // Resource claim 1 will have a single claimant count, while resource claim 2 will have none,
- // because resource claim 2 is referenced only by FlowFiles that have been swapped out.
- assertEquals(1, recoveryClaimManager.getClaimantCount(resourceClaim1));
- assertEquals(0, recoveryClaimManager.getClaimantCount(resourceClaim2));
-
- final SwapSummary summary = queue.recoverSwappedFlowFiles();
- assertNotNull(summary);
- assertEquals(2, summary.getMaxFlowFileId().intValue());
- assertEquals(new QueueSize(1, 0L), summary.getQueueSize());
-
- final List<ResourceClaim> swappedOutClaims = summary.getResourceClaims();
- assertNotNull(swappedOutClaims);
- assertEquals(1, swappedOutClaims.size());
- assertEquals(claim2.getResourceClaim(), swappedOutClaims.get(0));
- }
-
- @Test
- public void testRestartWithOneRecord() throws IOException {
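- // A single record written before shutdown should be recovered when the repository is restarted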
- final RocksDBFlowFileRepository repo = new RocksDBFlowFileRepository(NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath, additionalProperties));
- repo.initialize(new StandardResourceClaimManager());
-
- final TestRocksDBFlowFileRepository.TestQueueProvider queueProvider = new TestRocksDBFlowFileRepository.TestQueueProvider();
- repo.loadFlowFiles(queueProvider);
-
- final List<FlowFileRecord> flowFileCollection = new ArrayList<>();
-
- final Connection connection = Mockito.mock(Connection.class);
- when(connection.getIdentifier()).thenReturn("1234");
-
- final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
- when(queue.getIdentifier()).thenReturn("1234");
- doAnswer((Answer