From 1373bf672586ba5ddcfa697c45c832ccc79425cb Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Thu, 10 Mar 2016 09:19:54 -0500 Subject: [PATCH] NIFI-899 Rewrite of ListenUDP to use new listener framework, includes the following changes: - Adding Network Interface property to AbstractListenEventProcessor and ListenSyslog - Adding sending host and sending port to DatagramChannelDispatcher - Creation of common base class AbstractListenEventBatchingProcessor - Refactor of ListenUDP, ListenTCP, and ListenRELP to all extend from AbstractListenEventBatchingProcessor - Changing DatagramChannelDispatcher, socket handlers, and RELP handler to use offer() when queueing instead of put(), and log an error if the offer failed This closes #266 --- .../AbstractListenEventBatchingProcessor.java | 269 ++++++++ .../listen/AbstractListenEventProcessor.java | 160 ++--- .../util/listen/ListenerProperties.java | 87 +++ .../listen/dispatcher/ChannelDispatcher.java | 8 +- .../dispatcher/DatagramChannelDispatcher.java | 36 +- .../dispatcher/SocketChannelDispatcher.java | 7 +- .../util/listen/event/EventQueue.java | 67 ++ .../listen/event/StandardEventFactory.java | 37 ++ .../util/listen/handler/ChannelHandler.java | 5 +- .../socket/SSLSocketChannelHandler.java | 4 +- .../socket/StandardSocketChannelHandler.java | 4 +- .../nifi/processors/standard/ListenRELP.java | 122 ++-- .../processors/standard/ListenSyslog.java | 15 +- .../nifi/processors/standard/ListenTCP.java | 110 +--- .../nifi/processors/standard/ListenUDP.java | 589 +++--------------- .../relp/handler/RELPFrameHandler.java | 14 +- .../handler/RELPSSLSocketChannelHandler.java | 2 +- .../handler/RELPSocketChannelHandler.java | 2 +- .../processors/standard/TestListenRELP.java | 63 +- .../processors/standard/TestListenSyslog.java | 2 +- .../processors/standard/TestListenUDP.java | 414 +++++++----- .../relp/handler/TestRELPFrameHandler.java | 5 +- .../handler/TestRELPSocketChannelHandler.java | 2 +- 23 files changed, 1048 insertions(+), 976 deletions(-) create mode 100644 nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java create mode 100644 nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java create mode 100644 nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java create mode 100644 nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java new file mode 100644 index 0000000000..9a9767145a --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen; + +import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.listen.event.Event; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * An abstract processor that extends from AbstractListenEventProcessor and adds common functionality for + * batching events into a single FlowFile. + * + * @param the type of Event + */ +public abstract class AbstractListenEventBatchingProcessor extends AbstractListenEventProcessor { + + public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Max Batch Size") + .description( + "The maximum number of messages to add to a single FlowFile. If multiple messages are available, they will be concatenated along with " + + "the up to this configured maximum number of messages") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("1") + .required(true) + .build(); + public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() + .name("Message Delimiter") + .displayName("Batching Message Delimiter") + .description("Specifies the delimiter to place between messages when multiple messages are bundled together (see property).") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("\\n") + .required(true) + .build(); + + // it is only the array reference that is volatile - not the contents. + protected volatile byte[] messageDemarcatorBytes; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList<>(); + descriptors.add(NETWORK_INTF_NAME); + descriptors.add(PORT); + descriptors.add(RECV_BUFFER_SIZE); + descriptors.add(MAX_MESSAGE_QUEUE_SIZE); + descriptors.add(MAX_SOCKET_BUFFER_SIZE); + descriptors.add(CHARSET); + descriptors.add(MAX_BATCH_SIZE); + descriptors.add(MESSAGE_DELIMITER); + descriptors.addAll(getAdditionalProperties()); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.addAll(getAdditionalRelationships()); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + @OnScheduled + public void onScheduled(ProcessContext context) throws IOException { + super.onScheduled(context); + final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); + messageDemarcatorBytes = msgDemarcator.getBytes(charset); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger(); + final Map batches = getBatches(session, maxBatchSize, messageDemarcatorBytes); + + // if the size is 0 then there was nothing to process so return + // we don't need to yield here because we have a long poll in side of getBatches + if (batches.size() == 0) { + return; + } + + final List allEvents = new ArrayList<>(); + + for (Map.Entry entry : batches.entrySet()) { + FlowFile flowFile = entry.getValue().getFlowFile(); + final List events = entry.getValue().getEvents(); + + if (flowFile.getSize() == 0L || events.size() == 0) { + session.remove(flowFile); + getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()}); + continue; + } + + final Map attributes = getAttributes(entry.getValue()); + flowFile = session.putAllAttributes(flowFile, attributes); + + getLogger().debug("Transferring {} to success", new Object[] {flowFile}); + session.transfer(flowFile, REL_SUCCESS); + session.adjustCounter("FlowFiles Transferred to Success", 1L, false); + + // the sender and command will be the same for all events based on the batch key + final String transitUri = getTransitUri(entry.getValue()); + session.getProvenanceReporter().receive(flowFile, transitUri); + + allEvents.addAll(events); + } + + // let sub-classes take any additional actions + postProcess(context, session, allEvents); + } + + /** + * Creates the attributes for the FlowFile of the given batch. + * + * @param batch the current batch + * @return the Map of FlowFile attributes + */ + protected abstract Map getAttributes(final FlowFileEventBatch batch); + + /** + * Creates the transit uri to be used when reporting a provenance receive event for the given batch. + * + * @param batch the current batch + * @return the transit uri string + */ + protected abstract String getTransitUri(final FlowFileEventBatch batch); + + /** + * Called at the end of onTrigger to allow sub-classes to take post processing action on the events + * + * @param context the current context + * @param session the current session + * @param events the list of all events processed by the current execution of onTrigger + */ + protected void postProcess(ProcessContext context, ProcessSession session, final List events) { + // empty implementation so sub-classes only have to override if necessary + } + + /** + * Batches together up to the batchSize events. Events are grouped together based on a batch key which + * by default is the sender of the event, but can be override by sub-classes. + * + * This method will return when batchSize has been reached, or when no more events are available on the queue. + * + * @param session the current session + * @param totalBatchSize the total number of events to process + * @param messageDemarcatorBytes the demarcator to put between messages when writing to a FlowFile + * + * @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all + * the batches will be <= batchSize + */ + protected Map getBatches(final ProcessSession session, final int totalBatchSize, + final byte[] messageDemarcatorBytes) { + + final Map batches = new HashMap<>(); + for (int i=0; i < totalBatchSize; i++) { + final E event = getMessage(true, true, session); + if (event == null) { + break; + } + + final String batchKey = getBatchKey(event); + FlowFileEventBatch batch = batches.get(batchKey); + + // if we don't have a batch for this key then create a new one + if (batch == null) { + batch = new FlowFileEventBatch(session.create(), new ArrayList()); + batches.put(batchKey, batch); + } + + // add the current event to the batch + batch.getEvents().add(event); + + // append the event's data to the FlowFile, write the demarcator first if not on the first event + final boolean writeDemarcator = (i > 0); + try { + final byte[] rawMessage = event.getData(); + FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + if (writeDemarcator) { + out.write(messageDemarcatorBytes); + } + + out.write(rawMessage); + } + }); + + // update the FlowFile reference in the batch object + batch.setFlowFile(appendedFlowFile); + + } catch (final Exception e) { + getLogger().error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again", + new Object[] {e.getMessage()}, e); + errorEvents.offer(event); + break; + } + } + + return batches; + } + + /** + * @param event an event that was pulled off the queue + * + * @return a key to use for batching events together, by default this uses the sender of the + * event, but sub-classes should override this to batch by something else + */ + protected String getBatchKey(final E event) { + return event.getSender(); + } + + /** + * Wrapper to hold a FlowFile and the events that have been appended to it. + */ + protected final class FlowFileEventBatch { + + private FlowFile flowFile; + private List events; + + public FlowFileEventBatch(final FlowFile flowFile, final List events) { + this.flowFile = flowFile; + this.events = events; + } + + public FlowFile getFlowFile() { + return flowFile; + } + + public List getEvents() { + return events; + } + + public void setFlowFile(FlowFile flowFile) { + this.flowFile = flowFile; + } + } + +} diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java index 84fa5dc0f1..43d01b8895 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java @@ -16,30 +16,31 @@ */ package org.apache.nifi.processor.util.listen; +import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; + +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.event.Event; import java.io.IOException; -import java.io.OutputStream; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -55,6 +56,8 @@ import java.util.concurrent.TimeUnit; */ public abstract class AbstractListenEventProcessor extends AbstractProcessor { + + public static final PropertyDescriptor PORT = new PropertyDescriptor .Builder().name("Port") .description("The port to listen on for communication.") @@ -105,23 +108,7 @@ public abstract class AbstractListenEventProcessor extends Abst .defaultValue("2") .required(true) .build(); - public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Max Batch Size") - .description( - "The maximum number of messages to add to a single FlowFile. If multiple messages are available, they will be concatenated along with " - + "the up to this configured maximum number of messages") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("1") - .required(true) - .build(); - public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() - .name("Message Delimiter") - .description("Specifies the delimiter to place between messages when multiple messages are bundled together (see property).") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("\\n") - .required(true) - .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -130,8 +117,8 @@ public abstract class AbstractListenEventProcessor extends Abst public static final int POLL_TIMEOUT_MS = 20; - private Set relationships; - private List descriptors; + protected Set relationships; + protected List descriptors; protected volatile int port; protected volatile Charset charset; @@ -142,6 +129,7 @@ public abstract class AbstractListenEventProcessor extends Abst @Override protected void init(final ProcessorInitializationContext context) { final List descriptors = new ArrayList<>(); + descriptors.add(NETWORK_INTF_NAME); descriptors.add(PORT); descriptors.add(RECV_BUFFER_SIZE); descriptors.add(MAX_MESSAGE_QUEUE_SIZE); @@ -190,11 +178,18 @@ public abstract class AbstractListenEventProcessor extends Abst port = context.getProperty(PORT).asInteger(); events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger()); + final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + InetAddress nicIPAddress = null; + if (!StringUtils.isEmpty(nicIPAddressStr)) { + NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr); + nicIPAddress = netIF.getInetAddresses().nextElement(); + } + // create the dispatcher and call open() to bind to the given port dispatcher = createDispatcher(context, events); - dispatcher.open(port, maxChannelBufferSize); + dispatcher.open(nicIPAddress, port, maxChannelBufferSize); // start a thread to run the dispatcher final Thread readerThread = new Thread(dispatcher); @@ -231,6 +226,21 @@ public abstract class AbstractListenEventProcessor extends Abst } } + /** + * Creates a pool of ByteBuffers with the given size. + * + * @param poolSize the number of buffers to initialize the pool with + * @param bufferSize the size of each buffer + * @return a blocking queue with size equal to poolSize and each buffer equal to bufferSize + */ + protected BlockingQueue createBufferPool(final int poolSize, final int bufferSize) { + final LinkedBlockingQueue bufferPool = new LinkedBlockingQueue<>(poolSize); + for (int i = 0; i < poolSize; i++) { + bufferPool.offer(ByteBuffer.allocate(bufferSize)); + } + return bufferPool; + } + /** * If pollErrorQueue is true, the error queue will be checked first and event will be * returned from the error queue if available. @@ -271,104 +281,4 @@ public abstract class AbstractListenEventProcessor extends Abst return event; } - /** - * Batches together up to the batchSize events. Events are grouped together based on a batch key which - * by default is the sender of the event, but can be override by sub-classes. - * - * This method will return when batchSize has been reached, or when no more events are available on the queue. - * - * @param session the current session - * @param totalBatchSize the total number of events to process - * @param messageDemarcatorBytes the demarcator to put between messages when writing to a FlowFile - * - * @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all - * the batches will be <= batchSize - */ - protected Map getBatches(final ProcessSession session, final int totalBatchSize, - final byte[] messageDemarcatorBytes) { - - final Map batches = new HashMap<>(); - for (int i=0; i < totalBatchSize; i++) { - final E event = getMessage(true, true, session); - if (event == null) { - break; - } - - final String batchKey = getBatchKey(event); - FlowFileEventBatch batch = batches.get(batchKey); - - // if we don't have a batch for this key then create a new one - if (batch == null) { - batch = new FlowFileEventBatch(session.create(), new ArrayList()); - batches.put(batchKey, batch); - } - - // add the current event to the batch - batch.getEvents().add(event); - - // append the event's data to the FlowFile, write the demarcator first if not on the first event - final boolean writeDemarcator = (i > 0); - try { - final byte[] rawMessage = event.getData(); - FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - if (writeDemarcator) { - out.write(messageDemarcatorBytes); - } - - out.write(rawMessage); - } - }); - - // update the FlowFile reference in the batch object - batch.setFlowFile(appendedFlowFile); - - } catch (final Exception e) { - getLogger().error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again", - new Object[] {e.getMessage()}, e); - errorEvents.offer(event); - break; - } - } - - return batches; - } - - /** - * @param event an event that was pulled off the queue - * - * @return a key to use for batching events together, by default this uses the sender of the - * event, but sub-classes should override this to batch by something else - */ - protected String getBatchKey(final E event) { - return event.getSender(); - } - - /** - * Wrapper to hold a FlowFile and the events that have been appended to it. - */ - protected final class FlowFileEventBatch { - - private FlowFile flowFile; - private List events; - - public FlowFileEventBatch(final FlowFile flowFile, final List events) { - this.flowFile = flowFile; - this.events = events; - } - - public FlowFile getFlowFile() { - return flowFile; - } - - public List getEvents() { - return events; - } - - public void setFlowFile(FlowFile flowFile) { - this.flowFile = flowFile; - } - } - } diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java new file mode 100644 index 0000000000..5e4c63948e --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.AttributeExpression; + +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.Set; + +/** + * Shared properties. + */ +public class ListenerProperties { + + private static final Set interfaceSet = new HashSet<>(); + + static { + try { + final Enumeration interfaceEnum = NetworkInterface.getNetworkInterfaces(); + while (interfaceEnum.hasMoreElements()) { + final NetworkInterface ifc = interfaceEnum.nextElement(); + interfaceSet.add(ifc.getName()); + } + } catch (SocketException e) { + } + } + + public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder() + .name("Local Network Interface") + .description("The name of a local network interface to be used to restrict listening to a specific LAN.") + .addValidator(new Validator() { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + ValidationResult result = new ValidationResult.Builder() + .subject("Local Network Interface").valid(true).input(input).build(); + if (interfaceSet.contains(input.toLowerCase())) { + return result; + } + + String message; + String realValue = input; + try { + if (context.isExpressionLanguagePresent(input)) { + AttributeExpression ae = context.newExpressionLanguageCompiler().compile(input); + realValue = ae.evaluate(); + } + + if (interfaceSet.contains(realValue.toLowerCase())) { + return result; + } + + message = realValue + " is not a valid network name. Valid names are " + interfaceSet.toString(); + + } catch (IllegalArgumentException e) { + message = "Not a valid AttributeExpression: " + e.getMessage(); + } + result = new ValidationResult.Builder().subject("Local Network Interface") + .valid(false).input(input).explanation(message).build(); + + return result; + } + }) + .expressionLanguageSupported(true) + .build(); + +} diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java index cff230efda..444aeb13be 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java @@ -17,6 +17,7 @@ package org.apache.nifi.processor.util.listen.dispatcher; import java.io.IOException; +import java.net.InetAddress; /** * Dispatches handlers for a given channel. @@ -27,11 +28,16 @@ public interface ChannelDispatcher extends Runnable { * Opens the dispatcher listening on the given port and attempts to set the * OS socket buffer to maxBufferSize. * + * @param nicAddress the local network interface to listen on, if null will listen on the wildcard address + * which means listening on all local network interfaces + * * @param port the port to listen on + * * @param maxBufferSize the size to set the OS socket buffer to + * * @throws IOException if an error occurred listening on the given port */ - void open(int port, int maxBufferSize) throws IOException; + void open(InetAddress nicAddress, int port, int maxBufferSize) throws IOException; /** * @return the port being listened to diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java index e362e15923..3fb9ba959f 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java @@ -21,8 +21,10 @@ import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.util.listen.event.Event; import org.apache.nifi.processor.util.listen.event.EventFactory; import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processor.util.listen.event.EventQueue; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.StandardSocketOptions; @@ -42,8 +44,10 @@ public class DatagramChannelDispatcher> impleme private final EventFactory eventFactory; private final BlockingQueue bufferPool; - private final BlockingQueue events; + private final EventQueue events; private final ProcessorLog logger; + private final String sendingHost; + private final Integer sendingPort; private Selector selector; private DatagramChannel datagramChannel; @@ -53,10 +57,21 @@ public class DatagramChannelDispatcher> impleme final BlockingQueue bufferPool, final BlockingQueue events, final ProcessorLog logger) { + this(eventFactory, bufferPool, events, logger, null, null); + } + + public DatagramChannelDispatcher(final EventFactory eventFactory, + final BlockingQueue bufferPool, + final BlockingQueue events, + final ProcessorLog logger, + final String sendingHost, + final Integer sendingPort) { this.eventFactory = eventFactory; this.bufferPool = bufferPool; - this.events = events; this.logger = logger; + this.sendingHost = sendingHost; + this.sendingPort = sendingPort; + this.events = new EventQueue<>(events, logger); if (bufferPool == null || bufferPool.size() == 0) { throw new IllegalArgumentException("A pool of available ByteBuffers is required"); @@ -64,7 +79,7 @@ public class DatagramChannelDispatcher> impleme } @Override - public void open(final int port, int maxBufferSize) throws IOException { + public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException { stopped = false; datagramChannel = DatagramChannel.open(); datagramChannel.configureBlocking(false); @@ -78,7 +93,17 @@ public class DatagramChannelDispatcher> impleme + "maximum receive buffer"); } } - datagramChannel.socket().bind(new InetSocketAddress(port)); + + // we don't have to worry about nicAddress being null here because InetSocketAddress already handles it + datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + datagramChannel.socket().bind(new InetSocketAddress(nicAddress, port)); + + // if a sending host and port were provided then connect to that specific address to only receive + // datagrams from that host/port, otherwise we can receive datagrams from any host/port + if (sendingHost != null && sendingPort != null) { + datagramChannel.connect(new InetSocketAddress(sendingHost, sendingPort)); + } + selector = Selector.open(); datagramChannel.register(selector, SelectionKey.OP_READ); } @@ -115,9 +140,8 @@ public class DatagramChannelDispatcher> impleme final Map metadata = EventFactoryUtil.createMapWithSender(sender); final E event = eventFactory.create(bytes, metadata, null); + events.offer(event); - // queue the raw message with the sender, block until space is available - events.put(event); buffer.clear(); } } diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java index 670dba4d33..70b2bae0f3 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java @@ -27,6 +27,7 @@ import org.apache.nifi.security.util.SslContextFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; @@ -104,7 +105,7 @@ public class SocketChannelDispatcher> implements } @Override - public void open(final int port, int maxBufferSize) throws IOException { + public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException { stopped = false; executor = Executors.newFixedThreadPool(maxConnections); @@ -119,7 +120,9 @@ public class SocketChannelDispatcher> implements + "maximum receive buffer"); } } - serverSocketChannel.socket().bind(new InetSocketAddress(port)); + + serverSocketChannel.socket().bind(new InetSocketAddress(nicAddress, port)); + selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java new file mode 100644 index 0000000000..4afaf40e8a --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.event; + +import org.apache.commons.lang3.Validate; +import org.apache.nifi.logging.ProcessorLog; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Wraps a BlockingQueue to centralize logic for offering events across UDP, TCP, and SSL. + * + * @param the type of event + */ +public class EventQueue { + + /** + * The default number of milliseconds to wait when offering new events to the queue. + */ + public static final long DEFAULT_OFFER_WAIT_MS = 100; + + private final long offerWaitMs; + private final BlockingQueue events; + private final ProcessorLog logger; + + public EventQueue(final BlockingQueue events, final ProcessorLog logger) { + this(events, DEFAULT_OFFER_WAIT_MS, logger); + } + + public EventQueue(final BlockingQueue events, final long offerWaitMs, final ProcessorLog logger) { + this.events = events; + this.offerWaitMs = offerWaitMs; + this.logger = logger; + Validate.notNull(this.events); + Validate.notNull(this.logger); + } + + /** + * Offers the given event to the events queue with a wait time, if the offer fails the event + * is dropped an error is logged. + * + * @param event the event to offer + * @throws InterruptedException if interrupted while waiting to offer + */ + public void offer(final E event) throws InterruptedException { + boolean queued = events.offer(event, offerWaitMs, TimeUnit.MILLISECONDS); + if (!queued) { + logger.error("Internal queue at maximum capacity, could not queue event"); + } + } + +} diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java new file mode 100644 index 0000000000..9ae6161515 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.event; + +import org.apache.nifi.processor.util.listen.response.ChannelResponder; + +import java.util.Map; + +/** + * EventFactory to create StandardEvent instances. + */ +public class StandardEventFactory implements EventFactory { + + @Override + public StandardEvent create(final byte[] data, final Map metadata, final ChannelResponder responder) { + String sender = null; + if (metadata != null && metadata.containsKey(EventFactory.SENDER_KEY)) { + sender = metadata.get(EventFactory.SENDER_KEY); + } + return new StandardEvent(sender, data, responder); + } + +} diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java index ab4346f983..6307c7635a 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java @@ -20,6 +20,7 @@ import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.event.Event; import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.event.EventQueue; import java.nio.channels.SelectionKey; import java.nio.charset.Charset; @@ -34,7 +35,7 @@ public abstract class ChannelHandler eventFactory; - protected final BlockingQueue events; + protected final EventQueue events; protected final ProcessorLog logger; @@ -48,8 +49,8 @@ public abstract class ChannelHandler(events, logger); } } diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java index 460ef08b2f..d11cb02a90 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java @@ -135,10 +135,8 @@ public class SSLSocketChannelHandler> extends Soc if (currBytes.size() > 0) { final SSLSocketChannelResponder response = new SSLSocketChannelResponder(socketChannel, sslSocketChannel); final Map metadata = EventFactoryUtil.createMapWithSender(sender.toString()); - - // queue the raw event blocking until space is available, reset the temporary buffer final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); - events.put(event); + events.offer(event); currBytes.reset(); } } else { diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java index e2fd3a84de..94e41eba05 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java @@ -137,10 +137,8 @@ public class StandardSocketChannelHandler> extend if (currBytes.size() > 0) { final SocketChannelResponder response = new SocketChannelResponder(socketChannel); final Map metadata = EventFactoryUtil.createMapWithSender(sender.toString()); - - // queue the raw event blocking until space is available, reset the buffer final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); - events.put(event); + events.offer(event); currBytes.reset(); // Mark this as the start of the next message diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java index c386bdca78..51173ecde0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java @@ -24,14 +24,12 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor; +import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; @@ -56,7 +54,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @Tags({"listen", "relp", "tcp", "logs"}) @@ -72,7 +69,7 @@ import java.util.concurrent.LinkedBlockingQueue; @WritesAttribute(attribute="mime.type", description="The mime.type of the content which is text/plain") }) @SeeAlso({ParseSyslog.class}) -public class ListenRELP extends AbstractListenEventProcessor { +public class ListenRELP extends AbstractListenEventBatchingProcessor { public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") @@ -83,11 +80,10 @@ public class ListenRELP extends AbstractListenEventProcessor { .build(); private volatile RELPEncoder relpEncoder; - private volatile byte[] messageDemarcatorBytes; //it is only the array reference that is volatile - not the contents. @Override protected List getAdditionalProperties() { - return Arrays.asList(MAX_CONNECTIONS, MAX_BATCH_SIZE, MESSAGE_DELIMITER, SSL_CONTEXT_SERVICE); + return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE); } @Override @@ -96,9 +92,6 @@ public class ListenRELP extends AbstractListenEventProcessor { super.onScheduled(context); // wanted to ensure charset was already populated here relpEncoder = new RELPEncoder(charset); - - final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); - messageDemarcatorBytes = msgDemarcator.getBytes(charset); } @Override @@ -111,10 +104,7 @@ public class ListenRELP extends AbstractListenEventProcessor { final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue()); // initialize the buffer pool based on max number of connections and the buffer size - final LinkedBlockingQueue bufferPool = new LinkedBlockingQueue<>(maxConnections); - for (int i = 0; i < maxConnections; i++) { - bufferPool.offer(ByteBuffer.allocate(bufferSize)); - } + final BlockingQueue bufferPool = createBufferPool(maxConnections, bufferSize); // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher SSLContext sslContext = null; @@ -129,69 +119,20 @@ public class ListenRELP extends AbstractListenEventProcessor { } @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger(); - final Map batches = getBatches(session, maxBatchSize, messageDemarcatorBytes); - - // if the size is 0 then there was nothing to process so return - // we don't need to yield here because inside getBatches() we are polling a queue with a wait - // and yielding here could have a negative impact on performance - if (batches.size() == 0) { - return; - } - - for (Map.Entry entry : batches.entrySet()) { - FlowFile flowFile = entry.getValue().getFlowFile(); - final List events = entry.getValue().getEvents(); - - if (flowFile.getSize() == 0L || events.size() == 0) { - session.remove(flowFile); - getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()}); - continue; - } - - // the sender and command will be the same for all events based on the batch key - final String sender = events.get(0).getSender(); - final String command = events.get(0).getCommand(); - - final int numAttributes = events.size() == 1 ? 5 : 4; - - final Map attributes = new HashMap<>(numAttributes); - attributes.put(RELPAttributes.COMMAND.key(), command); - attributes.put(RELPAttributes.SENDER.key(), sender); - attributes.put(RELPAttributes.PORT.key(), String.valueOf(port)); - attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - - // if there was only one event then we can pass on the transaction - // NOTE: we could pass on all the transaction ids joined together - if (events.size() == 1) { - attributes.put(RELPAttributes.TXNR.key(), String.valueOf(events.get(0).getTxnr())); - } - flowFile = session.putAllAttributes(flowFile, attributes); - - getLogger().debug("Transferring {} to success", new Object[] {flowFile}); - session.transfer(flowFile, REL_SUCCESS); - session.adjustCounter("FlowFiles Transferred to Success", 1L, false); - - // create a provenance receive event - final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; - final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":") - .append(port).toString(); - session.getProvenanceReporter().receive(flowFile, transitUri); - - // commit the session to guarantee the data has been delivered - session.commit(); - - // respond to each event to acknowledge successful receipt - for (final RELPEvent event : events) { - respond(event, RELPResponse.ok(event.getTxnr())); - } - } + protected String getBatchKey(RELPEvent event) { + return event.getSender() + "_" + event.getCommand(); } @Override - protected String getBatchKey(RELPEvent event) { - return event.getSender() + "_" + event.getCommand(); + protected void postProcess(final ProcessContext context, final ProcessSession session, final List events) { + // first commit the session so we guarantee we have all the events successfully + // written to FlowFiles and transferred to the success relationship + session.commit(); + + // respond to each event to acknowledge successful receipt + for (final RELPEvent event : events) { + respond(event, RELPResponse.ok(event.getTxnr())); + } } protected void respond(final RELPEvent event, final RELPResponse relpResponse) { @@ -207,6 +148,39 @@ public class ListenRELP extends AbstractListenEventProcessor { } } + @Override + protected Map getAttributes(FlowFileEventBatch batch) { + final List events = batch.getEvents(); + + // the sender and command will be the same for all events based on the batch key + final String sender = events.get(0).getSender(); + final String command = events.get(0).getCommand(); + + final int numAttributes = events.size() == 1 ? 5 : 4; + + final Map attributes = new HashMap<>(numAttributes); + attributes.put(RELPAttributes.COMMAND.key(), command); + attributes.put(RELPAttributes.SENDER.key(), sender); + attributes.put(RELPAttributes.PORT.key(), String.valueOf(port)); + attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); + + // if there was only one event then we can pass on the transaction + // NOTE: we could pass on all the transaction ids joined together + if (events.size() == 1) { + attributes.put(RELPAttributes.TXNR.key(), String.valueOf(events.get(0).getTxnr())); + } + return attributes; + } + + @Override + protected String getTransitUri(FlowFileEventBatch batch) { + final String sender = batch.getEvents().get(0).getSender(); + final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; + final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":") + .append(port).toString(); + return transitUri; + } + public enum RELPAttributes implements FlowFileAttributeKey { TXNR("relp.txnr"), COMMAND("relp.command"), diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 1ec406dbf6..38668e518c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -16,6 +16,9 @@ */ package org.apache.nifi.processors.standard; +import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; + +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -55,6 +58,8 @@ import org.apache.nifi.ssl.SSLContextService; import javax.net.ssl.SSLContext; import java.io.IOException; import java.io.OutputStream; +import java.net.InetAddress; +import java.net.NetworkInterface; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SocketChannel; @@ -189,6 +194,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { final List descriptors = new ArrayList<>(); descriptors.add(PROTOCOL); descriptors.add(PORT); + descriptors.add(NETWORK_INTF_NAME); descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(RECV_BUFFER_SIZE); descriptors.add(MAX_MESSAGE_QUEUE_SIZE); @@ -257,6 +263,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final int maxMessageQueueSize = context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger(); final String protocol = context.getProperty(PROTOCOL).getValue(); + final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); final String charSet = context.getProperty(CHARSET).getValue(); final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); messageDemarcatorBytes = msgDemarcator.getBytes(Charset.forName(charSet)); @@ -276,10 +283,16 @@ public class ListenSyslog extends AbstractSyslogProcessor { parser = new SyslogParser(Charset.forName(charSet)); syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize); + InetAddress nicIPAddress = null; + if (!StringUtils.isEmpty(nicIPAddressStr)) { + NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr); + nicIPAddress = netIF.getInetAddresses().nextElement(); + } + // create either a UDP or TCP reader and call open() to bind to the given port final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); channelDispatcher = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections, sslContextService, Charset.forName(charSet)); - channelDispatcher.open(port, maxChannelBufferSize); + channelDispatcher.open(nicIPAddress, port, maxChannelBufferSize); final Thread readerThread = new Thread(channelDispatcher); readerThread.setName("ListenSyslog [" + getIdentifier() + "]"); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java index c79d5931cc..86554c812f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java @@ -23,31 +23,26 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor; +import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; import org.apache.nifi.processor.util.listen.event.EventFactory; import org.apache.nifi.processor.util.listen.event.StandardEvent; +import org.apache.nifi.processor.util.listen.event.StandardEventFactory; import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; import org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandlerFactory; -import org.apache.nifi.processor.util.listen.response.ChannelResponder; import org.apache.nifi.security.util.SslContextFactory; import org.apache.nifi.ssl.SSLContextService; import javax.net.ssl.SSLContext; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.SelectableChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.ArrayList; @@ -57,7 +52,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @@ -71,7 +65,7 @@ import java.util.concurrent.LinkedBlockingQueue; @WritesAttribute(attribute="tcp.sender", description="The sending host of the messages."), @WritesAttribute(attribute="tcp.port", description="The sending port the messages were received.") }) -public class ListenTCP extends AbstractListenEventProcessor { +public class ListenTCP extends AbstractListenEventBatchingProcessor { public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") @@ -89,15 +83,10 @@ public class ListenTCP extends AbstractListenEventProcessor .defaultValue(SSLContextService.ClientAuth.REQUIRED.name()) .build(); - // it is only the array reference that is volatile - not the contents. - private volatile byte[] messageDemarcatorBytes; - @Override protected List getAdditionalProperties() { return Arrays.asList( MAX_CONNECTIONS, - MAX_BATCH_SIZE, - MESSAGE_DELIMITER, SSL_CONTEXT_SERVICE, CLIENT_AUTH ); @@ -120,15 +109,7 @@ public class ListenTCP extends AbstractListenEventProcessor } @Override - @OnScheduled - public void onScheduled(ProcessContext context) throws IOException { - super.onScheduled(context); - final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); - messageDemarcatorBytes = msgDemarcator.getBytes(charset); - } - - @Override - protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue events) + protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue events) throws IOException { final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger(); @@ -136,12 +117,7 @@ public class ListenTCP extends AbstractListenEventProcessor final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue()); // initialize the buffer pool based on max number of connections and the buffer size - final LinkedBlockingQueue bufferPool = new LinkedBlockingQueue<>(maxConnections); - for (int i = 0; i < maxConnections; i++) { - bufferPool.offer(ByteBuffer.allocate(bufferSize)); - } - - final EventFactory eventFactory = new TCPEventFactory(); + final BlockingQueue bufferPool = createBufferPool(maxConnections, bufferSize); // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher SSLContext sslContext = null; @@ -154,73 +130,27 @@ public class ListenTCP extends AbstractListenEventProcessor clientAuth = SslContextFactory.ClientAuth.valueOf(clientAuthValue); } - final ChannelHandlerFactory, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>(); + final EventFactory eventFactory = new StandardEventFactory(); + final ChannelHandlerFactory, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>(); return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, charSet); } @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger(); - final Map batches = getBatches(session, maxBatchSize, messageDemarcatorBytes); - - // if the size is 0 then there was nothing to process so return - // we don't need to yield here because we have a long poll in side of getBatches - if (batches.size() == 0) { - return; - } - - for (Map.Entry entry : batches.entrySet()) { - FlowFile flowFile = entry.getValue().getFlowFile(); - final List events = entry.getValue().getEvents(); - - if (flowFile.getSize() == 0L || events.size() == 0) { - session.remove(flowFile); - getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()}); - continue; - } - - // the sender and command will be the same for all events based on the batch key - final String sender = events.get(0).getSender(); - - final Map attributes = new HashMap<>(3); - attributes.put("tcp.sender", sender); - attributes.put("tcp.port", String.valueOf(port)); - flowFile = session.putAllAttributes(flowFile, attributes); - - getLogger().debug("Transferring {} to success", new Object[] {flowFile}); - session.transfer(flowFile, REL_SUCCESS); - session.adjustCounter("FlowFiles Transferred to Success", 1L, false); - - // create a provenance receive event - final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; - final String transitUri = new StringBuilder().append("tcp").append("://").append(senderHost).append(":") - .append(port).toString(); - session.getProvenanceReporter().receive(flowFile, transitUri); - } + protected Map getAttributes(final FlowFileEventBatch batch) { + final String sender = batch.getEvents().get(0).getSender(); + final Map attributes = new HashMap<>(3); + attributes.put("tcp.sender", sender); + attributes.put("tcp.port", String.valueOf(port)); + return attributes; } - /** - * Event implementation for TCP. - */ - static class TCPEvent extends StandardEvent { - - public TCPEvent(String sender, byte[] data, ChannelResponder responder) { - super(sender, data, responder); - } + @Override + protected String getTransitUri(FlowFileEventBatch batch) { + final String sender = batch.getEvents().get(0).getSender(); + final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; + final String transitUri = new StringBuilder().append("tcp").append("://").append(senderHost).append(":") + .append(port).toString(); + return transitUri; } - /** - * Factory implementation for TCPEvents. - */ - static final class TCPEventFactory implements EventFactory { - - @Override - public TCPEvent create(byte[] data, Map metadata, ChannelResponder responder) { - String sender = null; - if (metadata != null && metadata.containsKey(EventFactory.SENDER_KEY)) { - sender = metadata.get(EventFactory.SENDER_KEY); - } - return new TCPEvent(sender, data, responder); - } - } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java index b620dd3920..4d7d7e6700 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java @@ -16,198 +16,54 @@ */ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; -import org.apache.nifi.expression.AttributeExpression; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.io.nio.BufferPool; -import org.apache.nifi.io.nio.ChannelListener; -import org.apache.nifi.io.nio.consumer.StreamConsumer; -import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.util.UDPStreamConsumer; -import org.apache.nifi.util.Tuple; +import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.event.StandardEvent; +import org.apache.nifi.processor.util.listen.event.StandardEventFactory; -/** - *

