NIFI-273: Moved getAvailableRelationships from ProcessSession to ProcessContext

This commit is contained in:
Mark Payne 2015-01-16 11:27:22 -05:00
parent 94a06fc5d5
commit 3a7b8de0e3
20 changed files with 114 additions and 59 deletions

View File

@ -348,7 +348,7 @@ public class StandardFunnel implements Funnel {
private void onTrigger(final ProcessContext context, final ProcessSession session) {
readLock.lock();
try {
Set<Relationship> available = session.getAvailableRelationships();
Set<Relationship> available = context.getAvailableRelationships();
int transferred = 0;
while (!available.isEmpty()) {
final List<FlowFile> flowFiles = session.get(10);
@ -359,7 +359,7 @@ public class StandardFunnel implements Funnel {
transferred += flowFiles.size();
session.transfer(flowFiles, Relationship.ANONYMOUS);
session.commit();
available = session.getAvailableRelationships();
available = context.getAvailableRelationships();
}
if (transferred == 0) {

View File

@ -97,11 +97,6 @@ public class BatchingSessionFactory implements ProcessSessionFactory {
return session.getQueueSize();
}
@Override
public Set<Relationship> getAvailableRelationships() {
return session.getAvailableRelationships();
}
@Override
public FlowFile create() {
return session.create();

View File

@ -1236,11 +1236,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return new QueueSize(flowFileCount, byteCount);
}
@Override
public Set<Relationship> getAvailableRelationships() {
return context.getAvailableRelationships();
}
@Override
public FlowFile create() {
final Map<String, String> attrs = new HashMap<>();

View File

@ -16,13 +16,18 @@
*/
package org.apache.nifi.controller.scheduling;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.encrypt.StringEncryptor;
@ -30,6 +35,7 @@ import org.apache.nifi.expression.AttributeValueDecorator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
/**
@ -170,4 +176,19 @@ public class ConnectableProcessContext implements ProcessContext {
public ControllerServiceLookup getControllerServiceLookup() {
return null;
}
@Override
public Set<Relationship> getAvailableRelationships() {
for ( final Connection connection : connectable.getConnections() ) {
if ( connection.getFlowFileQueue().isFull() ) {
return Collections.emptySet();
}
}
final Collection<Relationship> relationships = connectable.getRelationships();
if ( relationships instanceof Set ) {
return (Set<Relationship>) relationships;
}
return new HashSet<>(connectable.getRelationships());
}
}

View File

@ -16,7 +16,9 @@
*/
package org.apache.nifi.processor;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -24,6 +26,7 @@ import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessorNode;
@ -142,4 +145,29 @@ public class StandardProcessContext implements ProcessContext, ControllerService
public ControllerServiceLookup getControllerServiceLookup() {
return this;
}
@Override
public Set<Relationship> getAvailableRelationships() {
final Set<Relationship> set = new HashSet<>();
for (final Relationship relationship : procNode.getRelationships()) {
final Collection<Connection> connections = procNode.getConnections(relationship);
if (connections.isEmpty()) {
set.add(relationship);
} else {
boolean available = true;
for (final Connection connection : connections) {
if (connection.getFlowFileQueue().isFull()) {
available = false;
}
}
if (available) {
set.add(relationship);
}
}
}
return set;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processor;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@ -104,4 +105,9 @@ public class StandardSchedulingContext implements SchedulingContext {
public ControllerServiceLookup getControllerServiceLookup() {
return processContext.getControllerServiceLookup();
}
@Override
public Set<Relationship> getAvailableRelationships() {
return processContext.getAvailableRelationships();
}
}

View File

@ -327,7 +327,7 @@ public class SocketClientProtocol implements ClientProtocol {
// Commit the session so that we have persisted the data
session.commit();
if ( session.getAvailableRelationships().isEmpty() ) {
if ( context.getAvailableRelationships().isEmpty() ) {
// Confirm that we received the data and the peer can now discard it but that the peer should not
// send any more data for a bit
logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);

View File

@ -482,7 +482,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
// Commit the session so that we have persisted the data
session.commit();
if ( session.getAvailableRelationships().isEmpty() ) {
if ( context.getAvailableRelationships().isEmpty() ) {
// Confirm that we received the data and the peer can now discard it but that the peer should not
// send any more data for a bit
logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);

View File

@ -354,14 +354,14 @@ public class DistributeLoad extends AbstractProcessor {
}
final DistributionStrategy strategy = strategyRef.get();
final Set<Relationship> available = session.getAvailableRelationships();
final Set<Relationship> available = context.getAvailableRelationships();
final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
final boolean allDestinationsAvailable = (available.size() == numRelationships);
if (!allDestinationsAvailable && strategy.requiresAllDestinationsAvailable()) {
return;
}
final Relationship relationship = strategy.mapToRelationship(session, flowFile);
final Relationship relationship = strategy.mapToRelationship(context, flowFile);
if (relationship == null) {
// can't transfer the FlowFiles. Roll back and yield
session.rollback();
@ -403,7 +403,7 @@ public class DistributeLoad extends AbstractProcessor {
* @param flowFiles
* @return
*/
Relationship mapToRelationship(ProcessSession session, FlowFile flowFile);
Relationship mapToRelationship(ProcessContext context, FlowFile flowFile);
boolean requiresAllDestinationsAvailable();
}
@ -413,7 +413,7 @@ public class DistributeLoad extends AbstractProcessor {
private final AtomicLong counter = new AtomicLong(0L);
@Override
public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) {
public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
final int numRelationships = relationshipList.size();
@ -427,7 +427,7 @@ public class DistributeLoad extends AbstractProcessor {
final long counterValue = counter.getAndIncrement();
final int idx = (int) (counterValue % numRelationships);
relationship = relationshipList.get(idx);
foundFreeRelationship = session.getAvailableRelationships().contains(relationship);
foundFreeRelationship = context.getAvailableRelationships().contains(relationship);
if (++attempts % numRelationships == 0 && !foundFreeRelationship) {
return null;
}
@ -448,7 +448,7 @@ public class DistributeLoad extends AbstractProcessor {
private final AtomicLong counter = new AtomicLong(0L);
@Override
public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) {
public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
final long counterValue = counter.getAndIncrement();
final int idx = (int) (counterValue % relationshipList.size());
@ -467,7 +467,7 @@ public class DistributeLoad extends AbstractProcessor {
private final AtomicLong counter = new AtomicLong(0L);
@Override
public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) {
public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
final int numRelationships = relationshipList.size();
@ -481,7 +481,7 @@ public class DistributeLoad extends AbstractProcessor {
final long counterValue = counter.getAndIncrement();
final int idx = (int) (counterValue % numRelationships);
relationship = relationshipList.get(idx);
foundFreeRelationship = session.getAvailableRelationships().contains(relationship);
foundFreeRelationship = context.getAvailableRelationships().contains(relationship);
if (++attempts % numRelationships == 0 && !foundFreeRelationship) {
return null;
}

View File

@ -117,6 +117,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
public static final String CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER = "processContextHolder";
public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern";
public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern";
public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
@ -240,6 +241,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, this);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, getLogger());
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, sessionFactoryReference);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER, context);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);

View File

@ -377,7 +377,7 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
// this will throttle the processing of the received datagrams. If there are no more
// buffers to read into because none have been returned to the pool via consumer.process(),
// then the desired back pressure on the channel is created.
if (session.getAvailableRelationships().size() > 0) {
if (context.getAvailableRelationships().size() > 0) {
consumer.process();
if (flowFileCount == newFlowFiles.size()) {
// no new datagrams received, need to throttle this thread back so it does

View File

@ -148,7 +148,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
session.transfer(flowFile, conflictResult.getRelationship());
session.commit();
} while (isScheduled() && (getRelationships().size() == session.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.get()) != null));
} while (isScheduled() && (getRelationships().size() == context.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.get()) != null));
} catch (final IOException e) {
context.yield();
logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e});

View File

@ -48,6 +48,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.StreamThrottler;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.io.OutputStreamCallback;
@ -87,6 +88,7 @@ public class ListenHTTPServlet extends HttpServlet {
private ProcessorLog logger;
private AtomicReference<ProcessSessionFactory> sessionFactoryHolder;
private volatile ProcessContext processContext;
private Pattern authorizedPattern;
private Pattern headerPattern;
private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
@ -103,6 +105,7 @@ public class ListenHTTPServlet extends HttpServlet {
final ServletContext context = config.getServletContext();
this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER);
this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER);
this.processContext = (ProcessContext) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER);
this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN);
this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
@ -118,6 +121,8 @@ public class ListenHTTPServlet extends HttpServlet {
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
final ProcessContext context = processContext;
ProcessSessionFactory sessionFactory;
do {
sessionFactory = sessionFactoryHolder.get();
@ -136,7 +141,7 @@ public class ListenHTTPServlet extends HttpServlet {
try {
final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE;
if (n == 0 || !spaceAvailable.get()) {
if (session.getAvailableRelationships().isEmpty()) {
if (context.getAvailableRelationships().isEmpty()) {
spaceAvailable.set(false);
if (logger.isDebugEnabled()) {
logger.debug("Received request from " + request.getRemoteHost() + " but no space available; Indicating Service Unavailable");

View File

@ -132,6 +132,7 @@ public class TestDistributeLoad {
testRunner.assertQueueEmpty();
for (int i = 1; i <= 100; i++) {
System.out.println(i);
testRunner.assertTransferCount(String.valueOf(i), (i == 50) ? 0 : 1);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processor;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@ -121,4 +122,11 @@ public interface ProcessContext {
* @return
*/
ControllerServiceLookup getControllerServiceLookup();
/**
* @return the set of all relationships for which space is available to
* receive new objects
*/
Set<Relationship> getAvailableRelationships();
}

View File

@ -165,12 +165,6 @@ public interface ProcessSession {
*/
QueueSize getQueueSize();
/**
* @return the set of all relationships for which space is available to
* receive new objects
*/
Set<Relationship> getAvailableRelationships();
/**
* Creates a new FlowFile in the repository with no content and without any
* linkage to a parent FlowFile. This method is appropriate only when data

View File

@ -21,10 +21,12 @@ import static java.util.Objects.requireNonNull;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
@ -32,6 +34,8 @@ import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SchedulingContext;
import org.junit.Assert;
@ -45,6 +49,8 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
private boolean enableExpressionValidation = false;
private boolean allowExpressionValidation = true;
private volatile Set<Relationship> unavailableRelationships = new HashSet<>();
/**
* Creates a new MockProcessContext for the given Processor
*
@ -258,4 +264,21 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
public void leaseControllerService(final String identifier) {
}
public Set<Relationship> getAvailableRelationships() {
if ( !(component instanceof Processor) ) {
return Collections.emptySet();
}
final Set<Relationship> relationships = new HashSet<>(((Processor) component).getRelationships());
relationships.removeAll(unavailableRelationships);
return relationships;
}
public void setUnavailableRelationships(final Set<Relationship> relationships) {
this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships));
}
public Set<Relationship> getUnavailableRelationships() {
return unavailableRelationships;
}
}

View File

@ -329,10 +329,6 @@ public class MockProcessSession implements ProcessSession {
return newFlowFile;
}
@Override
public Set<Relationship> getAvailableRelationships() {
return sharedState.getAvailableRelationships();
}
@Override
public MockFlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) {

View File

@ -16,49 +16,30 @@
*/
package org.apache.nifi.util;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceReporter;
public class SharedSessionState {
private final MockFlowFileQueue flowFileQueue;
private final ProvenanceReporter provenanceReporter;
@SuppressWarnings("unused")
private final Processor processor;
private final AtomicLong flowFileIdGenerator;
private final ConcurrentMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
private volatile Set<Relationship> unavailableRelationships;
public SharedSessionState(final Processor processor, final AtomicLong flowFileIdGenerator) {
flowFileQueue = new MockFlowFileQueue();
provenanceReporter = new MockProvenanceReporter();
unavailableRelationships = new HashSet<>();
this.flowFileIdGenerator = flowFileIdGenerator;
this.processor = processor;
}
public Set<Relationship> getAvailableRelationships() {
final Set<Relationship> relationships = new HashSet<>(processor.getRelationships());
relationships.removeAll(unavailableRelationships);
return relationships;
}
public void setUnavailableRelationships(final Set<Relationship> relationships) {
this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships));
}
public Set<Relationship> getUnavailableRelationships() {
return unavailableRelationships;
}
public MockFlowFileQueue getFlowFileQueue() {
return flowFileQueue;
}

View File

@ -426,9 +426,9 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public void setRelationshipAvailable(final Relationship relationship) {
final Set<Relationship> unavailable = new HashSet<>(sharedState.getUnavailableRelationships());
final Set<Relationship> unavailable = new HashSet<>(context.getUnavailableRelationships());
unavailable.remove(relationship);
sharedState.setUnavailableRelationships(unavailable);
context.setUnavailableRelationships(unavailable);
}
@Override
@ -438,9 +438,9 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public void setRelationshipUnavailable(final Relationship relationship) {
final Set<Relationship> unavailable = new HashSet<>(sharedState.getUnavailableRelationships());
final Set<Relationship> unavailable = new HashSet<>(context.getUnavailableRelationships());
unavailable.add(relationship);
sharedState.setUnavailableRelationships(unavailable);
context.setUnavailableRelationships(unavailable);
}
@Override