NIFI-10817: Moved the calls in StandardProcessSession to 'resetState()' and 'acknowledgeRecords()' from the outer commit(boolean) to the inner commit(Checkpoint, boolean). By moving the call here, the logic of StandardProcessSession is unaffected. But the StatelessProcessSession that inherits from it now has the benefit of having the state cleaned up when calling super.commit(Checkpoint, boolean).

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #6663
This commit is contained in:
Mark Payne 2022-11-14 16:09:25 -05:00 committed by Matthew Burgess
parent a71556f115
commit bda624823b
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
7 changed files with 39 additions and 10 deletions

View File

@ -263,12 +263,14 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
int flowFilesBinned = 0;
while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger()) {
final ProcessSession session = sessionFactory.createSession();
final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
while (binManager.getBinCount() <= maxBinCount) {
if (!isScheduled()) {
break;
}
final ProcessSession session = sessionFactory.createSession();
final List<FlowFile> flowFiles = session.get(1000);
if (flowFiles.isEmpty()) {
break;
@ -284,7 +286,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
} catch (final Exception e) {
getLogger().error("Could not determine which Bin to add {} to; will route to failure", new Object[] {flowFile}, e);
session.transfer(flowFile, REL_FAILURE);
continue;
session.commitAsync();
}
}

View File

@ -555,10 +555,6 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
private synchronized void commit(final boolean asynchronous) {
checkpoint(this.checkpoint != null); // If a checkpoint already exists, we need to copy the collection
commit(this.checkpoint, asynchronous);
acknowledgeRecords();
resetState();
this.checkpoint = null;
}
@ -720,6 +716,11 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
}
}
// Acknowledge records in order to update counts for incoming connections' queues
acknowledgeRecords();
// Reset the internal state, now that the session has been committed
resetState();
} catch (final Exception e) {
LOG.error("Failed to commit session {}. Will roll back.", this, e);

View File

@ -641,6 +641,8 @@ public class MergeContent extends BinFiles {
out.write(header);
}
final byte[] demarcator = getDelimiterContent(context, contents, DEMARCATOR);
boolean isFirst = true;
final Iterator<FlowFile> itr = contents.iterator();
while (itr.hasNext()) {
@ -653,7 +655,6 @@ public class MergeContent extends BinFiles {
});
if (itr.hasNext()) {
final byte[] demarcator = getDelimiterContent(context, contents, DEMARCATOR);
if (demarcator != null) {
out.write(demarcator);
}

View File

@ -37,6 +37,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.RepositoryContext;
@ -821,4 +822,8 @@ public class StandardStatelessFlow implements StatelessDataflow {
return schedulingUnit;
}
}
public ContentRepository getContentRepository() {
return repositoryContextFactory.getContentRepository();
}
}

View File

@ -394,4 +394,14 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue {
public void drainTo(final List<FlowFileRecord> destination) {
this.flowFiles.drainTo(destination);
}
@Override
public int hashCode() {
return identifier.hashCode();
}
@Override
public boolean equals(final Object obj) {
return this == obj;
}
}

View File

@ -304,6 +304,16 @@ public class ByteArrayContentRepository implements ContentRepository {
public InputStream read() {
return resourceClaim.read();
}
@Override
public int hashCode() {
return resourceClaim.hashCode();
}
@Override
public boolean equals(final Object obj) {
return this == obj;
}
}
private static class ByteArrayResourceClaim implements ResourceClaim {
@ -379,7 +389,7 @@ public class ByteArrayContentRepository implements ContentRepository {
@Override
public int hashCode() {
return Objects.hash(id);
return id.hashCode();
}
}
}

View File

@ -35,10 +35,10 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestStatelessFileSystemContentRepository {
private final File repoDirectory = new File("target/test-stateless-file-system-repository");