diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 4c9320512c..ad99955c4b 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -220,6 +220,7 @@ public abstract class NiFiProperties { public static final String DEFAULT_FLOWFILE_REPO_PARTITIONS = "256"; public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "2 min"; public static final int DEFAULT_MAX_FLOWFILES_PER_CLAIM = 100; + public static final String DEFAULT_MAX_APPENDABLE_CLAIM_SIZE = "1 MB"; public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000; public static final String DEFAULT_SWAP_STORAGE_LOCATION = "./flowfile_repository/swap"; public static final String DEFAULT_SWAP_IN_PERIOD = "1 sec"; @@ -924,6 +925,15 @@ public abstract class NiFiProperties { return provenanceRepositoryPaths; } + /** + * Returns the configured {@code MAX_FLOWFILES_PER_CLAIM} value, which FileSystemRepository uses as the number of claims to keep open for writing. Ideally, this will be at + * least as large as the number of threads that will be updating the repository simultaneously but we don't want + * to get too large because it will hold open up to this many FileOutputStreams. + * + * Default is {@link #DEFAULT_MAX_FLOWFILES_PER_CLAIM} + * + * @return the value used as the capacity of the queue of writable claims (NOTE(review): the property and method names suggest a per-claim flow file limit - confirm the intended meaning) + */ public int getMaxFlowFilesPerClaim() { try { return Integer.parseInt(getProperty(MAX_FLOWFILES_PER_CLAIM)); @@ -932,8 +942,16 @@ public abstract class NiFiProperties { } } + /** + * Returns the maximum size that claims should grow before writing a new file, as a data size string such as "1 MB" (not a numeric byte count). This means that we won't continually write to one + * file that keeps growing but gives us a chance to bunch together many small files. 
+ * + * Default is {@link #DEFAULT_MAX_APPENDABLE_CLAIM_SIZE} + * + * @return the maximum appendable claim size as a data size string, for example "1 MB" + */ public String getMaxAppendableClaimSize() { - return getProperty(MAX_APPENDABLE_CLAIM_SIZE); + return getProperty(MAX_APPENDABLE_CLAIM_SIZE, DEFAULT_MAX_APPENDABLE_CLAIM_SIZE); } public String getProperty(final String key, final String defaultValue) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index eaa2cbc204..0bfe6a18fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -66,6 +66,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.io.LimitedInputStream; import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.stream.io.ByteCountingOutputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream; @@ -85,6 +86,9 @@ public class FileSystemRepository implements ContentRepository { public static final int SECTIONS_PER_CONTAINER = 1024; public static final long MIN_CLEANUP_INTERVAL_MILLIS = 1000; public static final String ARCHIVE_DIR_NAME = "archive"; + // 100 MB cap for the configurable NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE property to prevent + // unnecessarily large resource claim files; larger configured values are replaced by this cap and a warning is logged + public static final String APPENDABLE_CLAIM_LENGTH_CAP = "100 
MB"; public static final Pattern MAX_ARCHIVE_SIZE_PATTERN = Pattern.compile("\\d{1,2}%"); private static final Logger LOG = LoggerFactory.getLogger(FileSystemRepository.class); @@ -97,22 +101,23 @@ public class FileSystemRepository implements ContentRepository { private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true); private final ConcurrentMap> reclaimable = new ConcurrentHashMap<>(); private final Map containerStateMap = new HashMap<>(); - // 1 MB. This could be adjusted but 1 MB seems reasonable, as it means that we won't continually write to one - // file that keeps growing but gives us a chance to bunch together a lot of small files. Before, we had issues - // with creating and deleting too many files, as we had to delete 100's of thousands of files every 2 minutes - // in order to avoid backpressure on session commits. With 1 MB as the target file size, 100's of thousands of - // files would mean that we are writing gigabytes per second - quite a bit faster than any disks can handle now. - static final int MAX_APPENDABLE_CLAIM_LENGTH = 1024 * 1024; - // Queue for claims that are kept open for writing. Size of 100 is pretty arbitrary. Ideally, this will be at + // Queue for claims that are kept open for writing; its capacity is taken from NiFiProperties.getMaxFlowFilesPerClaim(). Ideally, this will be at // least as large as the number of threads that will be updating the repository simultaneously but we don't want // to get too large because it will hold open up to this many FileOutputStreams. // The queue is used to determine which claim to write to and then the corresponding Map can be used to obtain // the OutputStream that we can use for writing to the claim. 
- private final BlockingQueue writableClaimQueue = new LinkedBlockingQueue<>(100); + private final BlockingQueue writableClaimQueue; private final ConcurrentMap writableClaimStreams = new ConcurrentHashMap<>(100); private final boolean archiveData; + // Resource claim length below which the claim is put back on the queue and reused for the next content claim. 1 MB default, as it means that we won't continually write to one + // file that keeps growing but gives us a chance to bunch together a lot of small files. Before, we had issues + // with creating and deleting too many files, as we had to delete 100's of thousands of files every 2 minutes + // in order to avoid backpressure on session commits. With 1 MB as the target file size, 100's of thousands of + // files would mean that we are writing gigabytes per second - quite a bit faster than any disks can handle now. + private final long maxAppendableClaimLength; + private final int maxFlowFilesPerClaim; private final long maxArchiveMillis; private final Map minUsableContainerBytesForArchive = new HashMap<>(); private final boolean alwaysSync; @@ -140,6 +145,9 @@ public class FileSystemRepository implements ContentRepository { alwaysSync = false; containerCleanupExecutor = null; nifiProperties = null; + maxAppendableClaimLength = 0; + maxFlowFilesPerClaim = 0; + writableClaimQueue = null; } public FileSystemRepository(final NiFiProperties nifiProperties) throws IOException { @@ -149,6 +157,20 @@ public class FileSystemRepository implements ContentRepository { for (final Path path : fileRespositoryPaths.values()) { Files.createDirectories(path); } + this.maxFlowFilesPerClaim = nifiProperties.getMaxFlowFilesPerClaim(); + this.writableClaimQueue = new LinkedBlockingQueue<>(maxFlowFilesPerClaim); // NOTE(review): LinkedBlockingQueue rejects a capacity <= 0 with IllegalArgumentException - confirm the configured value is validated + final long configuredAppendableClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); + final long appendableClaimLengthCap = DataUnit.parseDataSize(APPENDABLE_CLAIM_LENGTH_CAP, DataUnit.B).longValue(); + if (configuredAppendableClaimLength > appendableClaimLengthCap) { + 
LOG.warn("Configured property '{}' with value {} exceeds cap of {}. Setting value to {}", + NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE, + configuredAppendableClaimLength, + APPENDABLE_CLAIM_LENGTH_CAP, + APPENDABLE_CLAIM_LENGTH_CAP); + this.maxAppendableClaimLength = appendableClaimLengthCap; + } else { + this.maxAppendableClaimLength = configuredAppendableClaimLength; + } this.containers = new HashMap<>(fileRespositoryPaths); this.containerNames = new ArrayList<>(containers.keySet()); @@ -948,7 +970,7 @@ public class FileSystemRepository implements ContentRepository { // is called. In this case, we don't have to actually close the file stream. Instead, we // can just add it onto the queue and continue to use it for the next content claim. final long resourceClaimLength = scc.getOffset() + scc.getLength(); - if (recycle && resourceClaimLength < MAX_APPENDABLE_CLAIM_LENGTH) { + if (recycle && resourceClaimLength < maxAppendableClaimLength) { final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength); // We are checking that writableClaimStreams contains the resource claim as a key, as a sanity check. 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index 87ab29b30d..7cc6e9b20d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -50,6 +50,7 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.util.DiskUtils; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.NiFiProperties; import org.junit.After; @@ -286,10 +287,12 @@ public class TestFileSystemRepository { repository.incrementClaimaintCount(claim); final Path claimPath = getPath(claim); + final String maxAppendableClaimLength = nifiProperties.getMaxAppendableClaimSize(); + final int maxClaimLength = DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue(); // configured size string parsed in DataUnit.B units; sizes the byte[] written below // Create the file. try (final OutputStream out = repository.write(claim)) { - out.write(new byte[FileSystemRepository.MAX_APPENDABLE_CLAIM_LENGTH]); + out.write(new byte[maxClaimLength]); } int count = repository.decrementClaimantCount(claim); @@ -495,7 +498,9 @@ public class TestFileSystemRepository { // write at least 1 MB to the output stream so that when we close the output stream // the repo won't keep the stream open. 
- final byte[] buff = new byte[FileSystemRepository.MAX_APPENDABLE_CLAIM_LENGTH]; + final String maxAppendableClaimLength = nifiProperties.getMaxAppendableClaimSize(); + final int maxClaimLength = DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue(); // buffer length taken from the configured max appendable claim size + final byte[] buff = new byte[maxClaimLength]; out.write(buff); out.write(buff); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties index 109ddbc43b..f4718539d6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties @@ -45,7 +45,7 @@ nifi.swap.out.period=5 sec nifi.swap.out.threads=4 # Content Repository -nifi.content.claim.max.appendable.size=10 MB +nifi.content.claim.max.appendable.size=1 MB nifi.content.claim.max.flow.files=100 nifi.content.repository.directory.default=./target/content_repository