mirror of https://github.com/apache/nifi.git
NIFI-9704: Updated the ContentRepositoryScanTask to show details of how much content in the content repo is retained by each queue in the dataflow. Changed default for nifi.content.claim.max.appendable.size property from 1 MB to 50 KB. Updated docs to reflect the new default value and explain what the property does and how it's used.
Signed-off-by: Joe Gresock <jgresock@gmail.com> This closes #5780.
This commit is contained in:
parent
04d061a7bd
commit
d0a23bc26b
|
@ -3312,8 +3312,11 @@ FlowFile Repository, if also on that disk, could become corrupt. To avoid this s
|
|||
|====
|
||||
|*Property*|*Description*
|
||||
|`nifi.content.repository.implementation`|The Content Repository implementation. The default value is `org.apache.nifi.controller.repository.FileSystemRepository` and should only be changed with caution. To store flowfile content in memory instead of on disk (at the risk of data loss in the event of power/machine failure), set this property to `org.apache.nifi.controller.repository.VolatileContentRepository`.
|
||||
|`nifi.content.claim.max.appendable.size`|The maximum size for a content claim. The default value is `1 MB`.
|
||||
|`nifi.content.claim.max.flow.files`| The maximum number of claims to keep open for writing. The default value is `100`.
|
||||
|`nifi.content.claim.max.appendable.size`|When NiFi processes many small FlowFiles, the contents of those FlowFiles are stored in the content repository, but we do not store the content of each
|
||||
individual FlowFile as a separate file in the content repository. Doing so would be very detrimental to performance, if each 120 byte FlowFile, for instance, was written to its own file. Instead,
|
||||
we continue writing to the same file until it reaches some threshold. This property configures that threshold. Setting the value too small can result in poor performance due to reading from and
|
||||
writing to too many files. However, a file can only be deleted from the content repository once there are no longer any FlowFiles pointing to it. Therefore, setting the value too large can result
|
||||
in data remaining in the content repository for much longer, potentially leading to the content repository running out of disk space. The default value is `50 KB`.
|
||||
|`nifi.content.repository.directory.default`*|The location of the Content Repository. The default value is `./content_repository`. +
|
||||
+
|
||||
*NOTE*: Multiple content repositories can be specified by using the `nifi.content.repository.directory.` prefix with unique suffixes and separate paths as values. +
|
||||
|
|
|
@ -242,6 +242,13 @@ public interface ContentRepository {
|
|||
*/
|
||||
long size(ContentClaim claim) throws IOException;
|
||||
|
||||
/**
|
||||
* @param claim to get size of
|
||||
* @return size in bytes of the file/object backing the given resource claim, or 0 if this operation is not supported by the implementation
|
||||
* @throws IOException if size check failed
|
||||
*/
|
||||
long size(ResourceClaim claim) throws IOException;
|
||||
|
||||
/**
|
||||
* Provides access to the input stream for the given claim
|
||||
*
|
||||
|
|
|
@ -907,6 +907,16 @@ public class FileSystemRepository implements ContentRepository {
|
|||
return claim.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long size(final ResourceClaim claim) throws IOException {
|
||||
final Path path = getPath(claim);
|
||||
if (path == null) {
|
||||
return 0L;
|
||||
}
|
||||
|
||||
return Files.size(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream read(final ResourceClaim claim) throws IOException {
|
||||
if (claim == null) {
|
||||
|
|
|
@ -16,6 +16,20 @@
|
|||
*/
|
||||
package org.apache.nifi.controller.repository;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.controller.repository.claim.ContentClaim;
|
||||
import org.apache.nifi.controller.repository.claim.ResourceClaim;
|
||||
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
|
||||
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
|
||||
import org.apache.nifi.controller.repository.io.ArrayManagedOutputStream;
|
||||
import org.apache.nifi.controller.repository.io.MemoryManager;
|
||||
import org.apache.nifi.engine.FlowEngine;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FilterOutputStream;
|
||||
|
@ -39,19 +53,6 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.controller.repository.claim.ContentClaim;
|
||||
import org.apache.nifi.controller.repository.claim.ResourceClaim;
|
||||
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
|
||||
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
|
||||
import org.apache.nifi.controller.repository.io.ArrayManagedOutputStream;
|
||||
import org.apache.nifi.controller.repository.io.MemoryManager;
|
||||
import org.apache.nifi.engine.FlowEngine;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
|
@ -458,6 +459,11 @@ public class VolatileContentRepository implements ContentRepository {
|
|||
return backupClaim == null ? getContent(claim).getSize() : getBackupRepository().size(claim);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long size(final ResourceClaim claim) throws IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream read(final ContentClaim claim) throws IOException {
|
||||
if (claim == null) {
|
||||
|
|
|
@ -16,7 +16,10 @@
|
|||
*/
|
||||
package org.apache.nifi.diagnostics.bootstrap.tasks;
|
||||
|
||||
import org.apache.nifi.connectable.Connection;
|
||||
import org.apache.nifi.controller.FlowController;
|
||||
import org.apache.nifi.controller.flow.FlowManager;
|
||||
import org.apache.nifi.controller.queue.QueueSize;
|
||||
import org.apache.nifi.controller.repository.ContentRepository;
|
||||
import org.apache.nifi.controller.repository.FlowFileRepository;
|
||||
import org.apache.nifi.controller.repository.FlowFileSwapManager;
|
||||
|
@ -26,11 +29,16 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
|
|||
import org.apache.nifi.diagnostics.DiagnosticTask;
|
||||
import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
|
||||
import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
|
||||
import org.apache.nifi.util.FormatUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.text.NumberFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -63,6 +71,10 @@ public class ContentRepositoryScanTask implements DiagnosticTask {
|
|||
|
||||
final List<String> details = new ArrayList<>();
|
||||
|
||||
final Map<String, RetainedFileSet> retainedFileSetsByQueue = new HashMap<>();
|
||||
final FlowManager flowManager = flowController.getFlowManager();
|
||||
|
||||
final NumberFormat numberFormat = NumberFormat.getNumberInstance();
|
||||
for (final String containerName : contentRepository.getContainerNames()) {
|
||||
try {
|
||||
final Set<ResourceClaim> resourceClaims = contentRepository.getActiveResourceClaims(containerName);
|
||||
|
@ -77,8 +89,22 @@ public class ContentRepositoryScanTask implements DiagnosticTask {
|
|||
final Set<ResourceClaimReference> references = referenceMap == null ? Collections.emptySet() : referenceMap.getOrDefault(resourceClaim, Collections.emptySet());
|
||||
|
||||
final String path = resourceClaim.getContainer() + "/" + resourceClaim.getSection() + "/" + resourceClaim.getId();
|
||||
details.add(String.format("%1$s, Claimant Count = %2$d, In Use = %3$b, Awaiting Destruction = %4$b, References (%5$d) = %6$s",
|
||||
path, claimCount, inUse, destructable, references.size(), references.toString()));
|
||||
final long fileSize = contentRepository.size(resourceClaim);
|
||||
details.add(String.format("%1$s; Size = %2$s bytes; Claimant Count = %3$d; In Use = %4$b; Awaiting Destruction = %5$b; References (%6$d) = %7$s",
|
||||
path, numberFormat.format(fileSize), claimCount, inUse, destructable, references.size(), references));
|
||||
|
||||
for (final ResourceClaimReference claimReference : references) {
|
||||
final String queueId = claimReference.getQueueIdentifier();
|
||||
final Connection connection = flowManager.getConnection(queueId);
|
||||
QueueSize queueSize = new QueueSize(0, 0L);
|
||||
if (connection != null) {
|
||||
queueSize = connection.getFlowFileQueue().size();
|
||||
}
|
||||
|
||||
final RetainedFileSet retainedFileSet = retainedFileSetsByQueue.computeIfAbsent(queueId, RetainedFileSet::new);
|
||||
retainedFileSet.addFile(path, fileSize);
|
||||
retainedFileSet.setQueueSize(queueSize);
|
||||
}
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to obtain listing of Active Resource Claims for container {}", containerName, e);
|
||||
|
@ -100,6 +126,60 @@ public class ContentRepositoryScanTask implements DiagnosticTask {
|
|||
}
|
||||
}
|
||||
|
||||
details.add("");
|
||||
|
||||
final List<RetainedFileSet> retainedFileSets = new ArrayList<>(retainedFileSetsByQueue.values());
|
||||
retainedFileSets.sort(Comparator.comparing(RetainedFileSet::getByteCount).reversed());
|
||||
details.add("The following queues retain data in the Content Repository:");
|
||||
if (retainedFileSets.isEmpty()) {
|
||||
details.add("No queues retain any files in the Content Repository");
|
||||
} else {
|
||||
for (final RetainedFileSet retainedFileSet : retainedFileSets) {
|
||||
final String formatted = String.format("Queue ID = %s; Queue Size = %s FlowFiles / %s; Retained Files = %d; Retained Size = %s bytes (%s)",
|
||||
retainedFileSet.getQueueId(), numberFormat.format(retainedFileSet.getQueueSize().getObjectCount()), FormatUtils.formatDataSize(retainedFileSet.getQueueSize().getByteCount()),
|
||||
retainedFileSet.getFilenames().size(), numberFormat.format(retainedFileSet.getByteCount()), FormatUtils.formatDataSize(retainedFileSet.getByteCount()));
|
||||
|
||||
details.add(formatted);
|
||||
}
|
||||
}
|
||||
|
||||
return new StandardDiagnosticsDumpElement("Content Repository Scan", details);
|
||||
}
|
||||
|
||||
private static class RetainedFileSet {
|
||||
private final String queueId;
|
||||
private final Set<String> filenames = new HashSet<>();
|
||||
private long byteCount;
|
||||
private QueueSize queueSize;
|
||||
|
||||
public RetainedFileSet(final String queueId) {
|
||||
this.queueId = queueId;
|
||||
}
|
||||
|
||||
public String getQueueId() {
|
||||
return queueId;
|
||||
}
|
||||
|
||||
public void addFile(final String filename, final long bytes) {
|
||||
if (filenames.add(filename)) {
|
||||
byteCount += bytes;
|
||||
}
|
||||
}
|
||||
|
||||
public Set<String> getFilenames() {
|
||||
return filenames;
|
||||
}
|
||||
|
||||
public long getByteCount() {
|
||||
return byteCount;
|
||||
}
|
||||
|
||||
public QueueSize getQueueSize() {
|
||||
return queueSize;
|
||||
}
|
||||
|
||||
public void setQueueSize(final QueueSize queueSize) {
|
||||
this.queueSize = queueSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3430,6 +3430,11 @@ public class StandardProcessSessionIT {
|
|||
return Files.size(getPath(claim));
|
||||
}
|
||||
|
||||
@Override
|
||||
public long size(final ResourceClaim claim) throws IOException {
|
||||
return Files.size(getPath(claim));
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream read(ContentClaim claim) throws IOException {
|
||||
if (disableRead) {
|
||||
|
|
|
@ -64,7 +64,7 @@
|
|||
<nifi.queue.swap.threshold>20000</nifi.queue.swap.threshold>
|
||||
|
||||
<nifi.content.repository.implementation>org.apache.nifi.controller.repository.FileSystemRepository</nifi.content.repository.implementation>
|
||||
<nifi.content.claim.max.appendable.size>1 MB</nifi.content.claim.max.appendable.size>
|
||||
<nifi.content.claim.max.appendable.size>50 KB</nifi.content.claim.max.appendable.size>
|
||||
<nifi.content.repository.directory.default>./content_repository</nifi.content.repository.directory.default>
|
||||
<nifi.content.repository.archive.max.retention.period>7 days</nifi.content.repository.archive.max.retention.period>
|
||||
<nifi.content.repository.archive.max.usage.percentage>50%</nifi.content.repository.archive.max.usage.percentage>
|
||||
|
|
|
@ -208,6 +208,11 @@ public class ByteArrayContentRepository implements ContentRepository {
|
|||
return claim.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long size(final ResourceClaim claim) throws IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream read(final ContentClaim claim) {
|
||||
final ByteArrayContentClaim byteArrayContentClaim = verifyClaim(claim);
|
||||
|
|
|
@ -244,6 +244,11 @@ public class StatelessFileSystemContentRepository implements ContentRepository {
|
|||
return claim.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long size(final ResourceClaim claim) throws IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream read(final ContentClaim claim) throws IOException {
|
||||
if (claim == null) {
|
||||
|
|
Loading…
Reference in New Issue