- * This processor listens for Datagram Packets on a given port and concatenates the contents of those packets together generating flow files roughly as often as the internal buffer fills up or until - * no more data is currently available. - *

- * - *

- * This processor has the following required properties: - *

    - *
  • Port - The port to listen on for data packets. Must be known by senders of Datagrams.
  • - *
  • Receive Timeout - The time out period when waiting to receive data from the socket. Specify units. Default is 5 secs.
  • - *
  • Max Buffer Size - Determines the size each receive buffer may be. Specify units. Default is 1 MB.
  • - *
  • FlowFile Size Trigger - Determines the (almost) upper bound size at which a flow file would be generated. A flow file will get made even if this value isn't reached if there is no more - * data streaming in and this value may be exceeded by the size of a single packet. Specify units. Default is 1 MB.
  • - *
  • Max size of UDP Buffer - The maximum UDP buffer size that should be used. This is a suggestion to the Operating System to indicate how big the udp socket buffer should be. Specify units. - * Default is 1 MB.")
  • - *
  • Receive Buffer Count - Number of receiving buffers to be used to accept data from the socket. Higher numbers means more ram is allocated but can allow better throughput. Default is - * 4.
  • - *
  • Channel Reader Interval - Scheduling interval for each read channel. Specify units. Default is 50 millisecs.
  • - *
  • FlowFiles Per Session - The number of flow files per session. Higher number is more efficient, but will lose more data if a problem occurs that causes a rollback of a session. Default is - * 10
  • - *
