diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java index e516f209f8..e34e04344f 100644 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java @@ -348,7 +348,7 @@ public class StandardFunnel implements Funnel { private void onTrigger(final ProcessContext context, final ProcessSession session) { readLock.lock(); try { - Set available = session.getAvailableRelationships(); + Set available = context.getAvailableRelationships(); int transferred = 0; while (!available.isEmpty()) { final List flowFiles = session.get(10); @@ -359,7 +359,7 @@ public class StandardFunnel implements Funnel { transferred += flowFiles.size(); session.transfer(flowFiles, Relationship.ANONYMOUS); session.commit(); - available = session.getAvailableRelationships(); + available = context.getAvailableRelationships(); } if (transferred == 0) { diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java index eae255033c..d5dba82596 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java @@ -97,11 +97,6 @@ public class BatchingSessionFactory implements ProcessSessionFactory { return session.getQueueSize(); } - @Override - public Set getAvailableRelationships() { - return session.getAvailableRelationships(); - } - @Override public FlowFile create() { return session.create(); diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 3d3e854a28..dcb461cf3d 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1236,11 +1236,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return new QueueSize(flowFileCount, byteCount); } - @Override - public Set getAvailableRelationships() { - return context.getAvailableRelationships(); - } - @Override public FlowFile create() { final Map attrs = new HashMap<>(); diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java index 8c60e4ba82..acb3a01127 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java @@ -16,13 +16,18 @@ */ package org.apache.nifi.controller.scheduling; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.encrypt.StringEncryptor; @@ -30,6 +35,7 @@ import org.apache.nifi.expression.AttributeValueDecorator; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; /** @@ -170,4 +176,19 @@ public class ConnectableProcessContext implements ProcessContext { public ControllerServiceLookup getControllerServiceLookup() { return null; } + + @Override + public Set getAvailableRelationships() { + for ( final Connection connection : connectable.getConnections() ) { + if ( connection.getFlowFileQueue().isFull() ) { + return Collections.emptySet(); + } + } + + final Collection relationships = connectable.getRelationships(); + if ( relationships instanceof Set ) { + return (Set) relationships; + } + return new HashSet<>(connectable.getRelationships()); + } } diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index 93a8c6bc34..cd0d31c465 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -16,7 +16,9 @@ */ package org.apache.nifi.processor; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -24,6 +26,7 @@ import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ProcessorNode; @@ -142,4 +145,29 @@ public class StandardProcessContext implements ProcessContext, ControllerService public ControllerServiceLookup getControllerServiceLookup() { return this; } + + @Override + public Set getAvailableRelationships() { + final Set set = new HashSet<>(); + for (final Relationship relationship : procNode.getRelationships()) { + final Collection connections = procNode.getConnections(relationship); + if (connections.isEmpty()) { + set.add(relationship); + } else { + boolean available = true; + for (final Connection connection : connections) { + if (connection.getFlowFileQueue().isFull()) { + available = false; + } + } + + if (available) { + set.add(relationship); + } + } + } + + return set; + } + } diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java index 0fe08c99c7..318901fd77 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java @@ -17,6 +17,7 @@ package org.apache.nifi.processor; import java.util.Map; +import java.util.Set; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -104,4 +105,9 @@ public class StandardSchedulingContext implements SchedulingContext { public ControllerServiceLookup getControllerServiceLookup() { return processContext.getControllerServiceLookup(); } + + @Override + public Set getAvailableRelationships() { + return processContext.getAvailableRelationships(); + } } diff --git a/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index 22ec983b0f..d4b4f61542 100644 --- a/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -327,7 +327,7 @@ public class SocketClientProtocol implements ClientProtocol { // Commit the session so that we have persisted the data session.commit(); - if ( session.getAvailableRelationships().isEmpty() ) { + if ( context.getAvailableRelationships().isEmpty() ) { // Confirm that we received the data and the peer can now discard it but that the peer should not // send any more data for a bit logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); diff --git a/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index 88b6a416e3..5edd4f9579 100644 --- a/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -482,7 +482,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { // Commit the session so that we have persisted the data session.commit(); - if ( session.getAvailableRelationships().isEmpty() ) { + if ( context.getAvailableRelationships().isEmpty() ) { // Confirm that we received the data and the peer can now discard it but that the peer should not // send any more data for a bit logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java index a755b1a22f..3ac55d250b 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java @@ -354,14 +354,14 @@ public class DistributeLoad extends AbstractProcessor { } final DistributionStrategy strategy = strategyRef.get(); - final Set available = session.getAvailableRelationships(); + final Set available = context.getAvailableRelationships(); final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger(); final boolean allDestinationsAvailable = (available.size() == numRelationships); if (!allDestinationsAvailable && strategy.requiresAllDestinationsAvailable()) { return; } - final Relationship relationship = strategy.mapToRelationship(session, flowFile); + final Relationship relationship = strategy.mapToRelationship(context, flowFile); if (relationship == null) { // can't transfer the FlowFiles. Roll back and yield session.rollback(); @@ -403,7 +403,7 @@ public class DistributeLoad extends AbstractProcessor { * @param flowFiles * @return */ - Relationship mapToRelationship(ProcessSession session, FlowFile flowFile); + Relationship mapToRelationship(ProcessContext context, FlowFile flowFile); boolean requiresAllDestinationsAvailable(); } @@ -413,7 +413,7 @@ public class DistributeLoad extends AbstractProcessor { private final AtomicLong counter = new AtomicLong(0L); @Override - public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) { + public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { final List relationshipList = DistributeLoad.this.weightedRelationshipListRef.get(); final int numRelationships = relationshipList.size(); @@ -427,7 +427,7 @@ public class DistributeLoad extends AbstractProcessor { final long counterValue = counter.getAndIncrement(); final int idx = (int) (counterValue % numRelationships); relationship = relationshipList.get(idx); - foundFreeRelationship = session.getAvailableRelationships().contains(relationship); + foundFreeRelationship = context.getAvailableRelationships().contains(relationship); if (++attempts % numRelationships == 0 && !foundFreeRelationship) { return null; } @@ -448,7 +448,7 @@ public class DistributeLoad extends AbstractProcessor { private final AtomicLong counter = new AtomicLong(0L); @Override - public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) { + public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { final List relationshipList = DistributeLoad.this.weightedRelationshipListRef.get(); final long counterValue = counter.getAndIncrement(); final int idx = (int) (counterValue % relationshipList.size()); @@ -467,7 +467,7 @@ public class DistributeLoad extends AbstractProcessor { private final AtomicLong counter = new AtomicLong(0L); @Override - public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) { + public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { final List relationshipList = DistributeLoad.this.weightedRelationshipListRef.get(); final int numRelationships = relationshipList.size(); @@ -481,7 +481,7 @@ public class DistributeLoad extends AbstractProcessor { final long counterValue = counter.getAndIncrement(); final int idx = (int) (counterValue % numRelationships); relationship = relationshipList.get(idx); - foundFreeRelationship = session.getAvailableRelationships().contains(relationship); + foundFreeRelationship = context.getAvailableRelationships().contains(relationship); if (++attempts % numRelationships == 0 && !foundFreeRelationship) { return null; } diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index 2b0b437140..b7fe97a80a 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -117,6 +117,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor"; public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger"; public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder"; + public static final String CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER = "processContextHolder"; public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern"; public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern"; public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap"; @@ -240,6 +241,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, this); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, getLogger()); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, sessionFactoryReference); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER, context); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue())); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler); diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java index 65b3c666fa..43d839552c 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java @@ -377,7 +377,7 @@ public class ListenUDP extends AbstractSessionFactoryProcessor { // this will throttle the processing of the received datagrams. If there are no more // buffers to read into because none have been returned to the pool via consumer.process(), // then the desired back pressure on the channel is created. - if (session.getAvailableRelationships().size() > 0) { + if (context.getAvailableRelationships().size() > 0) { consumer.process(); if (flowFileCount == newFlowFiles.size()) { // no new datagrams received, need to throttle this thread back so it does diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java index 31e5105e9d..da80546032 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java @@ -148,7 +148,7 @@ public abstract class PutFileTransfer extends AbstractPr session.transfer(flowFile, conflictResult.getRelationship()); session.commit(); - } while (isScheduled() && (getRelationships().size() == session.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.get()) != null)); + } while (isScheduled() && (getRelationships().size() == context.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.get()) != null)); } catch (final IOException e) { context.yield(); logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e}); diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index cae61f0ff3..1cf5f1f887 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -48,6 +48,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.StreamThrottler; import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.io.OutputStreamCallback; @@ -87,6 +88,7 @@ public class ListenHTTPServlet extends HttpServlet { private ProcessorLog logger; private AtomicReference sessionFactoryHolder; + private volatile ProcessContext processContext; private Pattern authorizedPattern; private Pattern headerPattern; private ConcurrentMap flowFileMap; @@ -103,6 +105,7 @@ public class ListenHTTPServlet extends HttpServlet { final ServletContext context = config.getServletContext(); this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); this.sessionFactoryHolder = (AtomicReference) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER); + this.processContext = (ProcessContext) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER); this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN); this.flowFileMap = (ConcurrentMap) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); @@ -118,6 +121,8 @@ public class ListenHTTPServlet extends HttpServlet { @Override protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { + final ProcessContext context = processContext; + ProcessSessionFactory sessionFactory; do { sessionFactory = sessionFactoryHolder.get(); @@ -136,7 +141,7 @@ public class ListenHTTPServlet extends HttpServlet { try { final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE; if (n == 0 || !spaceAvailable.get()) { - if (session.getAvailableRelationships().isEmpty()) { + if (context.getAvailableRelationships().isEmpty()) { spaceAvailable.set(false); if (logger.isDebugEnabled()) { logger.debug("Received request from " + request.getRemoteHost() + " but no space available; Indicating Service Unavailable"); diff --git a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java index a6402e4b45..ab4c978495 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java +++ b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java @@ -132,6 +132,7 @@ public class TestDistributeLoad { testRunner.assertQueueEmpty(); for (int i = 1; i <= 100; i++) { + System.out.println(i); testRunner.assertTransferCount(String.valueOf(i), (i == 50) ? 0 : 1); } } diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java index 9e044392f7..7fa183f824 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java @@ -17,6 +17,7 @@ package org.apache.nifi.processor; import java.util.Map; +import java.util.Set; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -121,4 +122,11 @@ public interface ProcessContext { * @return */ ControllerServiceLookup getControllerServiceLookup(); + + /** + * @return the set of all relationships for which space is available to + * receive new objects + */ + Set getAvailableRelationships(); + } diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java index 09d1bd21f2..d3de916572 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java @@ -165,12 +165,6 @@ public interface ProcessSession { */ QueueSize getQueueSize(); - /** - * @return the set of all relationships for which space is available to - * receive new objects - */ - Set getAvailableRelationships(); - /** * Creates a new FlowFile in the repository with no content and without any * linkage to a parent FlowFile. This method is appropriate only when data diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index 6e5f65dcf2..15591d7329 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -21,10 +21,12 @@ import static java.util.Objects.requireNonNull; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; @@ -32,6 +34,8 @@ import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SchedulingContext; import org.junit.Assert; @@ -45,6 +49,8 @@ public class MockProcessContext extends MockControllerServiceLookup implements S private boolean enableExpressionValidation = false; private boolean allowExpressionValidation = true; + private volatile Set unavailableRelationships = new HashSet<>(); + /** * Creates a new MockProcessContext for the given Processor * @@ -258,4 +264,21 @@ public class MockProcessContext extends MockControllerServiceLookup implements S public void leaseControllerService(final String identifier) { } + public Set getAvailableRelationships() { + if ( !(component instanceof Processor) ) { + return Collections.emptySet(); + } + + final Set relationships = new HashSet<>(((Processor) component).getRelationships()); + relationships.removeAll(unavailableRelationships); + return relationships; + } + + public void setUnavailableRelationships(final Set relationships) { + this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships)); + } + + public Set getUnavailableRelationships() { + return unavailableRelationships; + } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 552780cbba..ea55b34dab 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -329,10 +329,6 @@ public class MockProcessSession implements ProcessSession { return newFlowFile; } - @Override - public Set getAvailableRelationships() { - return sharedState.getAvailableRelationships(); - } @Override public MockFlowFile merge(final Collection sources, final FlowFile destination) { diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java b/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java index 96bef71868..13a87de417 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java @@ -16,49 +16,30 @@ */ package org.apache.nifi.util; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceReporter; public class SharedSessionState { private final MockFlowFileQueue flowFileQueue; private final ProvenanceReporter provenanceReporter; + @SuppressWarnings("unused") private final Processor processor; private final AtomicLong flowFileIdGenerator; private final ConcurrentMap counterMap = new ConcurrentHashMap<>(); - private volatile Set unavailableRelationships; public SharedSessionState(final Processor processor, final AtomicLong flowFileIdGenerator) { flowFileQueue = new MockFlowFileQueue(); provenanceReporter = new MockProvenanceReporter(); - unavailableRelationships = new HashSet<>(); this.flowFileIdGenerator = flowFileIdGenerator; this.processor = processor; } - public Set getAvailableRelationships() { - final Set relationships = new HashSet<>(processor.getRelationships()); - relationships.removeAll(unavailableRelationships); - return relationships; - } - - public void setUnavailableRelationships(final Set relationships) { - this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships)); - } - - public Set getUnavailableRelationships() { - return unavailableRelationships; - } - public MockFlowFileQueue getFlowFileQueue() { return flowFileQueue; } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 54b611dc77..40d5035c2a 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -426,9 +426,9 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void setRelationshipAvailable(final Relationship relationship) { - final Set unavailable = new HashSet<>(sharedState.getUnavailableRelationships()); + final Set unavailable = new HashSet<>(context.getUnavailableRelationships()); unavailable.remove(relationship); - sharedState.setUnavailableRelationships(unavailable); + context.setUnavailableRelationships(unavailable); } @Override @@ -438,9 +438,9 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void setRelationshipUnavailable(final Relationship relationship) { - final Set unavailable = new HashSet<>(sharedState.getUnavailableRelationships()); + final Set unavailable = new HashSet<>(context.getUnavailableRelationships()); unavailable.add(relationship); - sharedState.setUnavailableRelationships(unavailable); + context.setUnavailableRelationships(unavailable); } @Override