NIFI-13929 Fixed Provenance Event Handling for Stateless Engine (#9446)

Removed Provenance Repository from the stateless RepositoryContextFactory and added it to the DataflowTriggerContext. This was necessary because the previous design overlooked the possibility of many threads concurrently running the same dataflow. They all shared the same StatelessProvenanceRepository, but the code was designed as if only a single thread would be using the repository. As a result, the events that were registered with the stateless provenance repository were being copied many times into NiFi's underlying provenance repository. This refactoring also led to the discovery of some old Java 8 syntax that could be cleaned up, and it led to the discovery of some methods that were no longer being used and could be cleaned up. Finally, in testing, I found that when a Stateless Group was scheduled, it scheduled the triggering of the stateless group before marking the state as RUNNING; as a result, the second thread could run, determine that the state is STARTING instead of RUNNING, and return without triggering the stateless group. This was addressed by ensuring that we set the state to RUNNING before scheduling the stateless group to be triggered.

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2024-10-26 10:23:50 -04:00 committed by GitHub
parent 3b3e74d46b
commit 6d6adfeaeb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 465 additions and 154 deletions

View File

@ -217,23 +217,15 @@ public abstract class AbstractRepositoryContext implements RepositoryContext {
}
protected String getProvenanceComponentDescription() {
switch (connectable.getConnectableType()) {
case PROCESSOR:
final ProcessorNode procNode = (ProcessorNode) connectable;
return procNode.getComponentType();
case INPUT_PORT:
return "Input Port";
case OUTPUT_PORT:
return "Output Port";
case REMOTE_INPUT_PORT:
return ProvenanceEventRecord.REMOTE_INPUT_PORT_TYPE;
case REMOTE_OUTPUT_PORT:
return ProvenanceEventRecord.REMOTE_OUTPUT_PORT_TYPE;
case FUNNEL:
return "Funnel";
default:
throw new AssertionError("Connectable type is " + connectable.getConnectableType());
}
return switch (connectable.getConnectableType()) {
case PROCESSOR -> ((ProcessorNode) connectable).getComponentType();
case INPUT_PORT -> "Input Port";
case OUTPUT_PORT -> "Output Port";
case REMOTE_INPUT_PORT -> ProvenanceEventRecord.REMOTE_INPUT_PORT_TYPE;
case REMOTE_OUTPUT_PORT -> ProvenanceEventRecord.REMOTE_OUTPUT_PORT_TYPE;
case FUNNEL -> "Funnel";
default -> throw new AssertionError("Connectable type is " + connectable.getConnectableType());
};
}
@Override

View File

@ -56,7 +56,6 @@ import org.apache.nifi.logging.LoggingContext;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.registry.flow.mapping.ComponentIdLookup;
import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
@ -239,7 +238,7 @@ public class StandardStatelessGroupNodeFactory implements StatelessGroupNodeFact
.extensionRepository(extensionRepository)
.flowFileEventRepository(flowFileEventRepository)
.processScheduler(statelessScheduler)
.provenanceRepository((ProvenanceRepository) statelessRepositoryContextFactory.getProvenanceRepository())
.provenanceRepository(flowController.getProvenanceRepository())
.stateManagerProvider(stateManagerProvider)
.kerberosConfiguration(kerberosConfig)
.statusTaskInterval(null)

View File

@ -53,6 +53,7 @@ import org.apache.nifi.stateless.flow.FailurePortEncounteredException;
import org.apache.nifi.stateless.flow.FlowFileSupplier;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.apache.nifi.stateless.repository.StatelessProvenanceRepository;
import java.io.IOException;
import java.util.ArrayList;
@ -83,7 +84,6 @@ public class StatelessFlowTask {
private final boolean allowBatch;
// State that is updated during invocation - these variables are guarded by synchronized block
private Long maxProvenanceEventId;
private List<FlowFileCloneResult> cloneResults;
private List<RepositoryRecord> outputRepositoryRecords;
private List<ProvenanceEventRecord> cloneProvenanceEvents;
@ -167,8 +167,7 @@ public class StatelessFlowTask {
final List<Invocation> allInvocations = new ArrayList<>();
final List<Invocation> successfulInvocations = new ArrayList<>();
final ProvenanceEventRepository statelessProvRepo = flow.getProvenanceRepository();
maxProvenanceEventId = statelessProvRepo.getMaxEventId();
final ProvenanceEventRepository statelessProvRepo = new StatelessProvenanceRepository(10_000);
try {
int invocationCount = 0;
@ -177,7 +176,7 @@ public class StatelessFlowTask {
final Invocation invocation = new Invocation();
final FlowFileSupplier flowFileSupplier = new BridgingFlowFileSupplier(invocation);
final DataflowTriggerContext triggerContext = new StatelessFlowTaskTriggerContext(flowFileSupplier);
final DataflowTriggerContext triggerContext = new StatelessFlowTaskTriggerContext(flowFileSupplier, statelessProvRepo);
final TriggerResult triggerResult = triggerFlow(triggerContext);
invocation.setTriggerResult(triggerResult);
@ -196,7 +195,7 @@ public class StatelessFlowTask {
}
} else {
logger.debug("Failed to trigger", triggerResult.getFailureCause().orElse(null));
fail(invocation);
fail(invocation, statelessProvRepo);
break;
}
}
@ -204,11 +203,11 @@ public class StatelessFlowTask {
logger.debug("Finished triggering");
} finally {
try {
completeInvocations(successfulInvocations);
completeInvocations(successfulInvocations, statelessProvRepo);
} catch (final Exception e) {
logger.error("Failed to complete Stateless Flow", e);
statelessGroupNode.yield();
fail(successfulInvocations, e);
fail(successfulInvocations, statelessProvRepo, e);
}
logger.debug("Acknowledging FlowFiles from {} invocations", allInvocations.size());
@ -221,11 +220,11 @@ public class StatelessFlowTask {
}
private void fail(final List<Invocation> invocations, final Throwable cause) {
invocations.forEach(invocation -> fail(invocation, cause));
private void fail(final List<Invocation> invocations, final ProvenanceEventRepository statelessProvRepo, final Throwable cause) {
invocations.forEach(invocation -> fail(invocation, statelessProvRepo, cause));
}
private void fail(final Invocation invocation) {
private void fail(final Invocation invocation, final ProvenanceEventRepository statelessProvRepo) {
final Throwable cause;
if (invocation.getTriggerResult().isCanceled()) {
cause = new TerminatedTaskException();
@ -233,14 +232,14 @@ public class StatelessFlowTask {
cause = invocation.getTriggerResult().getFailureCause().orElse(null);
}
fail(invocation, cause);
fail(invocation, statelessProvRepo, cause);
}
private void fail(final Invocation invocation, final Throwable cause) {
private void fail(final Invocation invocation, final ProvenanceEventRepository statelessProvRepo, final Throwable cause) {
final Port destinationPort = getDestinationPort(cause);
try {
failInvocation(invocation, destinationPort, cause);
failInvocation(invocation, statelessProvRepo, destinationPort, cause);
} catch (final Exception e) {
if (cause != null) {
cause.addSuppressed(e);
@ -251,11 +250,10 @@ public class StatelessFlowTask {
}
private Port getDestinationPort(final Throwable failureCause) {
if (!(failureCause instanceof FailurePortEncounteredException)) {
if (!(failureCause instanceof final FailurePortEncounteredException fpee)) {
return null;
}
final FailurePortEncounteredException fpee = (FailurePortEncounteredException) failureCause;
final Port port = this.outputPorts.get(fpee.getPortName());
if (port == null) {
logger.error("FlowFile was routed to Failure Port {} but no such port exists in the dataflow", fpee.getPortName());
@ -283,7 +281,7 @@ public class StatelessFlowTask {
}
private void completeInvocations(final List<Invocation> invocations) throws IOException {
private void completeInvocations(final List<Invocation> invocations, final ProvenanceEventRepository statelessProvRepo) throws IOException {
logger.debug("Completing transactions from {} invocations", invocations.size());
if (invocations.isEmpty()) {
return;
@ -318,7 +316,7 @@ public class StatelessFlowTask {
throw new IOException("Failed to update FlowFile Repository after triggering " + this, e);
}
updateProvenanceRepository(event -> true);
updateProvenanceRepository(statelessProvRepo, event -> true);
// Acknowledge the invocations so that the sessions can be committed
for (final Invocation invocation : invocations) {
@ -337,7 +335,7 @@ public class StatelessFlowTask {
cloneProvenanceEvents = new ArrayList<>();
}
private void failInvocation(final Invocation invocation, final Port destinationPort, final Throwable cause) throws IOException {
private void failInvocation(final Invocation invocation, final ProvenanceEventRepository statelessProvRepo, final Port destinationPort, final Throwable cause) throws IOException {
final List<PolledFlowFile> inputFlowFiles = invocation.getPolledFlowFiles();
boolean stopped = false;
@ -401,7 +399,7 @@ public class StatelessFlowTask {
throw new IOException("Failed to update FlowFile Repository after triggering " + this, e);
}
updateProvenanceRepository(event -> eventTypesToKeepOnFailure.contains(event.getEventType()));
updateProvenanceRepository(statelessProvRepo, event -> eventTypesToKeepOnFailure.contains(event.getEventType()));
// Acknowledge the invocations so that the sessions can be committed
abort(invocation, cause);
@ -474,9 +472,8 @@ public class StatelessFlowTask {
}
}
void updateProvenanceRepository(final Predicate<ProvenanceEventRecord> eventFilter) {
long firstProvEventId = (maxProvenanceEventId == null) ? 0 : (maxProvenanceEventId + 1);
final ProvenanceEventRepository statelessProvRepo = flow.getProvenanceRepository();
void updateProvenanceRepository(final ProvenanceEventRepository statelessRepo, final Predicate<ProvenanceEventRecord> eventFilter) {
long firstProvEventId = 0;
if (!cloneProvenanceEvents.isEmpty()) {
nifiProvenanceEventRepository.registerEvents(cloneProvenanceEvents);
@ -484,7 +481,7 @@ public class StatelessFlowTask {
while (true) {
try {
final List<ProvenanceEventRecord> statelessProvEvents = statelessProvRepo.getEvents(firstProvEventId, 1000);
final List<ProvenanceEventRecord> statelessProvEvents = statelessRepo.getEvents(firstProvEventId, 1000);
if (statelessProvEvents.isEmpty()) {
return;
}
@ -494,7 +491,7 @@ public class StatelessFlowTask {
// copy the Event ID.
final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>();
for (final ProvenanceEventRecord eventRecord : statelessProvEvents) {
if (eventFilter.test(eventRecord) == false) {
if (!eventFilter.test(eventRecord)) {
continue;
}
@ -731,7 +728,7 @@ public class StatelessFlowTask {
public List<PolledFlowFile> getPolledFlowFiles() {
if (polledFlowFiles == null) {
return Collections.emptyList();
return List.of();
}
return polledFlowFiles;
@ -880,9 +877,11 @@ public class StatelessFlowTask {
private class StatelessFlowTaskTriggerContext implements DataflowTriggerContext {
private final FlowFileSupplier flowFileSupplier;
private final ProvenanceEventRepository statelessProvRepo;
public StatelessFlowTaskTriggerContext(final FlowFileSupplier flowFileSupplier) {
public StatelessFlowTaskTriggerContext(final FlowFileSupplier flowFileSupplier, final ProvenanceEventRepository statelessProvRepo) {
this.flowFileSupplier = flowFileSupplier;
this.statelessProvRepo = statelessProvRepo;
}
@Override
@ -895,5 +894,9 @@ public class StatelessFlowTask {
return flowFileSupplier;
}
@Override
public ProvenanceEventRepository getProvenanceEventRepository() {
return statelessProvRepo;
}
}
}

View File

@ -255,9 +255,9 @@ public class StandardStatelessGroupNode implements StatelessGroupNode {
writeLock.lock();
try {
if (desiredState == ScheduledState.RUNNING) {
schedulingAgentCallback.trigger();
logger.info("{} has been started", this);
currentState = ScheduledState.RUNNING;
schedulingAgentCallback.trigger();
} else {
logger.info("{} completed setup but is no longer scheduled to run; desired state is now {}; will shutdown", this, desiredState);
shutdown = true;

View File

@ -89,6 +89,7 @@ public class TestStatelessFlowTask {
private List<ProvenanceEventRecord> registeredProvenanceEvents;
private List<ProvenanceEventRecord> statelessProvenanceEvents;
private Map<String, StandardFlowFileEvent> flowFileEventsByComponentId;
private ProvenanceEventRepository statelessProvRepo;
@BeforeEach
public void setup() throws IOException {
@ -110,7 +111,7 @@ public class TestStatelessFlowTask {
rootGroup.addOutputPort(secondOutputPort);
statelessProvenanceEvents = new ArrayList<>();
final ProvenanceEventRepository statelessProvRepo = mock(ProvenanceEventRepository.class);
statelessProvRepo = mock(ProvenanceEventRepository.class);
doAnswer(invocation -> statelessProvenanceEvents.size() - 1).when(statelessProvRepo).getMaxEventId();
doAnswer(invocation -> {
final long startEventId = invocation.getArgument(0, Long.class);
@ -123,7 +124,6 @@ public class TestStatelessFlowTask {
}).when(statelessProvRepo).getEvents(anyLong(), anyInt());
final StatelessDataflow statelessFlow = mock(StatelessDataflow.class);
when(statelessFlow.getProvenanceRepository()).thenReturn(statelessProvRepo);
final StatelessGroupNode statelessGroupNode = mock(StatelessGroupNode.class);
when(statelessGroupNode.getProcessGroup()).thenReturn(rootGroup);
@ -423,7 +423,7 @@ public class TestStatelessFlowTask {
statelessProvenanceEvents.add(event);
}
task.updateProvenanceRepository(event -> true);
task.updateProvenanceRepository(statelessProvRepo, event -> true);
assertEquals(statelessProvenanceEvents, registeredProvenanceEvents);
for (final ProvenanceEventRecord eventRecord : registeredProvenanceEvents) {
@ -446,7 +446,7 @@ public class TestStatelessFlowTask {
statelessProvenanceEvents.add(event);
}
task.updateProvenanceRepository(event -> true);
task.updateProvenanceRepository(statelessProvRepo, event -> true);
assertEquals(statelessProvenanceEvents, registeredProvenanceEvents);
for (final ProvenanceEventRecord eventRecord : registeredProvenanceEvents) {

View File

@ -17,7 +17,10 @@
package org.apache.nifi.stateless.flow;
import org.apache.nifi.provenance.ProvenanceEventRepository;
public interface DataflowTriggerContext {
/**
* Provides a mechanism by which the triggering class can abort a dataflow
* @return <code>true</code> if the dataflow should be aborted, <code>false</code> otherwise
@ -28,13 +31,22 @@ public interface DataflowTriggerContext {
return null;
}
ProvenanceEventRepository getProvenanceEventRepository();
/**
* The implicit context that will be used if no other context is provided when triggering a dataflow
*/
DataflowTriggerContext IMPLICIT_CONTEXT = new DataflowTriggerContext() {
private final ProvenanceEventRepository eventRepo = new NopProvenanceEventRepository();
@Override
public boolean isAbort() {
return false;
}
@Override
public ProvenanceEventRepository getProvenanceEventRepository() {
return eventRepo;
}
};
}

View File

@ -0,0 +1,370 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.flow;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import java.util.List;
import java.util.Map;
/**
 * A {@link ProvenanceEventRepository} that discards everything handed to it.
 * <p>
 * Registered events are dropped, queries return empty results, and the builder it hands out
 * ignores every value it is given and always builds the same kind of placeholder record.
 */
class NopProvenanceEventRepository implements ProvenanceEventRepository {

    /**
     * @return a fresh builder that ignores all input
     */
    @Override
    public ProvenanceEventBuilder eventBuilder() {
        return new NopEventBuilder();
    }

    /** Intentionally does nothing: the event is not stored. */
    @Override
    public void registerEvent(final ProvenanceEventRecord event) {
    }

    /** Intentionally does nothing: the events are not stored. */
    @Override
    public void registerEvents(final Iterable<ProvenanceEventRecord> events) {
    }

    /**
     * @return always an empty, unmodifiable list, regardless of the arguments
     */
    @Override
    public List<ProvenanceEventRecord> getEvents(final long firstEventId, final int limit) {
        return List.of();
    }

    /**
     * @return always {@code 0}
     */
    @Override
    public Long getMaxEventId() {
        return 0L;
    }

    /**
     * @return always {@code null}, since no event is ever retained
     */
    @Override
    public ProvenanceEventRecord getEvent(final long id) {
        return null;
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

    /**
     * Builder whose mutators all ignore their arguments and return the builder itself,
     * so fluent call chains keep working without recording anything.
     */
    private static class NopEventBuilder implements ProvenanceEventBuilder {

        @Override
        public ProvenanceEventBuilder setEventType(final ProvenanceEventType eventType) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder fromEvent(final ProvenanceEventRecord event) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setFlowFileEntryDate(final long entryDate) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setPreviousContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setCurrentContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setSourceQueueIdentifier(final String queueIdentifier) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setAttributes(final Map<String, String> previousAttributes, final Map<String, String> updatedAttributes) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setFlowFileUUID(final String uuid) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setEventTime(final long eventTime) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setEventDuration(final long duration) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setLineageStartDate(final long startDate) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setComponentId(final String componentId) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setComponentType(final String componentType) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setSourceSystemFlowFileIdentifier(final String sourceSystemIdentifier) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setTransitUri(final String transitUri) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder addParentFlowFile(final FlowFile parent) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder removeParentFlowFile(final FlowFile parent) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder addChildFlowFile(final FlowFile child) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder addChildFlowFile(final String childId) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder removeChildFlowFile(final FlowFile child) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setAlternateIdentifierUri(final String alternateIdentifierUri) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setDetails(final String details) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder setRelationship(final Relationship relationship) {
            return this;
        }

        @Override
        public ProvenanceEventBuilder fromFlowFile(final FlowFile flowFile) {
            return this;
        }

        /**
         * @return a new placeholder record; nothing passed to this builder influences it
         */
        @Override
        public ProvenanceEventRecord build() {
            return new NopProvenanceEventRecord();
        }

        @Override
        public List<String> getChildFlowFileIds() {
            return List.of();
        }

        @Override
        public List<String> getParentFlowFileIds() {
            return List.of();
        }

        @Override
        public String getFlowFileId() {
            return "";
        }

        /**
         * @return this same builder instance; it holds no state, so no separate copy is made
         */
        @Override
        public ProvenanceEventBuilder copy() {
            return this;
        }
    }

    /**
     * Placeholder record: numeric getters return zero, textual getters return the empty string,
     * collection getters return empty unmodifiable collections, and the event type is
     * {@link ProvenanceEventType#UNKNOWN}.
     */
    private static class NopProvenanceEventRecord implements ProvenanceEventRecord {
        // Shared value returned by every String-valued getter of this record.
        private static final String NO_VALUE = "";

        @Override
        public long getEventId() {
            return 0;
        }

        @Override
        public long getEventTime() {
            return 0;
        }

        @Override
        public long getFlowFileEntryDate() {
            return 0;
        }

        @Override
        public long getLineageStartDate() {
            return 0;
        }

        @Override
        public long getFileSize() {
            return 0;
        }

        @Override
        public Long getPreviousFileSize() {
            return 0L;
        }

        @Override
        public long getEventDuration() {
            return 0;
        }

        @Override
        public ProvenanceEventType getEventType() {
            return ProvenanceEventType.UNKNOWN;
        }

        @Override
        public Map<String, String> getAttributes() {
            return Map.of();
        }

        @Override
        public Map<String, String> getPreviousAttributes() {
            return Map.of();
        }

        @Override
        public Map<String, String> getUpdatedAttributes() {
            return Map.of();
        }

        @Override
        public String getComponentId() {
            return NO_VALUE;
        }

        @Override
        public String getComponentType() {
            return NO_VALUE;
        }

        @Override
        public String getTransitUri() {
            return NO_VALUE;
        }

        @Override
        public String getSourceSystemFlowFileIdentifier() {
            return NO_VALUE;
        }

        @Override
        public String getFlowFileUuid() {
            return NO_VALUE;
        }

        @Override
        public List<String> getParentUuids() {
            return List.of();
        }

        @Override
        public List<String> getChildUuids() {
            return List.of();
        }

        @Override
        public String getAlternateIdentifierUri() {
            return NO_VALUE;
        }

        @Override
        public String getDetails() {
            return NO_VALUE;
        }

        @Override
        public String getRelationship() {
            return NO_VALUE;
        }

        @Override
        public String getSourceQueueIdentifier() {
            return NO_VALUE;
        }

        @Override
        public String getContentClaimSection() {
            return NO_VALUE;
        }

        @Override
        public String getPreviousContentClaimSection() {
            return NO_VALUE;
        }

        @Override
        public String getContentClaimContainer() {
            return NO_VALUE;
        }

        @Override
        public String getPreviousContentClaimContainer() {
            return NO_VALUE;
        }

        @Override
        public String getContentClaimIdentifier() {
            return NO_VALUE;
        }

        @Override
        public String getPreviousContentClaimIdentifier() {
            return NO_VALUE;
        }

        @Override
        public Long getContentClaimOffset() {
            return 0L;
        }

        @Override
        public Long getPreviousContentClaimOffset() {
            return 0L;
        }

        @Override
        public String getBestEventIdentifier() {
            return NO_VALUE;
        }
    }
}

View File

@ -19,7 +19,6 @@ package org.apache.nifi.stateless.flow;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.reporting.BulletinRepository;
import java.io.InputStream;
@ -102,11 +101,6 @@ public interface StatelessDataflow {
long getSourceYieldExpiration();
void resetCounters();
Map<String, Long> getCounters(boolean includeGlobalContext);
BulletinRepository getBulletinRepository();
ProvenanceEventRepository getProvenanceRepository();
}

View File

@ -18,7 +18,6 @@
package org.apache.nifi.stateless.flow;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import java.io.IOException;
import java.io.InputStream;
@ -84,9 +83,4 @@ public interface TriggerResult {
*/
void abort(Throwable cause);
/**
* Returns all Provenance Events that were created during this invocation of the dataflow
* @return the list of Provenance events
*/
List<ProvenanceEventRecord> getProvenanceEvents() throws IOException;
}

View File

@ -30,8 +30,6 @@ import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.stateless.flow.CanceledTriggerResult;
import org.apache.nifi.stateless.flow.DataflowTriggerContext;
import org.apache.nifi.stateless.flow.ExceptionalTriggerResult;
@ -60,12 +58,10 @@ public class StandardExecutionProgress implements ExecutionProgress {
private final ProcessGroup rootGroup;
private final List<FlowFileQueue> internalFlowFileQueues;
private final ContentRepository contentRepository;
private final ProvenanceEventRepository provenanceRepository;
private final BlockingQueue<TriggerResult> resultQueue;
private final Set<String> failurePortNames;
private final AsynchronousCommitTracker commitTracker;
private final StatelessStateManagerProvider stateManagerProvider;
private final Long maxProvenanceEventId;
private final DataflowTriggerContext triggerContext;
private final FlowPurgeAction purgeAction;
private final List<StatelessProcessSession> createdSessions = new ArrayList<>();
@ -76,17 +72,16 @@ public class StandardExecutionProgress implements ExecutionProgress {
private volatile CompletionAction completionAction = null;
public StandardExecutionProgress(final ProcessGroup rootGroup, final List<FlowFileQueue> internalFlowFileQueues, final BlockingQueue<TriggerResult> resultQueue,
final RepositoryContextFactory repositoryContextFactory, final Set<String> failurePortNames, final AsynchronousCommitTracker commitTracker,
final StatelessStateManagerProvider stateManagerProvider, final DataflowTriggerContext triggerContext, final FlowPurgeAction purgeAction) {
final RepositoryContextFactory repositoryContextFactory, final Set<String> failurePortNames,
final AsynchronousCommitTracker commitTracker, final StatelessStateManagerProvider stateManagerProvider, final DataflowTriggerContext triggerContext,
final FlowPurgeAction purgeAction) {
this.rootGroup = rootGroup;
this.internalFlowFileQueues = internalFlowFileQueues;
this.resultQueue = resultQueue;
this.contentRepository = repositoryContextFactory.getContentRepository();
this.provenanceRepository = repositoryContextFactory.getProvenanceRepository();
this.failurePortNames = failurePortNames;
this.commitTracker = commitTracker;
this.stateManagerProvider = stateManagerProvider;
this.maxProvenanceEventId = provenanceRepository.getMaxEventId();
this.triggerContext = triggerContext;
this.purgeAction = purgeAction;
@ -306,11 +301,6 @@ public class StandardExecutionProgress implements ExecutionProgress {
onFailure.accept(cause);
}
}
@Override
public List<ProvenanceEventRecord> getProvenanceEvents() throws IOException {
return provenanceRepository.getEvents(maxProvenanceEventId == null ? 0 : maxProvenanceEventId + 1, Integer.MAX_VALUE);
}
};
}

View File

@ -18,7 +18,6 @@
package org.apache.nifi.stateless.flow;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
@ -70,9 +69,4 @@ public class CanceledTriggerResult implements TriggerResult {
@Override
public void abort(final Throwable cause) {
}
@Override
public List<ProvenanceEventRecord> getProvenanceEvents() {
return Collections.emptyList();
}
}

View File

@ -19,7 +19,6 @@ package org.apache.nifi.stateless.flow;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import java.io.IOException;
import java.io.InputStream;
@ -80,9 +79,4 @@ public class ExceptionalTriggerResult implements TriggerResult {
failureCause.addSuppressed(cause);
}
}
@Override
public List<ProvenanceEventRecord> getProvenanceEvents() throws IOException {
return Collections.emptyList();
}
}

View File

@ -31,14 +31,12 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.RepositoryRecord;
@ -59,7 +57,6 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
@ -68,6 +65,7 @@ import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.engine.StandardExecutionProgress;
import org.apache.nifi.stateless.queue.DrainableFlowFileQueue;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.apache.nifi.stateless.repository.StatelessProvenanceRepository;
import org.apache.nifi.stateless.session.AsynchronousCommitTracker;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.Connectables;
@ -514,7 +512,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
repositoryContextFactory, dataflowDefinition.getFailurePortNames(), tracker, stateManagerProvider, triggerContext, this::purge);
final Future<?> future = runDataflowExecutor.submit(
() -> executeDataflow(resultQueue, executionProgress, tracker, triggerContext.getFlowFileSupplier()));
() -> executeDataflow(resultQueue, executionProgress, tracker, triggerContext));
final DataflowTrigger trigger = new DataflowTrigger() {
@Override
@ -547,7 +545,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
private void executeDataflow(final BlockingQueue<TriggerResult> resultQueue, final ExecutionProgress executionProgress, final AsynchronousCommitTracker tracker,
final FlowFileSupplier flowFileSupplier) {
final DataflowTriggerContext triggerContext) {
final long startNanos = System.nanoTime();
transactionThresholdMeter.reset();
@ -557,7 +555,8 @@ public class StandardStatelessFlow implements StatelessDataflow {
.processContextFactory(processContextFactory)
.repositoryContextFactory(repositoryContextFactory)
.rootConnectables(rootConnectables)
.flowFileSupplier(flowFileSupplier)
.flowFileSupplier(triggerContext.getFlowFileSupplier())
.provenanceEventRepository(triggerContext.getProvenanceEventRepository())
.inputPorts(inputPorts)
.transactionThresholdMeter(transactionThresholdMeter)
.lifecycleStateManager(lifecycleStateManager)
@ -664,7 +663,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
throw new IllegalArgumentException("No Input Port exists with name <" + portName + ">. Valid Port names are " + getInputPortNames());
}
final RepositoryContext repositoryContext = repositoryContextFactory.createRepositoryContext(inputPort);
final RepositoryContext repositoryContext = repositoryContextFactory.createRepositoryContext(inputPort, new StatelessProvenanceRepository(10));
final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(repositoryContext, () -> false, new NopPerformanceTracker());
final ProcessSession session = sessionFactory.createSession();
try {
@ -819,33 +818,6 @@ public class StandardStatelessFlow implements StatelessDataflow {
return latest;
}
@Override
public void resetCounters() {
final CounterRepository counterRepo = repositoryContextFactory.getCounterRepository();
counterRepo.getCounters().forEach(counter -> counterRepo.resetCounter(counter.getIdentifier()));
}
/**
 * Collects the current counter values from the Counter Repository into a new map.
 *
 * @param includeGlobalContext if {@code true}, counters whose context does not end with ")" (treated here as
 *                             the global context) are included as well; otherwise only per-component counters are returned
 * @return a new map of counter name to value; per-component counters are keyed as {@code "<name> - <context>"},
 *         global-context counters are keyed by their plain name
 */
@Override
public Map<String, Long> getCounters(final boolean includeGlobalContext) {
final Map<String, Long> counters = new HashMap<>();
for (final Counter counter : repositoryContextFactory.getCounterRepository().getCounters()) {
// Counter context is either of the format `componentName (componentId)` or `All componentType's` (global context). We only want
// those of the first type - for individual components - unless includeGlobalContext == true
final boolean isGlobalContext = !counter.getContext().endsWith(")");
if (includeGlobalContext || !isGlobalContext) {
// Per-component counters carry their context in the key; global counters use the bare name
final String counterName = isGlobalContext ? counter.getName() : (counter.getName() + " - " + counter.getContext());
counters.put(counterName, counter.getValue());
}
}
return counters;
}
/**
 * @return the Provenance Event Repository exposed by the Repository Context Factory
 */
@Override
public ProvenanceEventRepository getProvenanceRepository() {
return repositoryContextFactory.getProvenanceRepository();
}
@Override
public BulletinRepository getBulletinRepository() {
return bulletinRepository;

View File

@ -31,6 +31,7 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
@ -53,6 +54,7 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
private final ExecutionProgress executionProgress;
private final Set<Connectable> rootConnectables;
private final FlowFileSupplier flowFileSupplier;
private final ProvenanceEventRepository provenanceEventRepository;
private final Collection<Port> inputPorts;
private final RepositoryContextFactory repositoryContextFactory;
private final ProcessContextFactory processContextFactory;
@ -65,6 +67,7 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
this.executionProgress = builder.executionProgress;
this.rootConnectables = builder.rootConnectables;
this.flowFileSupplier = builder.flowFileSupplier;
this.provenanceEventRepository = builder.provenanceEventRepository;
this.inputPorts = builder.inputPorts;
this.repositoryContextFactory = builder.repositoryContextFactory;
this.processContextFactory = builder.processContextFactory;
@ -193,7 +196,7 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
// during this invocation of its onTrigger method.
tracker.resetProgress();
final StatelessProcessSessionFactory statelessSessionFactory = new StatelessProcessSessionFactory(connectable, repositoryContextFactory, processContextFactory,
final StatelessProcessSessionFactory statelessSessionFactory = new StatelessProcessSessionFactory(connectable, repositoryContextFactory, provenanceEventRepository, processContextFactory,
executionProgress, false, tracker);
lifecycleState.incrementActiveThreadCount(null);
@ -211,7 +214,7 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
private NextConnectable triggerWhileReady(final Connectable connectable) {
final LifecycleState lifecycleState = lifecycleStateManager.getOrRegisterLifecycleState(connectable.getIdentifier(), true, false);
final StatelessProcessSessionFactory statelessSessionFactory = new StatelessProcessSessionFactory(connectable, repositoryContextFactory, processContextFactory,
final StatelessProcessSessionFactory statelessSessionFactory = new StatelessProcessSessionFactory(connectable, repositoryContextFactory, provenanceEventRepository, processContextFactory,
executionProgress, false, tracker);
lifecycleState.incrementActiveThreadCount(null);
@ -305,6 +308,7 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
private Set<Connectable> rootConnectables;
private Collection<Port> inputPorts;
private FlowFileSupplier flowFileSupplier = null;
private ProvenanceEventRepository provenanceEventRepository;
private RepositoryContextFactory repositoryContextFactory;
private ProcessContextFactory processContextFactory;
private LifecycleStateManager lifecycleStateManager;
@ -315,6 +319,7 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
Objects.requireNonNull(executionProgress, "Execution Progress must be set");
Objects.requireNonNull(rootConnectables, "Root Conectables must be set");
Objects.requireNonNull(repositoryContextFactory, "Repository Context Factory must be set");
Objects.requireNonNull(provenanceEventRepository, "Provenance Event Repository must be set");
Objects.requireNonNull(processContextFactory, "Process Context Factory must be set");
return new StandardStatelessFlowCurrent(this);
@ -350,6 +355,11 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
return this;
}
/**
 * Stores the given Provenance Event Repository on this builder.
 *
 * @param provenanceEventRepository the repository to record on the builder
 * @return this builder, to allow call chaining
 */
public Builder provenanceEventRepository(final ProvenanceEventRepository provenanceEventRepository) {
this.provenanceEventRepository = provenanceEventRepository;
return this;
}
public Builder inputPorts(final Collection<Port> inputPorts) {
this.inputPorts = inputPorts;
return this;

View File

@ -19,14 +19,13 @@ package org.apache.nifi.stateless.repository;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.provenance.ProvenanceEventRepository;
public interface RepositoryContextFactory {
RepositoryContext createRepositoryContext(Connectable connectable);
RepositoryContext createRepositoryContext(Connectable connectable, ProvenanceEventRepository provenanceEventRepository);
ContentRepository getContentRepository();
@ -34,9 +33,5 @@ public interface RepositoryContextFactory {
FlowFileEventRepository getFlowFileEventRepository();
ProvenanceEventRepository getProvenanceRepository();
CounterRepository getCounterRepository();
void shutdown();
}

View File

@ -39,7 +39,6 @@ public class StatelessRepositoryContextFactory implements RepositoryContextFacto
private final FlowFileRepository flowFileRepository;
private final FlowFileEventRepository flowFileEventRepository;
private final CounterRepository counterRepository;
private final ProvenanceEventRepository provenanceEventRepository;
private final StateManagerProvider stateManagerProvider;
public StatelessRepositoryContextFactory(final ContentRepository contentRepository, final FlowFileRepository flowFileRepository, final FlowFileEventRepository flowFileEventRepository,
@ -48,12 +47,11 @@ public class StatelessRepositoryContextFactory implements RepositoryContextFacto
this.flowFileRepository = flowFileRepository;
this.flowFileEventRepository = flowFileEventRepository;
this.counterRepository = counterRepository;
this.provenanceEventRepository = provenanceRepository;
this.stateManagerProvider = stateManagerProvider;
}
@Override
public RepositoryContext createRepositoryContext(final Connectable connectable) {
public RepositoryContext createRepositoryContext(final Connectable connectable, final ProvenanceEventRepository provenanceEventRepository) {
final StateManager stateManager = stateManagerProvider.getStateManager(connectable.getIdentifier());
return new StatelessRepositoryContext(connectable, new AtomicLong(0L), contentRepository, flowFileRepository,
flowFileEventRepository, counterRepository, provenanceEventRepository, stateManager);
@ -73,16 +71,6 @@ public class StatelessRepositoryContextFactory implements RepositoryContextFacto
return flowFileEventRepository;
}
/**
 * @return the Provenance Event Repository held in this factory's {@code provenanceEventRepository} field
 */
@Override
public ProvenanceEventRepository getProvenanceRepository() {
return provenanceEventRepository;
}
/**
 * @return the Counter Repository held in this factory's {@code counterRepository} field
 */
@Override
public CounterRepository getCounterRepository() {
return counterRepository;
}
@Override
public void shutdown() {
contentRepository.shutdown();
@ -98,11 +86,5 @@ public class StatelessRepositoryContextFactory implements RepositoryContextFacto
} catch (final IOException e) {
logger.warn("Failed to properly shutdown FlowFile Event Repository", e);
}
try {
provenanceEventRepository.close();
} catch (final IOException e) {
logger.warn("Failed to properly shutdown Provenance Repository", e);
}
}
}

View File

@ -27,6 +27,7 @@ import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.stateless.engine.DataflowAbortedException;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
@ -48,17 +49,20 @@ public class StatelessProcessSession extends StandardProcessSession {
private final Connectable connectable;
private final RepositoryContextFactory repositoryContextFactory;
private final ProcessContextFactory processContextFactory;
private final ProvenanceEventRepository provenanceEventRepository;
private final ExecutionProgress executionProgress;
private final AsynchronousCommitTracker tracker;
private boolean requireSynchronousCommits;
public StatelessProcessSession(final Connectable connectable, final RepositoryContextFactory repositoryContextFactory, final ProcessContextFactory processContextFactory,
public StatelessProcessSession(final Connectable connectable, final RepositoryContextFactory repositoryContextFactory,
final ProvenanceEventRepository provenanceEventRepository, final ProcessContextFactory processContextFactory,
final ExecutionProgress progress, final boolean requireSynchronousCommits, final AsynchronousCommitTracker tracker) {
super(repositoryContextFactory.createRepositoryContext(connectable), progress::isCanceled, new NopPerformanceTracker());
super(repositoryContextFactory.createRepositoryContext(connectable, provenanceEventRepository), progress::isCanceled, new NopPerformanceTracker());
this.connectable = connectable;
this.repositoryContextFactory = repositoryContextFactory;
this.provenanceEventRepository = provenanceEventRepository;
this.processContextFactory = processContextFactory;
this.executionProgress = progress;
this.requireSynchronousCommits = requireSynchronousCommits;
@ -245,7 +249,7 @@ public class StatelessProcessSession extends StandardProcessSession {
assertProgressNotCanceled();
final ProcessContext connectableContext = processContextFactory.createProcessContext(connectable);
final ProcessSessionFactory connectableSessionFactory = new StatelessProcessSessionFactory(connectable, repositoryContextFactory,
final ProcessSessionFactory connectableSessionFactory = new StatelessProcessSessionFactory(connectable, repositoryContextFactory, provenanceEventRepository,
processContextFactory, executionProgress, requireSynchronousCommits, new AsynchronousCommitTracker(tracker.getRootGroup()));
logger.debug("Triggering {}", connectable);

View File

@ -20,6 +20,7 @@ package org.apache.nifi.stateless.session;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
@ -27,15 +28,18 @@ import org.apache.nifi.stateless.repository.RepositoryContextFactory;
public class StatelessProcessSessionFactory implements ProcessSessionFactory {
private final Connectable connectable;
private final RepositoryContextFactory contextFactory;
private final ProvenanceEventRepository provenanceEventRepository;
private final ProcessContextFactory processContextFactory;
private final ExecutionProgress executionProgress;
private final boolean requireSynchronousCommits;
private final AsynchronousCommitTracker tracker;
public StatelessProcessSessionFactory(final Connectable connectable, final RepositoryContextFactory contextFactory, final ProcessContextFactory processContextFactory,
public StatelessProcessSessionFactory(final Connectable connectable, final RepositoryContextFactory contextFactory,
final ProvenanceEventRepository provenanceEventRepository, final ProcessContextFactory processContextFactory,
final ExecutionProgress executionProgress, final boolean requireSynchronousCommits, final AsynchronousCommitTracker tracker) {
this.connectable = connectable;
this.contextFactory = contextFactory;
this.provenanceEventRepository = provenanceEventRepository;
this.processContextFactory = processContextFactory;
this.executionProgress = executionProgress;
this.requireSynchronousCommits = requireSynchronousCommits;
@ -44,7 +48,8 @@ public class StatelessProcessSessionFactory implements ProcessSessionFactory {
@Override
public ProcessSession createSession() {
final StatelessProcessSession session = new StatelessProcessSession(connectable, contextFactory, processContextFactory, executionProgress, requireSynchronousCommits, tracker);
final StatelessProcessSession session = new StatelessProcessSession(connectable, contextFactory, provenanceEventRepository, processContextFactory, executionProgress,
requireSynchronousCommits, tracker);
executionProgress.registerCreatedSession(session);
return session;
}

View File

@ -87,7 +87,7 @@ public class StatelessBasicsIT extends NiFiSystemIT {
public void testOrderingIntraSession() throws NiFiClientException, IOException, InterruptedException {
final int batchSize = 100;
statelessGroup = getClientUtil().createProcessGroup("Stateless", "root");
statelessGroup = getClientUtil().createProcessGroup("testOrderingIntraSession", "root");
getClientUtil().markStateless(statelessGroup, "1 min");
final ProcessorEntity generate = getClientUtil().createProcessor(GENERATE_FLOWFILE, statelessGroup.getId());
@ -117,6 +117,7 @@ public class StatelessBasicsIT extends NiFiSystemIT {
getClientUtil().waitForValidProcessor(generate.getId());
getClientUtil().waitForValidProcessor(router.getId());
getClientUtil().waitForValidProcessor(verifyProcessor.getId());
getClientUtil().startProcessGroupComponents(statelessGroup.getId());
waitForQueueCount(outputToTerminate.getId(), batchSize);