diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index bad2e5569b..8128c6529e 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -352,7 +352,7 @@ public class NiFiProperties extends ApplicationProperties { public static final String DEFAULT_NAR_LIBRARY_DIR = "./lib"; public static final String DEFAULT_NAR_LIBRARY_AUTOLOAD_DIR = "./extensions"; public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "20 secs"; - public static final String DEFAULT_MAX_APPENDABLE_CLAIM_SIZE = "1 MB"; + public static final String DEFAULT_MAX_APPENDABLE_CLAIM_SIZE = "50 KB"; public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000; public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L; public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB"; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index e0dc7c8885..c14c879397 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -169,6 +169,7 @@ import org.apache.nifi.nar.PythonBundle; import org.apache.nifi.parameter.ParameterContextManager; import org.apache.nifi.parameter.ParameterProvider; import org.apache.nifi.parameter.StandardParameterContextManager; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.StandardProcessContext; @@ -542,7 +543,9 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, stateManagerProvider, this.nifiProperties, lifecycleStateManager); parameterContextManager = new StandardParameterContextManager(); - repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider); + final long maxAppendableBytes = getMaxAppendableBytes(); + repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, + counterRepositoryRef.get(), provenanceRepository, stateManagerProvider, maxAppendableBytes); assetManager = createAssetManager(nifiProperties); this.flowAnalysisThreadPool = new FlowEngine(1, "Background Flow Analysis", true); @@ -1044,8 +1047,9 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr flowFileRepository.updateMaxFlowFileIdentifier(maxIdFromSwapFiles + 1); // Begin expiring FlowFiles that are old + final long maxAppendableClaimBytes = getMaxAppendableBytes(); final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, - flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider); + flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider, maxAppendableClaimBytes); processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS); // now that we've loaded the FlowFiles, this has restored our ContentClaims' states, so we can tell the @@ -1096,6 +1100,12 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr } } + private long getMaxAppendableBytes() { + final String maxAppendableClaimSize = nifiProperties.getMaxAppendableClaimSize(); + final long maxAppendableClaimBytes = DataUnit.parseDataSize(maxAppendableClaimSize, DataUnit.B).longValue(); + return maxAppendableClaimBytes; + } + private void notifyComponentsConfigurationRestored() { for (final ProcessorNode procNode : flowManager.getRootGroup().findAllProcessors()) { final Processor processor = procNode.getProcessor(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryContext.java index 2ae985999a..ddb1b4aa8b 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryContext.java @@ -27,14 +27,17 @@ import java.util.concurrent.atomic.AtomicLong; public class StandardRepositoryContext extends AbstractRepositoryContext implements RepositoryContext { + private final long maxAppendableClaimBytes; + public StandardRepositoryContext(final Connectable connectable, final AtomicLong connectionIndex, final ContentRepository contentRepository, final FlowFileRepository flowFileRepository, final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository, - final StateManager stateManager) { + final StateManager stateManager, final long maxAppendableClaimBytes) { super(connectable, connectionIndex, contentRepository, flowFileRepository, flowFileEventRepository, counterRepository, provenanceRepository, stateManager); + this.maxAppendableClaimBytes = maxAppendableClaimBytes; } @Override public ContentClaimWriteCache createContentClaimWriteCache(final PerformanceTracker performanceTracker) { - return new StandardContentClaimWriteCache(getContentRepository(), performanceTracker); + return new StandardContentClaimWriteCache(getContentRepository(), performanceTracker, maxAppendableClaimBytes, 8192); } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java index 8d78953297..0e1e913c84 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java @@ -17,8 +17,8 @@ package org.apache.nifi.controller.repository.claim; -import org.apache.nifi.controller.repository.io.ContentClaimOutputStream; import org.apache.nifi.controller.repository.ContentRepository; +import org.apache.nifi.controller.repository.io.ContentClaimOutputStream; import org.apache.nifi.controller.repository.metrics.PerformanceTracker; import org.apache.nifi.controller.repository.metrics.PerformanceTrackingOutputStream; @@ -35,15 +35,13 @@ public class StandardContentClaimWriteCache implements ContentClaimWriteCache { private final Map streamMap = new ConcurrentHashMap<>(); private final Queue queue = new LinkedList<>(); private final PerformanceTracker performanceTracker; + private final long maxAppendableClaimBytes; private final int bufferSize; - public StandardContentClaimWriteCache(final ContentRepository contentRepo, final PerformanceTracker performanceTracker) { - this(contentRepo, performanceTracker, 8192); - } - - public StandardContentClaimWriteCache(final ContentRepository contentRepo, final PerformanceTracker performanceTracker, final int bufferSize) { + public StandardContentClaimWriteCache(final ContentRepository contentRepo, final PerformanceTracker performanceTracker, final long maxAppendableClaimBytes, final int bufferSize) { this.contentRepo = contentRepo; this.performanceTracker = performanceTracker; + this.maxAppendableClaimBytes = maxAppendableClaimBytes; this.bufferSize = bufferSize; } @@ -154,7 +152,10 @@ public class StandardContentClaimWriteCache implements ContentClaimWriteCache { scc.setLength(0L); } - queue.offer(claim); + // Add the claim back to the queue if it is still writable + if ((scc.getOffset() + scc.getLength()) < maxAppendableClaimBytes) { + queue.offer(claim); + } } }; } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java index 45540f997d..e2594fc905 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java @@ -37,10 +37,12 @@ public class RepositoryContextFactory { private final CounterRepository counterRepo; private final ProvenanceRepository provenanceRepo; private final StateManagerProvider stateManagerProvider; + private final long maxAppendableClaimBytes; public RepositoryContextFactory(final ContentRepository contentRepository, final FlowFileRepository flowFileRepository, final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository, - final ProvenanceRepository provenanceRepository, final StateManagerProvider stateManagerProvider) { + final ProvenanceRepository provenanceRepository, final StateManagerProvider stateManagerProvider, + final long maxAppendableClaimBytes) { this.contentRepo = contentRepository; this.flowFileRepo = flowFileRepository; @@ -48,11 +50,12 @@ public class RepositoryContextFactory { this.counterRepo = counterRepository; this.provenanceRepo = provenanceRepository; this.stateManagerProvider = stateManagerProvider; + this.maxAppendableClaimBytes = maxAppendableClaimBytes; } public RepositoryContext newProcessContext(final Connectable connectable, final AtomicLong connectionIndex) { final StateManager stateManager = stateManagerProvider.getStateManager(connectable.getIdentifier()); - return new StandardRepositoryContext(connectable, connectionIndex, contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo, stateManager); + return new StandardRepositoryContext(connectable, connectionIndex, contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo, stateManager, maxAppendableClaimBytes); } public ContentRepository getContentRepository() { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java index f384d6d214..d24da53d7b 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java @@ -225,7 +225,8 @@ public class StandardProcessSessionIT { stateManager = new MockStateManager(connectable); stateManager.setIgnoreAnnotations(true); - context = new StandardRepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository, counterRepository, provenanceRepo, stateManager); + context = new StandardRepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository, + counterRepository, provenanceRepo, stateManager, 50_000L); session = new StandardProcessSession(context, () -> false, new NopPerformanceTracker()); } @@ -3134,7 +3135,8 @@ public class StandardProcessSessionIT { flowFileEventRepository, counterRepository, provenanceRepo, - stateManager); + stateManager, + 50_000L); return new StandardProcessSession(context, () -> false, new NopPerformanceTracker()); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardContentClaimWriteCache.java similarity index 64% rename from nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java rename to nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardContentClaimWriteCache.java index f5863abb15..343b325455 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardContentClaimWriteCache.java @@ -33,21 +33,22 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Random; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -public class TestContentClaimWriteCache { +public class TestStandardContentClaimWriteCache { private FileSystemRepository repository = null; private StandardResourceClaimManager claimManager = null; private final File rootFile = new File("target/testContentClaimWriteCache"); - private NiFiProperties nifiProperties; @BeforeEach public void setup() throws IOException { - nifiProperties = NiFiProperties.createBasicNiFiProperties(TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile()); + NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile()); if (rootFile.exists()) { DiskUtils.deleteRecursively(rootFile); } @@ -64,7 +65,7 @@ public class TestContentClaimWriteCache { @Test public void testFlushWriteCorrectData() throws IOException { - final ContentClaimWriteCache cache = new StandardContentClaimWriteCache(repository, new NopPerformanceTracker(), 4); + final ContentClaimWriteCache cache = new StandardContentClaimWriteCache(repository, new NopPerformanceTracker(), 50_000L, 4); final ContentClaim claim1 = cache.getContentClaim(); assertNotNull(claim1); @@ -97,4 +98,48 @@ public class TestContentClaimWriteCache { assertArrayEquals("good-dayhello".getBytes(), buff2); } + @Test + public void testWriteLargeRollsOverToNewFileOnNext() throws IOException { + final ContentClaimWriteCache cache = new StandardContentClaimWriteCache(repository, new NopPerformanceTracker(), 50_000L, 4); + + final ContentClaim claim1 = cache.getContentClaim(); + assertNotNull(claim1); + + try (final OutputStream out = cache.write(claim1)) { + assertNotNull(out); + out.write("hello".getBytes()); + out.write("good-bye".getBytes()); + + cache.flush(); + } + + final ContentClaim claim2 = cache.getContentClaim(); + assertEquals(claim1.getResourceClaim(), claim2.getResourceClaim()); + + try (final OutputStream out = cache.write(claim2)) { + assertNotNull(out); + out.write("greeting".getBytes()); + } + + final ContentClaim claim3 = cache.getContentClaim(); + assertEquals(claim1.getResourceClaim(), claim3.getResourceClaim()); + + // Write 1 MB to the claim. This should result in the next Content Claim having a different Resource Claim. + try (final OutputStream out = cache.write(claim3)) { + assertNotNull(out); + final byte[] buffer = new byte[1024 * 1024]; + final Random random = new Random(); + random.nextBytes(buffer); + out.write(buffer); + } + + assertEquals(3, claimManager.getClaimantCount(claim1.getResourceClaim())); + + final ContentClaim claim4 = cache.getContentClaim(); + assertNotNull(claim4); + assertNotEquals(claim1.getResourceClaim(), claim4.getResourceClaim()); + + assertEquals(1, claimManager.getClaimantCount(claim4.getResourceClaim())); + } + } diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java index 6762d1de5d..9fbba530e5 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java @@ -86,7 +86,6 @@ import org.apache.nifi.util.FormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; import java.io.File; import java.io.IOException; import java.time.Duration; @@ -97,6 +96,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import javax.net.ssl.SSLContext; public class StandardStatelessDataflowFactory implements StatelessDataflowFactory { private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);