diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index 6501c2777d..d0db77bbaf 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -3312,8 +3312,11 @@ FlowFile Repository, if also on that disk, could become corrupt. To avoid this s |==== |*Property*|*Description* |`nifi.content.repository.implementation`|The Content Repository implementation. The default value is `org.apache.nifi.controller.repository.FileSystemRepository` and should only be changed with caution. To store flowfile content in memory instead of on disk (at the risk of data loss in the event of power/machine failure), set this property to `org.apache.nifi.controller.repository.VolatileContentRepository`. -|`nifi.content.claim.max.appendable.size`|The maximum size for a content claim. The default value is `1 MB`. -|`nifi.content.claim.max.flow.files`| The max amount of claims to keep open for writing. The default value is `100` +|`nifi.content.claim.max.appendable.size`|When NiFi processes many small FlowFiles, the contents of those FlowFiles are stored in the content repository, but we do not store the content of each +individual FlowFile as a separate file in the content repository. Doing so would be very detrimental to performance, if each 120 byte FlowFile, for instance, was written to its own file. Instead, +we continue writing to the same file until it reaches some threshold. This property configures that threshold. Setting the value too small can result in poor performance due to reading from and +writing to too many files. However, a file can only be deleted from the content repository once there are no longer any FlowFiles pointing to it. Therefore, setting the value too large can result +in data remaining in the content repository for much longer, potentially leading to the content repository running out of disk space. The default value is `50 KB`. |`nifi.content.repository.directory.default`*|The location of the Content Repository. The default value is `./content_repository`. + + *NOTE*: Multiple content repositories can be specified by using the `nifi.content.repository.directory.` prefix with unique suffixes and separate paths as values. + diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java index 66931435af..f838db7181 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java @@ -242,6 +242,13 @@ public interface ContentRepository { */ long size(ContentClaim claim) throws IOException; + /** + * @param claim to get size of + * @return size in bytes of the file/object backing the given resource claim, or 0 if this operation is not supported by the implementation + * @throws IOException if size check failed + */ + long size(ResourceClaim claim) throws IOException; + /** * Provides access to the input stream for the given claim * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 500bab7a28..689487f164 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -907,6 +907,16 @@ public class FileSystemRepository implements ContentRepository { return claim.getLength(); } + @Override + public long size(final ResourceClaim claim) throws IOException { + final Path path = getPath(claim); + if (path == null) { + return 0L; + } + + return Files.size(path); + } + @Override public InputStream read(final ResourceClaim claim) throws IOException { if (claim == null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java index eb2a2db4b0..03f8ce0a2a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java @@ -16,6 +16,20 @@ */ package org.apache.nifi.controller.repository; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; +import org.apache.nifi.controller.repository.io.ArrayManagedOutputStream; +import org.apache.nifi.controller.repository.io.MemoryManager; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.ByteArrayInputStream; import java.io.FileInputStream; import java.io.FilterOutputStream; @@ -39,19 +53,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.io.IOUtils; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.controller.repository.claim.StandardContentClaim; -import org.apache.nifi.controller.repository.io.ArrayManagedOutputStream; -import org.apache.nifi.controller.repository.io.MemoryManager; -import org.apache.nifi.engine.FlowEngine; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.util.NiFiProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** *

@@ -458,6 +459,11 @@ public class VolatileContentRepository implements ContentRepository { return backupClaim == null ? getContent(claim).getSize() : getBackupRepository().size(claim); } + @Override + public long size(final ResourceClaim claim) throws IOException { + return 0; + } + @Override public InputStream read(final ContentClaim claim) throws IOException { if (claim == null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java index ca175df0d1..de52b0ddb2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java @@ -16,7 +16,10 @@ */ package org.apache.nifi.diagnostics.bootstrap.tasks; +import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.ContentRepository; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; @@ -26,11 +29,16 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.diagnostics.DiagnosticTask; import org.apache.nifi.diagnostics.DiagnosticsDumpElement; import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement; +import org.apache.nifi.util.FormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.text.NumberFormat; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -63,6 +71,10 @@ public class ContentRepositoryScanTask implements DiagnosticTask { final List details = new ArrayList<>(); + final Map retainedFileSetsByQueue = new HashMap<>(); + final FlowManager flowManager = flowController.getFlowManager(); + + final NumberFormat numberFormat = NumberFormat.getNumberInstance(); for (final String containerName : contentRepository.getContainerNames()) { try { final Set resourceClaims = contentRepository.getActiveResourceClaims(containerName); @@ -77,8 +89,22 @@ public class ContentRepositoryScanTask implements DiagnosticTask { final Set references = referenceMap == null ? Collections.emptySet() : referenceMap.getOrDefault(resourceClaim, Collections.emptySet()); final String path = resourceClaim.getContainer() + "/" + resourceClaim.getSection() + "/" + resourceClaim.getId(); - details.add(String.format("%1$s, Claimant Count = %2$d, In Use = %3$b, Awaiting Destruction = %4$b, References (%5$d) = %6$s", - path, claimCount, inUse, destructable, references.size(), references.toString())); + final long fileSize = contentRepository.size(resourceClaim); + details.add(String.format("%1$s; Size = %2$s bytes; Claimant Count = %3$d; In Use = %4$b; Awaiting Destruction = %5$b; References (%6$d) = %7$s", + path, numberFormat.format(fileSize), claimCount, inUse, destructable, references.size(), references)); + + for (final ResourceClaimReference claimReference : references) { + final String queueId = claimReference.getQueueIdentifier(); + final Connection connection = flowManager.getConnection(queueId); + QueueSize queueSize = new QueueSize(0, 0L); + if (connection != null) { + queueSize = connection.getFlowFileQueue().size(); + } + + final RetainedFileSet retainedFileSet = retainedFileSetsByQueue.computeIfAbsent(queueId, RetainedFileSet::new); + retainedFileSet.addFile(path, fileSize); + retainedFileSet.setQueueSize(queueSize); + } } } catch (final Exception e) { logger.error("Failed to obtain listing of Active Resource Claims for container {}", containerName, e); @@ -100,6 +126,60 @@ public class ContentRepositoryScanTask implements DiagnosticTask { } } + details.add(""); + + final List retainedFileSets = new ArrayList<>(retainedFileSetsByQueue.values()); + retainedFileSets.sort(Comparator.comparing(RetainedFileSet::getByteCount).reversed()); + details.add("The following queues retain data in the Content Repository:"); + if (retainedFileSets.isEmpty()) { + details.add("No queues retain any files in the Content Repository"); + } else { + for (final RetainedFileSet retainedFileSet : retainedFileSets) { + final String formatted = String.format("Queue ID = %s; Queue Size = %s FlowFiles / %s; Retained Files = %d; Retained Size = %s bytes (%s)", + retainedFileSet.getQueueId(), numberFormat.format(retainedFileSet.getQueueSize().getObjectCount()), FormatUtils.formatDataSize(retainedFileSet.getQueueSize().getByteCount()), + retainedFileSet.getFilenames().size(), numberFormat.format(retainedFileSet.getByteCount()), FormatUtils.formatDataSize(retainedFileSet.getByteCount())); + + details.add(formatted); + } + } + return new StandardDiagnosticsDumpElement("Content Repository Scan", details); } + + private static class RetainedFileSet { + private final String queueId; + private final Set filenames = new HashSet<>(); + private long byteCount; + private QueueSize queueSize; + + public RetainedFileSet(final String queueId) { + this.queueId = queueId; + } + + public String getQueueId() { + return queueId; + } + + public void addFile(final String filename, final long bytes) { + if (filenames.add(filename)) { + byteCount += bytes; + } + } + + public Set getFilenames() { + return filenames; + } + + public long getByteCount() { + return byteCount; + } + + public QueueSize getQueueSize() { + return queueSize; + } + + public void setQueueSize(final QueueSize queueSize) { + this.queueSize = queueSize; + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java index d73b081f63..65b6ac9cf4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java @@ -3430,6 +3430,11 @@ public class StandardProcessSessionIT { return Files.size(getPath(claim)); } + @Override + public long size(final ResourceClaim claim) throws IOException { + return Files.size(getPath(claim)); + } + @Override public InputStream read(ContentClaim claim) throws IOException { if (disableRead) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index df08858bc2..f2054fa277 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -64,7 +64,7 @@ 20000 org.apache.nifi.controller.repository.FileSystemRepository - 1 MB + 50 KB ./content_repository 7 days 50% diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java index 6415893f2c..39baaea2ed 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java @@ -208,6 +208,11 @@ public class ByteArrayContentRepository implements ContentRepository { return claim.getLength(); } + @Override + public long size(final ResourceClaim claim) throws IOException { + return 0; + } + @Override public InputStream read(final ContentClaim claim) { final ByteArrayContentClaim byteArrayContentClaim = verifyClaim(claim); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemContentRepository.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemContentRepository.java index e4367487d7..a0dfdd2063 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemContentRepository.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemContentRepository.java @@ -244,6 +244,11 @@ public class StatelessFileSystemContentRepository implements ContentRepository { return claim.getLength(); } + @Override + public long size(final ResourceClaim claim) throws IOException { + return 0; + } + @Override public InputStream read(final ContentClaim claim) throws IOException { if (claim == null) {