NIFI-13796: When writing to a content claim and its size + offset exceeds the max appendable size, do not write to that content claim again

This closes #9306.

Signed-off-by: Joseph Witt <joewitt@apache.org>
Authored by Mark Payne on 2024-09-23 17:36:43 -04:00; committed by Joseph Witt
parent 99c31b8da0
commit 457d83ef84
8 changed files with 85 additions and 21 deletions
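At its core, the change is in StandardContentClaimWriteCache: when a stream over a content claim is closed, the claim is only offered back to the reuse queue if the bytes already written to its resource claim (offset plus length) are still below the configured maximum appendable size. The snippet below is a minimal, self-contained sketch of that guard using stand-in types; it illustrates the idea and is not the NiFi implementation.

import java.util.ArrayDeque;
import java.util.Queue;

// Minimal sketch of the re-queue guard introduced by this commit (stand-in types, not NiFi classes).
class ClaimReuseSketch {

    // Stand-in for StandardContentClaim: where this claim starts in the shared resource-claim
    // file (offset) and how many bytes belong to it (length).
    record Claim(long offset, long length) { }

    private final Queue<Claim> queue = new ArrayDeque<>();
    private final long maxAppendableClaimBytes;

    ClaimReuseSketch(final long maxAppendableClaimBytes) {
        this.maxAppendableClaimBytes = maxAppendableClaimBytes;
    }

    // Mirrors the close() handling in the write cache: only offer the claim for reuse
    // if the backing resource claim has not yet reached the max appendable size.
    void release(final Claim claim) {
        if (claim.offset() + claim.length() < maxAppendableClaimBytes) {
            queue.offer(claim);
        }
    }

    int reusableClaims() {
        return queue.size();
    }

    public static void main(final String[] args) {
        final ClaimReuseSketch cache = new ClaimReuseSketch(50_000L);
        cache.release(new Claim(0, 1_000));              // still appendable: re-queued
        cache.release(new Claim(1_000, 1_024 * 1_024));  // past the limit: dropped
        System.out.println(cache.reusableClaims());      // prints 1
    }
}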

NiFiProperties.java

@@ -352,7 +352,7 @@ public class NiFiProperties extends ApplicationProperties {
     public static final String DEFAULT_NAR_LIBRARY_DIR = "./lib";
     public static final String DEFAULT_NAR_LIBRARY_AUTOLOAD_DIR = "./extensions";
     public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "20 secs";
-    public static final String DEFAULT_MAX_APPENDABLE_CLAIM_SIZE = "1 MB";
+    public static final String DEFAULT_MAX_APPENDABLE_CLAIM_SIZE = "50 KB";
     public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000;
     public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L;
     public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB";

FlowController.java

@@ -169,6 +169,7 @@ import org.apache.nifi.nar.PythonBundle;
 import org.apache.nifi.parameter.ParameterContextManager;
 import org.apache.nifi.parameter.ParameterProvider;
 import org.apache.nifi.parameter.StandardParameterContextManager;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.StandardProcessContext;
@@ -542,7 +543,9 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
         processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, stateManagerProvider, this.nifiProperties, lifecycleStateManager);
         parameterContextManager = new StandardParameterContextManager();

-        repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider);
+        final long maxAppendableBytes = getMaxAppendableBytes();
+        repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository,
+                counterRepositoryRef.get(), provenanceRepository, stateManagerProvider, maxAppendableBytes);

         assetManager = createAssetManager(nifiProperties);
         this.flowAnalysisThreadPool = new FlowEngine(1, "Background Flow Analysis", true);
@@ -1044,8 +1047,9 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
         flowFileRepository.updateMaxFlowFileIdentifier(maxIdFromSwapFiles + 1);

         // Begin expiring FlowFiles that are old
+        final long maxAppendableClaimBytes = getMaxAppendableBytes();
         final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository,
-            flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider);
+            flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider, maxAppendableClaimBytes);
         processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS);

         // now that we've loaded the FlowFiles, this has restored our ContentClaims' states, so we can tell the
@@ -1096,6 +1100,12 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
             }
         }

+    private long getMaxAppendableBytes() {
+        final String maxAppendableClaimSize = nifiProperties.getMaxAppendableClaimSize();
+        final long maxAppendableClaimBytes = DataUnit.parseDataSize(maxAppendableClaimSize, DataUnit.B).longValue();
+        return maxAppendableClaimBytes;
+    }
+
     private void notifyComponentsConfigurationRestored() {
         for (final ProcessorNode procNode : flowManager.getRootGroup().findAllProcessors()) {
             final Processor processor = procNode.getProcessor();
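For reference, the new getMaxAppendableBytes() helper resolves the size string returned by nifiProperties.getMaxAppendableClaimSize() into a byte count with DataUnit.parseDataSize. A small illustration of that conversion is sketched below; it assumes the nifi-api artifact is on the classpath, the class name is made up, and the printed values reflect DataUnit's 1024-based units.

import org.apache.nifi.processor.DataUnit;

// Example of the size-string-to-bytes conversion used by getMaxAppendableBytes()
// (illustrative only; assumes nifi-api on the classpath).
public class MaxAppendableBytesExample {
    public static void main(final String[] args) {
        final long fiftyKb = DataUnit.parseDataSize("50 KB", DataUnit.B).longValue();
        final long oneMb = DataUnit.parseDataSize("1 MB", DataUnit.B).longValue();
        System.out.println(fiftyKb); // 51200   (50 * 1024)
        System.out.println(oneMb);   // 1048576 (1024 * 1024)
    }
}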

StandardRepositoryContext.java

@@ -27,14 +27,17 @@ import java.util.concurrent.atomic.AtomicLong;
 public class StandardRepositoryContext extends AbstractRepositoryContext implements RepositoryContext {

+    private final long maxAppendableClaimBytes;
+
     public StandardRepositoryContext(final Connectable connectable, final AtomicLong connectionIndex, final ContentRepository contentRepository, final FlowFileRepository flowFileRepository,
                                      final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository,
-                                     final StateManager stateManager) {
+                                     final StateManager stateManager, final long maxAppendableClaimBytes) {
         super(connectable, connectionIndex, contentRepository, flowFileRepository, flowFileEventRepository, counterRepository, provenanceRepository, stateManager);
+        this.maxAppendableClaimBytes = maxAppendableClaimBytes;
     }

     @Override
     public ContentClaimWriteCache createContentClaimWriteCache(final PerformanceTracker performanceTracker) {
-        return new StandardContentClaimWriteCache(getContentRepository(), performanceTracker);
+        return new StandardContentClaimWriteCache(getContentRepository(), performanceTracker, maxAppendableClaimBytes, 8192);
     }
 }

StandardContentClaimWriteCache.java

@@ -17,8 +17,8 @@
 package org.apache.nifi.controller.repository.claim;

-import org.apache.nifi.controller.repository.io.ContentClaimOutputStream;
 import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.io.ContentClaimOutputStream;
 import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
 import org.apache.nifi.controller.repository.metrics.PerformanceTrackingOutputStream;
@@ -35,15 +35,13 @@ public class StandardContentClaimWriteCache implements ContentClaimWriteCache {
     private final Map<ResourceClaim, MappedOutputStream> streamMap = new ConcurrentHashMap<>();
     private final Queue<ContentClaim> queue = new LinkedList<>();
     private final PerformanceTracker performanceTracker;
+    private final long maxAppendableClaimBytes;
     private final int bufferSize;

-    public StandardContentClaimWriteCache(final ContentRepository contentRepo, final PerformanceTracker performanceTracker) {
-        this(contentRepo, performanceTracker, 8192);
-    }
-
-    public StandardContentClaimWriteCache(final ContentRepository contentRepo, final PerformanceTracker performanceTracker, final int bufferSize) {
+    public StandardContentClaimWriteCache(final ContentRepository contentRepo, final PerformanceTracker performanceTracker, final long maxAppendableClaimBytes, final int bufferSize) {
         this.contentRepo = contentRepo;
         this.performanceTracker = performanceTracker;
+        this.maxAppendableClaimBytes = maxAppendableClaimBytes;
         this.bufferSize = bufferSize;
     }
@@ -154,7 +152,10 @@ public class StandardContentClaimWriteCache implements ContentClaimWriteCache {
                     scc.setLength(0L);
                 }

-                queue.offer(claim);
+                // Add the claim back to the queue if it is still writable
+                if ((scc.getOffset() + scc.getLength()) < maxAppendableClaimBytes) {
+                    queue.offer(claim);
+                }
             }
         };
     }
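A note on the guard condition above: a StandardContentClaim records its offset into the shared resource-claim file plus the length written for that claim, so for the most recently written claim, offset + length is the total size of the backing file. The toy accounting below (hypothetical names, no NiFi dependencies) shows why a large write stops the claim from being re-queued under a 50 KB limit.

// Toy accounting of successive writes to one shared resource-claim file (hypothetical
// names, not NiFi code): offset is where each claim starts, length is what it writes,
// and offset + length of the latest claim is the file size checked against the limit.
public class ClaimAccountingSketch {
    public static void main(final String[] args) {
        final long maxAppendableClaimBytes = 50_000L;
        long fileSize = 0L;                               // bytes already in the resource-claim file

        final long[] writes = {5_000L, 20_000L, 30_000L}; // successive FlowFile contents
        for (final long length : writes) {
            final long offset = fileSize;                 // the next claim starts at the current end of file
            fileSize = offset + length;                   // writing the claim extends the file
            final boolean reusable = fileSize < maxAppendableClaimBytes;
            System.out.printf("offset=%d length=%d fileSize=%d reusable=%b%n",
                    offset, length, fileSize, reusable);
        }
        // The third write brings the file to 55,000 bytes, so that claim is not re-queued
        // and the next getContentClaim() call would allocate a new resource claim.
    }
}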

RepositoryContextFactory.java

@@ -37,10 +37,12 @@ public class RepositoryContextFactory {
     private final CounterRepository counterRepo;
     private final ProvenanceRepository provenanceRepo;
     private final StateManagerProvider stateManagerProvider;
+    private final long maxAppendableClaimBytes;

     public RepositoryContextFactory(final ContentRepository contentRepository, final FlowFileRepository flowFileRepository,
                                     final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository,
-                                    final ProvenanceRepository provenanceRepository, final StateManagerProvider stateManagerProvider) {
+                                    final ProvenanceRepository provenanceRepository, final StateManagerProvider stateManagerProvider,
+                                    final long maxAppendableClaimBytes) {

         this.contentRepo = contentRepository;
         this.flowFileRepo = flowFileRepository;
@@ -48,11 +50,12 @@ public class RepositoryContextFactory {
         this.counterRepo = counterRepository;
         this.provenanceRepo = provenanceRepository;
         this.stateManagerProvider = stateManagerProvider;
+        this.maxAppendableClaimBytes = maxAppendableClaimBytes;
     }

     public RepositoryContext newProcessContext(final Connectable connectable, final AtomicLong connectionIndex) {
         final StateManager stateManager = stateManagerProvider.getStateManager(connectable.getIdentifier());
-        return new StandardRepositoryContext(connectable, connectionIndex, contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo, stateManager);
+        return new StandardRepositoryContext(connectable, connectionIndex, contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo, stateManager, maxAppendableClaimBytes);
     }

     public ContentRepository getContentRepository() {

StandardProcessSessionIT.java

@@ -225,7 +225,8 @@ public class StandardProcessSessionIT {
         stateManager = new MockStateManager(connectable);
         stateManager.setIgnoreAnnotations(true);

-        context = new StandardRepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository, counterRepository, provenanceRepo, stateManager);
+        context = new StandardRepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository,
+            counterRepository, provenanceRepo, stateManager, 50_000L);
         session = new StandardProcessSession(context, () -> false, new NopPerformanceTracker());
     }
@@ -3134,7 +3135,8 @@ public class StandardProcessSessionIT {
             flowFileEventRepository,
             counterRepository,
             provenanceRepo,
-            stateManager);
+            stateManager,
+            50_000L);

         return new StandardProcessSession(context, () -> false, new NopPerformanceTracker());
     }

TestContentClaimWriteCache.java → TestStandardContentClaimWriteCache.java (renamed)

@@ -33,21 +33,22 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Random;

 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;

-public class TestContentClaimWriteCache {
+public class TestStandardContentClaimWriteCache {

     private FileSystemRepository repository = null;
     private StandardResourceClaimManager claimManager = null;
     private final File rootFile = new File("target/testContentClaimWriteCache");
-    private NiFiProperties nifiProperties;

     @BeforeEach
     public void setup() throws IOException {
-        nifiProperties = NiFiProperties.createBasicNiFiProperties(TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile());
+        NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile());
         if (rootFile.exists()) {
             DiskUtils.deleteRecursively(rootFile);
         }
@@ -64,7 +65,7 @@ public class TestContentClaimWriteCache {
     @Test
     public void testFlushWriteCorrectData() throws IOException {
-        final ContentClaimWriteCache cache = new StandardContentClaimWriteCache(repository, new NopPerformanceTracker(), 4);
+        final ContentClaimWriteCache cache = new StandardContentClaimWriteCache(repository, new NopPerformanceTracker(), 50_000L, 4);

         final ContentClaim claim1 = cache.getContentClaim();
         assertNotNull(claim1);
@@ -97,4 +98,48 @@ public class TestContentClaimWriteCache {
         assertArrayEquals("good-dayhello".getBytes(), buff2);
     }

+    @Test
+    public void testWriteLargeRollsOverToNewFileOnNext() throws IOException {
+        final ContentClaimWriteCache cache = new StandardContentClaimWriteCache(repository, new NopPerformanceTracker(), 50_000L, 4);
+
+        final ContentClaim claim1 = cache.getContentClaim();
+        assertNotNull(claim1);
+
+        try (final OutputStream out = cache.write(claim1)) {
+            assertNotNull(out);
+            out.write("hello".getBytes());
+            out.write("good-bye".getBytes());
+
+            cache.flush();
+        }
+
+        final ContentClaim claim2 = cache.getContentClaim();
+        assertEquals(claim1.getResourceClaim(), claim2.getResourceClaim());
+
+        try (final OutputStream out = cache.write(claim2)) {
+            assertNotNull(out);
+            out.write("greeting".getBytes());
+        }
+
+        final ContentClaim claim3 = cache.getContentClaim();
+        assertEquals(claim1.getResourceClaim(), claim3.getResourceClaim());
+
+        // Write 1 MB to the claim. This should result in the next Content Claim having a different Resource Claim.
+        try (final OutputStream out = cache.write(claim3)) {
+            assertNotNull(out);
+            final byte[] buffer = new byte[1024 * 1024];
+            final Random random = new Random();
+            random.nextBytes(buffer);
+            out.write(buffer);
+        }
+
+        assertEquals(3, claimManager.getClaimantCount(claim1.getResourceClaim()));
+
+        final ContentClaim claim4 = cache.getContentClaim();
+        assertNotNull(claim4);
+        assertNotEquals(claim1.getResourceClaim(), claim4.getResourceClaim());
+        assertEquals(1, claimManager.getClaimantCount(claim4.getResourceClaim()));
+    }
 }

StandardStatelessDataflowFactory.java

@@ -86,7 +86,6 @@ import org.apache.nifi.util.FormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import javax.net.ssl.SSLContext;
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
@@ -97,6 +96,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLContext;

 public class StandardStatelessDataflowFactory implements StatelessDataflowFactory {
     private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);