- *

- * - * This processor has the following optional properties: - *
    - *
  • Sending Host - IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will be accepted. Improves Performance. May be a system property or an - * environment variable.
  • - *
  • Sending Host Port - Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and this port will be accepted. Improves Performance. May be a system - * property or an environment variable.
  • - *
- * - *

- * The following relationships are required: - *

    - *
  • success - Where to route newly created flow files.
  • - *
- *

- * - */ -@TriggerWhenEmpty +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +@SupportsBatching @Tags({"ingest", "udp", "listen", "source"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Listens for Datagram Packets on a given port and concatenates the contents of those packets " - + "together generating flow files") -public class ListenUDP extends AbstractSessionFactoryProcessor { +@CapabilityDescription("Listens for Datagram Packets on a given port. The default behavior produces a FlowFile " + + "per datagram, however for higher throughput the Max Batch Size property may be increased to specify the number of " + + "datagrams to batch together in a single FlowFile. This processor can be restricted to listening for datagrams from a " + + "specific remote host and port by specifying the Sending Host and Sending Host Port properties, otherwise it will listen " + + "for datagrams from all hosts and ports.") +@WritesAttributes({ + @WritesAttribute(attribute="udp.sender", description="The sending host of the messages."), + @WritesAttribute(attribute="udp.port", description="The sending port the messages were received.") +}) +public class ListenUDP extends AbstractListenEventBatchingProcessor { - private static final Set relationships; - private static final List properties; - - // relationships. - public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder() - .name("success") - .description("Connection which contains concatenated Datagram Packets") - .build(); - - static { - final Set rels = new HashSet<>(); - rels.add(RELATIONSHIP_SUCCESS); - relationships = Collections.unmodifiableSet(rels); - } - // required properties. - public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() - .name("Port") - .description("Port to listen on. Must be known by senders of Datagrams.") - .addValidator(StandardValidators.PORT_VALIDATOR) - .required(true) - .build(); - - public static final PropertyDescriptor RECV_TIMEOUT = new PropertyDescriptor.Builder() - .name("Receive Timeout") - .description("The time out period when waiting to receive data from the socket. Specify units.") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("5 secs") - .required(true) - .build(); - - public static final PropertyDescriptor FLOW_FILE_PER_DATAGRAM = new PropertyDescriptor.Builder() - .name("FlowFile Per Datagram") - .description("Determines if this processor emits each datagram as a FlowFile, or if multiple datagrams can be placed in a single FlowFile.") - .allowableValues("true", "false") - .defaultValue("false") - .required(true) - .build(); - - public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Max Buffer Size") - .description("Determines the size each receive buffer may be") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("1 MB") - .required(true) - .build(); - - public static final PropertyDescriptor FLOW_FILE_SIZE_TRIGGER = new PropertyDescriptor.Builder() - .name("FlowFile Size Trigger") - .description("Determines the (almost) upper bound size at which a flow file would be generated.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("1 MB") - .required(true) - .build(); - - public static final PropertyDescriptor MAX_UDP_BUFFER = new PropertyDescriptor.Builder() - .name("Max size of UDP Buffer") - .description("The maximum UDP buffer size that should be used. This is a suggestion to the Operating System " - + "to indicate how big the udp socket buffer should be.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("1 MB") - .required(true) - .build(); - - public static final PropertyDescriptor RECV_BUFFER_COUNT = new PropertyDescriptor.Builder() - .name("Receive Buffer Count") - .description("Number of receiving buffers to be used to accept data from the socket. Higher numbers " - + "means more ram is allocated but can allow better throughput.") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("4") - .required(true) - .build(); - - public static final PropertyDescriptor CHANNEL_READER_PERIOD = new PropertyDescriptor.Builder() - .name("Channel Reader Interval") - .description("Scheduling interval for each read channel.") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("50 ms") - .required(true) - .build(); - - public static final PropertyDescriptor FLOW_FILES_PER_SESSION = new PropertyDescriptor.Builder() - .name("FlowFiles Per Session") - .description("The number of flow files per session.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("10") - .build(); - - // optional properties. public static final PropertyDescriptor SENDING_HOST = new PropertyDescriptor.Builder() .name("Sending Host") .description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will " @@ -224,359 +80,70 @@ public class ListenUDP extends AbstractSessionFactoryProcessor { .expressionLanguageSupported(true) .build(); - private static final Set interfaceSet = new HashSet<>(); - - static { - try { - final Enumeration interfaceEnum = NetworkInterface.getNetworkInterfaces(); - while (interfaceEnum.hasMoreElements()) { - final NetworkInterface ifc = interfaceEnum.nextElement(); - interfaceSet.add(ifc.getName()); - } - } catch (SocketException e) { - } - } - public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder() - .name("Local Network Interface") - .description("The name of a local network interface to be used to restrict listening for UDP Datagrams to a specific LAN." - + "May be a system property or an environment variable.") - .addValidator(new Validator() { - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - ValidationResult result = new ValidationResult.Builder() - .subject("Local Network Interface").valid(true).input(input).build(); - if (interfaceSet.contains(input.toLowerCase())) { - return result; - } - - String message; - try { - AttributeExpression ae = context.newExpressionLanguageCompiler().compile(input); - String realValue = ae.evaluate(); - if (interfaceSet.contains(realValue.toLowerCase())) { - return result; - } - - message = realValue + " is not a valid network name. Valid names are " + interfaceSet.toString(); - - } catch (IllegalArgumentException e) { - message = "Not a valid AttributeExpression: " + e.getMessage(); - } - result = new ValidationResult.Builder().subject("Local Network Interface") - .valid(false).input(input).explanation(message).build(); - - return result; - } - }) - .expressionLanguageSupported(true).build(); - - static { - List props = new ArrayList<>(); - props.add(SENDING_HOST); - props.add(SENDING_HOST_PORT); - props.add(NETWORK_INTF_NAME); - props.add(CHANNEL_READER_PERIOD); - props.add(FLOW_FILE_SIZE_TRIGGER); - props.add(MAX_BUFFER_SIZE); - props.add(MAX_UDP_BUFFER); - props.add(PORT); - props.add(RECV_BUFFER_COUNT); - props.add(FLOW_FILES_PER_SESSION); - props.add(RECV_TIMEOUT); - props.add(FLOW_FILE_PER_DATAGRAM); - properties = Collections.unmodifiableList(props); - } - // defaults - public static final int DEFAULT_LISTENING_THREADS = 2; - // lock used to protect channelListener - private final Lock lock = new ReentrantLock(); - private volatile ChannelListener channelListener = null; - private final BlockingQueue>> flowFilesPerSessionQueue = new LinkedBlockingQueue<>(); - private final List newFlowFiles = new ArrayList<>(); - private final AtomicReference consumerRef = new AtomicReference<>(); - private final AtomicBoolean stopping = new AtomicBoolean(false); - private final AtomicReference sessionFactoryRef = new AtomicReference<>(); - private final ExecutorService consumerExecutorService = Executors.newSingleThreadExecutor(); - private final AtomicReference>>> consumerFutureRef = new AtomicReference<>(); - private final AtomicBoolean resetChannelListener = new AtomicBoolean(false); - // instance attribute for provenance receive event generation - private volatile String sendingHost; + public static final String UDP_PORT_ATTR = "udp.port"; + public static final String UDP_SENDER_ATTR = "udp.sender"; @Override - public Set getRelationships() { - return relationships; - } - - @Override - protected List getSupportedPropertyDescriptors() { - return properties; - } - - /** - * Create the ChannelListener and a thread that causes the Consumer to create flow files. - * - * @param context context - * @throws IOException ex - */ - @OnScheduled - public void initializeChannelListenerAndConsumerProcessing(final ProcessContext context) throws IOException { - getChannelListener(context); - stopping.set(false); - Future>> consumerFuture = consumerExecutorService.submit(new Callable>>() { - - @Override - public Tuple> call() { - final int maxFlowFilesPerSession = context.getProperty(FLOW_FILES_PER_SESSION).asInteger(); - final long channelReaderIntervalMSecs = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS); - // number of waits in 5 secs, or 1 - final int maxWaits = (int) (channelReaderIntervalMSecs <= 1000 ? 5000 / channelReaderIntervalMSecs : 1); - final ProcessorLog logger = getLogger(); - int flowFileCount = maxFlowFilesPerSession; - ProcessSession session = null; - int numWaits = 0; - while (!stopping.get()) { - UDPStreamConsumer consumer = consumerRef.get(); - if (consumer == null || sessionFactoryRef.get() == null) { - try { - Thread.sleep(100L); - } catch (InterruptedException swallow) { - } - } else { - try { - // first time through, flowFileCount is maxFlowFilesPerSession so that a session - // is created and the consumer is updated with it. - if (flowFileCount == maxFlowFilesPerSession || numWaits == maxWaits) { - logger.debug("Have waited {} times", new Object[]{numWaits}); - numWaits = 0; - if (session != null) { - Tuple> flowFilesPerSession = new Tuple>(session, new ArrayList<>(newFlowFiles)); - newFlowFiles.clear(); - flowFilesPerSessionQueue.add(flowFilesPerSession); - } - session = sessionFactoryRef.get().createSession(); - consumer.setSession(session); - flowFileCount = 0; - } - // this will throttle the processing of the received datagrams. If there are no more - // buffers to read into because none have been returned to the pool via consumer.process(), - // then the desired back pressure on the channel is created. - if (context.getAvailableRelationships().size() > 0) { - consumer.process(); - if (flowFileCount == newFlowFiles.size()) { - // no new datagrams received, need to throttle this thread back so it does - // not consume all cpu...but don't want to cause back pressure on the channel - // so the sleep time is same as the reader interval - // If have done this for approx. 5 secs, assume datagram sender is down. So, push - // out the remaining flow files (see numWaits == maxWaits above) - Thread.sleep(channelReaderIntervalMSecs); - if (flowFileCount > 0) { - numWaits++; - } - } else { - flowFileCount = newFlowFiles.size(); - } - } else { - logger.debug("Creating back pressure...no available destinations"); - Thread.sleep(1000L); - } - } catch (final IOException ioe) { - logger.error("Unable to fully process consumer {}", new Object[]{consumer}, ioe); - } catch (InterruptedException e) { - // don't care - } finally { - if (consumer.isConsumerFinished()) { - logger.info("Consumer {} was closed and is finished", new Object[]{consumer}); - consumerRef.set(null); - disconnect(); - if (!stopping.get()) { - resetChannelListener.set(true); - } - } - } - } - } - // when shutting down, need consumer to drain rest of cached buffers and clean up. - // prior to getting here, the channelListener was shutdown - UDPStreamConsumer consumer; - while ((consumer = consumerRef.get()) != null && !consumer.isConsumerFinished()) { - try { - consumer.process(); - } catch (IOException swallow) { - // if this is blown...consumer.isConsumerFinished will be true - } - } - Tuple> flowFilesPerSession = new Tuple>(session, new ArrayList<>(newFlowFiles)); - return flowFilesPerSession; - } - }); - consumerFutureRef.set(consumerFuture); - } - - private void disconnect() { - if (lock.tryLock()) { - try { - if (channelListener != null) { - getLogger().debug("Shutting down channel listener {}", new Object[]{channelListener}); - channelListener.shutdown(500L, TimeUnit.MILLISECONDS); - channelListener = null; - } - } finally { - lock.unlock(); - } - } - } - - private void getChannelListener(final ProcessContext context) throws IOException { - if (lock.tryLock()) { - try { - ProcessorLog logger = getLogger(); - logger.debug("Instantiating a new channel listener"); - final int port = context.getProperty(PORT).asInteger(); - final int bufferCount = context.getProperty(RECV_BUFFER_COUNT).asInteger(); - final Double bufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B); - final Double rcvBufferSize = context.getProperty(MAX_UDP_BUFFER).asDataSize(DataUnit.B); - sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue(); - final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger(); - final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); - final Double flowFileSizeTrigger = context.getProperty(FLOW_FILE_SIZE_TRIGGER).asDataSize(DataUnit.B); - final int recvTimeoutMS = context.getProperty(RECV_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); - final boolean flowFilePerDatagram = context.getProperty(FLOW_FILE_PER_DATAGRAM).asBoolean(); - final StreamConsumerFactory consumerFactory = new StreamConsumerFactory() { - - @Override - public StreamConsumer newInstance(final String streamId) { - final UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), getLogger(), flowFilePerDatagram); - consumerRef.set(consumer); - return consumer; - } - }; - final int readerMilliseconds = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); - final BufferPool bufferPool = new BufferPool(bufferCount, bufferSize.intValue(), false, Integer.MAX_VALUE); - channelListener = new ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory, bufferPool, recvTimeoutMS, TimeUnit.MILLISECONDS, flowFilePerDatagram); - // specifying a sufficiently low number for each stream to be fast enough though very efficient - channelListener.setChannelReaderSchedulingPeriod(readerMilliseconds, TimeUnit.MILLISECONDS); - InetAddress nicIPAddress = null; - if (null != nicIPAddressStr) { - NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr); - nicIPAddress = netIF.getInetAddresses().nextElement(); - } - channelListener.addDatagramChannel(nicIPAddress, port, rcvBufferSize.intValue(), sendingHost, sendingHostPort); - logger.info("Registered service and initialized UDP socket listener. Now listening on port " + port + "..."); - } finally { - lock.unlock(); - } - } + protected List getAdditionalProperties() { + return Arrays.asList( + SENDING_HOST, + SENDING_HOST_PORT + ); } @Override protected Collection customValidate(ValidationContext validationContext) { - Collection result = new ArrayList<>(); - String sendingHost = validationContext.getProperty(SENDING_HOST).getValue(); - String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue(); + final Collection result = new ArrayList<>(); + + final String sendingHost = validationContext.getProperty(SENDING_HOST).getValue(); + final String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue(); + if (StringUtils.isBlank(sendingHost) && StringUtils.isNotBlank(sendingPort)) { - result.add(new ValidationResult.Builder().subject(SENDING_HOST.getName()).valid(false) - .explanation("Must specify Sending Host when specifying Sending Host Port").build()); + result.add( + new ValidationResult.Builder() + .subject(SENDING_HOST.getName()) + .valid(false) + .explanation("Must specify Sending Host when specifying Sending Host Port") + .build()); } else if (StringUtils.isBlank(sendingPort) && StringUtils.isNotBlank(sendingHost)) { result.add( - new ValidationResult.Builder().subject(SENDING_HOST_PORT.getName()).valid(false).explanation("Must specify Sending Host Port when specifying Sending Host").build()); + new ValidationResult.Builder() + .subject(SENDING_HOST_PORT.getName()) + .valid(false) + .explanation("Must specify Sending Host Port when specifying Sending Host") + .build()); } + return result; } @Override - public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { - final ProcessorLog logger = getLogger(); - sessionFactoryRef.compareAndSet(null, sessionFactory); - if (resetChannelListener.getAndSet(false) && !stopping.get()) { - try { - getChannelListener(context); - } catch (IOException e) { - logger.error("Tried to reset Channel Listener and failed due to:", e); - resetChannelListener.set(true); - } - } - - transferFlowFiles(); + protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue events) + throws IOException { + final String sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue(); + final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger(); + final Integer bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + final BlockingQueue bufferPool = createBufferPool(context.getMaxConcurrentTasks(), bufferSize); + final EventFactory eventFactory = new StandardEventFactory(); + return new DatagramChannelDispatcher<>(eventFactory, bufferPool, events, getLogger(), sendingHost, sendingHostPort); } - private boolean transferFlowFiles() { - final ProcessorLog logger = getLogger(); - ProcessSession session; - Tuple> flowFilesPerSession = null; - boolean transferred = false; - try { - flowFilesPerSession = flowFilesPerSessionQueue.poll(100L, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - } - if (flowFilesPerSession != null) { - session = flowFilesPerSession.getKey(); - List flowFiles = flowFilesPerSession.getValue(); - String sourceSystem = sendingHost == null ? "Unknown" : sendingHost; - try { - for (FlowFile flowFile : flowFiles) { - session.getProvenanceReporter().receive(flowFile, sourceSystem); - session.transfer(flowFile, RELATIONSHIP_SUCCESS); - } - logger.info("Transferred flow files {} to success", new Object[]{flowFiles}); - transferred = true; - - // need to check for erroneous flow files in input queue - List existingFlowFiles = session.get(10); - for (FlowFile existingFlowFile : existingFlowFiles) { - if (existingFlowFile != null && existingFlowFile.getSize() > 0) { - session.transfer(existingFlowFile, RELATIONSHIP_SUCCESS); - logger.warn("Found flow file in input queue (shouldn't have). Transferred flow file {} to success", - new Object[]{existingFlowFile}); - } else if (existingFlowFile != null) { - session.remove(existingFlowFile); - logger.warn("Found empty flow file in input queue (shouldn't have). Removed flow file {}", - new Object[]{existingFlowFile}); - } - } - session.commit(); - } catch (Throwable t) { - session.rollback(); - logger.error("Failed to transfer flow files or commit session...rolled back", t); - throw t; - } - } - return transferred; + @Override + protected Map getAttributes(final FlowFileEventBatch batch) { + final String sender = batch.getEvents().get(0).getSender(); + final Map attributes = new HashMap<>(3); + attributes.put(UDP_SENDER_ATTR, sender); + attributes.put(UDP_PORT_ATTR, String.valueOf(port)); + return attributes; } - @OnUnscheduled - public void stopping() { - getLogger().debug("Stopping Processor"); - disconnect(); - stopping.set(true); - Future>> future; - Tuple> flowFilesPerSession; - if ((future = consumerFutureRef.getAndSet(null)) != null) { - try { - flowFilesPerSession = future.get(); - if (flowFilesPerSession.getValue().size() > 0) { - getLogger().debug("Draining remaining flow Files when stopping"); - flowFilesPerSessionQueue.add(flowFilesPerSession); - } else { - // need to close out the session that has no flow files - flowFilesPerSession.getKey().commit(); - } - } catch (InterruptedException | ExecutionException e) { - getLogger().error("Failure in cleaning up!", e); - } - boolean moreFiles = true; - while (moreFiles) { - try { - moreFiles = transferFlowFiles(); - } catch (Throwable t) { - getLogger().error("Problem transferring cached flowfiles", t); - } - } - } - } - - @OnStopped - public void stopped() { - sessionFactoryRef.set(null); + @Override + protected String getTransitUri(FlowFileEventBatch batch) { + final String sender = batch.getEvents().get(0).getSender(); + final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; + final String transitUri = new StringBuilder().append("udp").append("://").append(senderHost).append(":") + .append(port).toString(); + return transitUri; } public static class HostValidator implements Validator { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java index 5af316e551..7b879f3afa 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java @@ -16,10 +16,12 @@ */ package org.apache.nifi.processors.standard.relp.handler; +import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; import org.apache.nifi.processor.util.listen.event.Event; import org.apache.nifi.processor.util.listen.event.EventFactory; import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processor.util.listen.event.EventQueue; import org.apache.nifi.processor.util.listen.response.ChannelResponder; import org.apache.nifi.processor.util.listen.response.ChannelResponse; import org.apache.nifi.processors.standard.relp.event.RELPMetadata; @@ -45,21 +47,24 @@ public class RELPFrameHandler> { private final Charset charset; private final EventFactory eventFactory; - private final BlockingQueue events; + private final EventQueue events; private final SelectionKey key; private final AsyncChannelDispatcher dispatcher; + private final ProcessorLog logger; private final RELPEncoder encoder; public RELPFrameHandler(final SelectionKey selectionKey, final Charset charset, final EventFactory eventFactory, final BlockingQueue events, - final AsyncChannelDispatcher dispatcher) { + final AsyncChannelDispatcher dispatcher, + final ProcessorLog logger) { this.key = selectionKey; this.charset = charset; this.eventFactory = eventFactory; - this.events = events; this.dispatcher = dispatcher; + this.logger = logger; + this.events = new EventQueue<>(events, logger); this.encoder = new RELPEncoder(charset); } @@ -82,9 +87,8 @@ public class RELPFrameHandler> { metadata.put(RELPMetadata.TXNR_KEY, String.valueOf(frame.getTxnr())); metadata.put(RELPMetadata.COMMAND_KEY, frame.getCommand()); - // queue the raw event blocking until space is available, reset the buffer final E event = eventFactory.create(frame.getData(), metadata, responder); - events.put(event); + events.offer(event); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java index 8bbd11638f..30697ed8d6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java @@ -50,7 +50,7 @@ public class RELPSSLSocketChannelHandler> extends final ProcessorLog logger) { super(key, dispatcher, charset, eventFactory, events, logger); this.decoder = new RELPDecoder(charset); - this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher); + this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger); } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java index 0bb81850fc..e3e84cfd19 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java @@ -50,7 +50,7 @@ public class RELPSocketChannelHandler> extends St final ProcessorLog logger) { super(key, dispatcher, charset, eventFactory, events, logger); this.decoder = new RELPDecoder(charset); - this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher); + this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger); } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java index b885e49078..5ff47dcc9c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java @@ -18,8 +18,11 @@ package org.apache.nifi.processors.standard; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; import org.apache.nifi.processors.standard.relp.event.RELPEvent; import org.apache.nifi.processors.standard.relp.frame.RELPEncoder; import org.apache.nifi.processors.standard.relp.frame.RELPFrame; @@ -35,13 +38,16 @@ import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import javax.net.ssl.SSLContext; import java.io.IOException; import java.net.Socket; +import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.BlockingQueue; public class TestListenRELP { @@ -78,7 +84,7 @@ public class TestListenRELP { encoder = new RELPEncoder(StandardCharsets.UTF_8); proc = new ResponseCapturingListenRELP(); runner = TestRunners.newTestRunner(proc); - runner.setProperty(ListenSyslog.PORT, "0"); + runner.setProperty(ListenRELP.PORT, "0"); } @Test @@ -169,6 +175,38 @@ public class TestListenRELP { run(frames, 5, 5, sslContextService); } + @Test + public void testNoEventsAvailable() throws IOException, InterruptedException { + MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList()); + runner = TestRunners.newTestRunner(mockListenRELP); + runner.setProperty(ListenRELP.PORT, "1"); + + runner.run(); + runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0); + } + + @Test + public void testBatchingWithDifferentSenders() throws IOException, InterruptedException { + final String sender1 = "sender1"; + final String sender2 = "sender2"; + final ChannelResponder responder = Mockito.mock(ChannelResponder.class); + + final List mockEvents = new ArrayList<>(); + mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + + MockListenRELP mockListenRELP = new MockListenRELP(mockEvents); + runner = TestRunners.newTestRunner(mockListenRELP); + runner.setProperty(ListenRELP.PORT, "1"); + runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10"); + + runner.run(); + runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2); + } + + protected void run(final List frames, final int expectedTransferred, final int expectedResponses, final SSLContextService sslContextService) throws IOException, InterruptedException { @@ -250,4 +288,27 @@ public class TestListenRELP { } } + // Extend ListenRELP to mock the ChannelDispatcher and allow us to return staged events + private static class MockListenRELP extends ListenRELP { + + private List mockEvents; + + public MockListenRELP(List mockEvents) { + this.mockEvents = mockEvents; + } + + @OnScheduled + @Override + public void onScheduled(ProcessContext context) throws IOException { + super.onScheduled(context); + events.addAll(mockEvents); + } + + @Override + protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue events) throws IOException { + return Mockito.mock(ChannelDispatcher.class); + } + + } + } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java index 2743cafd65..4731a10537 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -95,7 +95,7 @@ public class TestListenSyslog { while (numTransfered < numMessages && System.currentTimeMillis() < timeout) { Thread.sleep(10); proc.onTrigger(context, processSessionFactory); - numTransfered = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size(); + numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); } Assert.assertEquals("Did not process all the datagrams", numMessages, numTransfered); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java index 864d7a7767..d94d4acbf4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java @@ -16,34 +16,40 @@ */ package org.apache.nifi.processors.standard; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.StandardEvent; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; - -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Ignore public class TestListenUDP { + private int port = 0; + private ListenUDP proc; private TestRunner runner; - private static Logger LOGGER; - private DatagramSocket socket; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -52,7 +58,6 @@ public class TestListenUDP { System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.ListenUDP", "debug"); System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestListenUDP", "debug"); - LOGGER = LoggerFactory.getLogger(TestListenUDP.class); } @AfterClass @@ -62,152 +67,267 @@ public class TestListenUDP { @Before public void setUp() throws Exception { - runner = TestRunners.newTestRunner(ListenUDP.class); - socket = new DatagramSocket(20001); - } - - @After - public void tearDown() throws Exception { - socket.close(); + proc = new ListenUDP(); + runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenUDP.PORT, String.valueOf(port)); } @Test - public void testWithStopProcessor() throws IOException, InterruptedException { - LOGGER.info("Running testWithStopProcessor...."); - runner.setProperty(ListenUDP.PORT, "20000"); - runner.setProperty(ListenUDP.FLOW_FILE_SIZE_TRIGGER, "1 MB"); - runner.setProperty(ListenUDP.FLOW_FILES_PER_SESSION, "50"); - runner.setProperty(ListenUDP.CHANNEL_READER_PERIOD, "1 ms"); - runner.setProperty(ListenUDP.SENDING_HOST, "localhost"); - runner.setProperty(ListenUDP.SENDING_HOST_PORT, "20001"); - runner.setProperty(ListenUDP.RECV_TIMEOUT, "1 sec"); - Thread udpSender = new Thread(new DatagramSender(socket, false)); + public void testCustomValidation() { + runner.assertNotValid(); + runner.setProperty(ListenUDP.PORT, "1"); + runner.assertValid(); - ProcessContext context = runner.getProcessContext(); - ListenUDP processor = (ListenUDP) runner.getProcessor(); - ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - processor.initializeChannelListenerAndConsumerProcessing(context); - udpSender.start(); - boolean transferred = false; - long timeOut = System.currentTimeMillis() + 30000; - while (!transferred && System.currentTimeMillis() < timeOut) { - Thread.sleep(200); - processor.onTrigger(context, processSessionFactory); - transferred = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size() > 0; - } - assertTrue("Didn't process the datagrams", transferred); - Thread.sleep(7000); - processor.stopping(); - processor.stopped(); - socket.close(); - assertTrue(runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size() >= 60); + runner.setProperty(ListenUDP.SENDING_HOST, "localhost"); + runner.assertNotValid(); + + runner.setProperty(ListenUDP.SENDING_HOST_PORT, "1234"); + runner.assertValid(); + + runner.setProperty(ListenUDP.SENDING_HOST, ""); + runner.assertNotValid(); } @Test - public void testWithSlowRate() throws IOException, InterruptedException { - LOGGER.info("Running testWithSlowRate...."); - runner.setProperty(ListenUDP.PORT, "20000"); - runner.setProperty(ListenUDP.FLOW_FILE_SIZE_TRIGGER, "1 B"); - runner.setProperty(ListenUDP.FLOW_FILES_PER_SESSION, "1"); - runner.setProperty(ListenUDP.CHANNEL_READER_PERIOD, "50 ms"); - runner.setProperty(ListenUDP.MAX_BUFFER_SIZE, "2 MB"); - runner.setProperty(ListenUDP.SENDING_HOST, "localhost"); - runner.setProperty(ListenUDP.SENDING_HOST_PORT, "20001"); - runner.setProperty(ListenUDP.RECV_TIMEOUT, "5 sec"); - final DatagramSender sender = new DatagramSender(socket, false); - sender.throttle = 5000; - sender.numpackets = 10; - Thread udpSender = new Thread(sender); + public void testDefaultBehavior() throws IOException, InterruptedException { + final List messages = getMessages(15); + final int expectedQueued = messages.size(); + final int expectedTransferred = messages.size(); - ProcessContext context = runner.getProcessContext(); - ListenUDP processor = (ListenUDP) runner.getProcessor(); - ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - processor.initializeChannelListenerAndConsumerProcessing(context); - udpSender.start(); - boolean transferred = false; - long timeOut = System.currentTimeMillis() + 60000; - while (!transferred && System.currentTimeMillis() < timeOut) { - Thread.sleep(1000); - processor.onTrigger(context, processSessionFactory); - transferred = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size() > 0; - } - assertTrue("Didn't process the datagrams", transferred); - Thread.sleep(7000); - processor.stopping(); - processor.stopped(); - socket.close(); - assertTrue(runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size() >= 2); + // default behavior should produce a FlowFile per message sent + + run(new DatagramSocket(), messages, expectedQueued, expectedTransferred); + runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size()); + + List mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS); + verifyFlowFiles(mockFlowFiles); + verifyProvenance(expectedTransferred); } @Test - public void testWithCloseSenderAndNoStopping() throws Exception { - LOGGER.info("Running testWithCloseSenderAndNoStopping...."); - runner.setProperty(ListenUDP.PORT, "20000"); - runner.setProperty(ListenUDP.FLOW_FILE_SIZE_TRIGGER, "1 MB"); - runner.setProperty(ListenUDP.FLOW_FILES_PER_SESSION, "50"); - runner.setProperty(ListenUDP.CHANNEL_READER_PERIOD, "1 ms"); - runner.setProperty(ListenUDP.SENDING_HOST, "localhost"); - runner.setProperty(ListenUDP.SENDING_HOST_PORT, "20001"); - runner.setProperty(ListenUDP.RECV_TIMEOUT, "1 sec"); - Thread udpSender = new Thread(new DatagramSender(socket, false)); + public void testSendingMoreThanQueueSize() throws IOException, InterruptedException { + final int maxQueueSize = 3; + runner.setProperty(ListenUDP.MAX_MESSAGE_QUEUE_SIZE, String.valueOf(maxQueueSize)); - ProcessContext context = runner.getProcessContext(); - ListenUDP processor = (ListenUDP) runner.getProcessor(); - ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - processor.initializeChannelListenerAndConsumerProcessing(context); - udpSender.start(); - int numTransfered = 0; - long timeout = System.currentTimeMillis() + 22000; - while (numTransfered <= 80 && System.currentTimeMillis() < timeout) { - Thread.sleep(200); - processor.onTrigger(context, processSessionFactory); - numTransfered = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size(); - } - assertFalse("Did not process all the datagrams", numTransfered < 80); - processor.stopping(); - processor.stopped(); - socket.close(); + final List messages = getMessages(20); + final int expectedQueued = maxQueueSize; + final int expectedTransferred = maxQueueSize; + + run(new DatagramSocket(), messages, expectedQueued, expectedTransferred); + runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, maxQueueSize); + + List mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS); + verifyFlowFiles(mockFlowFiles); + verifyProvenance(expectedTransferred); } - private static final class DatagramSender implements Runnable { + @Test + public void testBatchingSingleSender() throws IOException, InterruptedException { + final String delimiter = "NN"; + runner.setProperty(ListenUDP.MESSAGE_DELIMITER, delimiter); + runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "3"); - private final DatagramSocket socket; - private final boolean closeSocket; - long throttle = 0; - int numpackets = 819200; + final List messages = getMessages(5); + final int expectedQueued = messages.size(); + final int expectedTransferred = 2; - private DatagramSender(DatagramSocket socket, boolean closeSocket) { - this.socket = socket; - this.closeSocket = closeSocket; + run(new DatagramSocket(), messages, expectedQueued, expectedTransferred); + runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, expectedTransferred); + + List mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS); + + MockFlowFile mockFlowFile1 = mockFlowFiles.get(0); + mockFlowFile1.assertContentEquals("This is message 1" + delimiter + "This is message 2" + delimiter + "This is message 3"); + + MockFlowFile mockFlowFile2 = mockFlowFiles.get(1); + mockFlowFile2.assertContentEquals("This is message 4" + delimiter + "This is message 5"); + + verifyProvenance(expectedTransferred); + } + + @Test + public void testBatchingWithDifferentSenders() throws IOException, InterruptedException { + final String sender1 = "sender1"; + final String sender2 = "sender2"; + final ChannelResponder responder = Mockito.mock(ChannelResponder.class); + final byte[] message = "test message".getBytes(StandardCharsets.UTF_8); + + final List mockEvents = new ArrayList<>(); + mockEvents.add(new StandardEvent(sender1, message, responder)); + mockEvents.add(new StandardEvent(sender1, message, responder)); + mockEvents.add(new StandardEvent(sender2, message, responder)); + mockEvents.add(new StandardEvent(sender2, message, responder)); + + MockListenUDP mockListenUDP = new MockListenUDP(mockEvents); + runner = TestRunners.newTestRunner(mockListenUDP); + runner.setProperty(ListenRELP.PORT, "1"); + runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10"); + + // sending 4 messages with a batch size of 10, but should get 2 FlowFiles because of different senders + + runner.run(); + runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2); + + verifyProvenance(2); + } + + @Test + public void testRunWhenNoEventsAvailale() throws IOException, InterruptedException { + final List mockEvents = new ArrayList<>(); + + MockListenUDP mockListenUDP = new MockListenUDP(mockEvents); + runner = TestRunners.newTestRunner(mockListenUDP); + runner.setProperty(ListenRELP.PORT, "1"); + runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10"); + + runner.run(5); + runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0); + } + + @Test + public void testWithSendingHostAndPortSameAsSender() throws IOException, InterruptedException { + final String sendingHost = "localhost"; + final Integer sendingPort = 21001; + runner.setProperty(ListenUDP.SENDING_HOST, sendingHost); + runner.setProperty(ListenUDP.SENDING_HOST_PORT, String.valueOf(sendingPort)); + + // bind to the same sending port that processor has for Sending Host Port + final DatagramSocket socket = new DatagramSocket(sendingPort); + + final List messages = getMessages(6); + final int expectedQueued = messages.size(); + final int expectedTransferred = messages.size(); + + run(socket, messages, expectedQueued, expectedTransferred); + runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size()); + + List mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS); + verifyFlowFiles(mockFlowFiles); + verifyProvenance(expectedTransferred); + } + + @Test + public void testWithSendingHostAndPortDifferentThanSender() throws IOException, InterruptedException { + final String sendingHost = "localhost"; + final Integer sendingPort = 21001; + runner.setProperty(ListenUDP.SENDING_HOST, sendingHost); + runner.setProperty(ListenUDP.SENDING_HOST_PORT, String.valueOf(sendingPort)); + + // bind to a different sending port than the processor has for Sending Host Port + final DatagramSocket socket = new DatagramSocket(21002); + + // no messages should come through since we are listening for 21001 and sending from 21002 + + final List messages = getMessages(6); + final int expectedQueued = 0; + final int expectedTransferred = 0; + + run(socket, messages, expectedQueued, expectedTransferred); + runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 0); + } + + private List getMessages(int numMessages) { + final List messages = new ArrayList<>(); + for (int i=0; i < numMessages; i++) { + messages.add("This is message " + (i + 1)); + } + return messages; + } + + private void verifyFlowFiles(List mockFlowFiles) { + for (int i = 0; i < mockFlowFiles.size(); i++) { + MockFlowFile flowFile = mockFlowFiles.get(i); + flowFile.assertContentEquals("This is message " + (i + 1)); + Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(ListenUDP.UDP_PORT_ATTR)); + Assert.assertTrue(StringUtils.isNotEmpty(flowFile.getAttribute(ListenUDP.UDP_SENDER_ATTR))); + } + } + + private void verifyProvenance(int expectedNumEvents) { + List provEvents = runner.getProvenanceEvents(); + Assert.assertEquals(expectedNumEvents, provEvents.size()); + + for (ProvenanceEventRecord event : provEvents) { + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertTrue(event.getTransitUri().startsWith("udp://")); + } + } + + protected void run(final DatagramSocket socket, final List messages, final int expectedQueueSize, final int expectedTransferred) + throws IOException, InterruptedException { + + try { + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + Thread.sleep(100); + + // get the real port the dispatcher is listening on + final int destPort = proc.getDispatcherPort(); + final InetSocketAddress destination = new InetSocketAddress("localhost", destPort); + + // send the messages to the port the processors is listening on + for (final String message : messages) { + final byte[] buffer = message.getBytes(StandardCharsets.UTF_8); + final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, destination); + socket.send(packet); + Thread.sleep(10); + } + + long responseTimeout = 10000; + + // this first loop waits until the internal queue of the processor has the expected + // number of messages ready before proceeding, we want to guarantee they are all there + // before onTrigger gets a chance to run + long startTimeQueueSizeCheck = System.currentTimeMillis(); + while (proc.getQueueSize() < expectedQueueSize + && (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) { + Thread.sleep(100); + } + + // want to fail here if the queue size isn't what we expect + Assert.assertEquals(expectedQueueSize, proc.getQueueSize()); + + // call onTrigger until we processed all the messages, or a certain amount of time passes + int numTransferred = 0; + long startTime = System.currentTimeMillis(); + while (numTransferred < expectedTransferred && (System.currentTimeMillis() - startTime < responseTimeout)) { + proc.onTrigger(context, processSessionFactory); + numTransferred = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS).size(); + Thread.sleep(100); + } + + // should have transferred the expected events + runner.assertTransferCount(ListenUDP.REL_SUCCESS, expectedTransferred); + } finally { + // unschedule to close connections + proc.onUnscheduled(); + IOUtils.closeQuietly(socket); + } + } + + // Extend ListenUDP to mock the ChannelDispatcher and allow us to return staged events + private static class MockListenUDP extends ListenUDP { + + private List mockEvents; + + public MockListenUDP(List mockEvents) { + this.mockEvents = mockEvents; + } + + @OnScheduled + @Override + public void onScheduled(ProcessContext context) throws IOException { + super.onScheduled(context); + events.addAll(mockEvents); } @Override - public void run() { - final byte[] buffer = new byte[128]; - try { - final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, new InetSocketAddress("localhost", 20000)); - final long startTime = System.nanoTime(); - for (int i = 0; i < numpackets; i++) { // 100 MB - socket.send(packet); - if (throttle > 0) { - try { - Thread.sleep(throttle); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - final long endTime = System.nanoTime(); - final long durationMillis = (endTime - startTime) / 1000000; - LOGGER.info("Sent all UDP packets without any obvious errors | duration ms= " + durationMillis); - } catch (IOException e) { - LOGGER.error("", e); - } finally { - if (closeSocket) { - socket.close(); - LOGGER.info("Socket closed"); - } - } + protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue events) throws IOException { + return Mockito.mock(ChannelDispatcher.class); } + } + } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java index 38b0572ef4..892fb8e1f6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.standard.relp.handler; +import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; import org.apache.nifi.processor.util.listen.event.EventFactory; import org.apache.nifi.processor.util.listen.response.ChannelResponder; @@ -45,6 +46,7 @@ public class TestRELPFrameHandler { private BlockingQueue events; private SelectionKey key; private AsyncChannelDispatcher dispatcher; + private ProcessorLog logger; private RELPFrameHandler frameHandler; @@ -55,8 +57,9 @@ public class TestRELPFrameHandler { this.events = new LinkedBlockingQueue<>(); this.key = Mockito.mock(SelectionKey.class); this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class); + this.logger = Mockito.mock(ProcessorLog.class); - this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher); + this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger); } @Test diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java index e3516705cc..0bec3ed7c6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java @@ -127,7 +127,7 @@ public class TestRELPSocketChannelHandler { final ByteBuffer buffer = ByteBuffer.allocate(1024); try { // starts the dispatcher listening on port 0 so it selects a random port - dispatcher.open(0, 4096); + dispatcher.open(null, 0, 4096); // starts a thread to run the dispatcher which will accept/read connections Thread dispatcherThread = new Thread(dispatcher);