diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java index 824f2df990..a4308e319c 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java @@ -45,17 +45,19 @@ public final class ChannelDispatcher implements Runnable { private final StreamConsumerFactory factory; private final AtomicLong channelReaderFrequencyMilliseconds = new AtomicLong(DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS); private final long timeout; + private final boolean readSingleDatagram; private volatile boolean stop = false; public static final long DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS = 100L; public ChannelDispatcher(final Selector serverSocketSelector, final Selector socketChannelSelector, final ScheduledExecutorService service, - final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit) { + final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit, final boolean readSingleDatagram) { this.serverSocketSelector = serverSocketSelector; this.socketChannelSelector = socketChannelSelector; this.executor = service; this.factory = factory; emptyBuffers = buffers; this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit); + this.readSingleDatagram = readSingleDatagram; } public void setChannelReaderFrequency(final long period, final TimeUnit timeUnit) { @@ -136,7 +138,7 @@ public final class ChannelDispatcher implements Runnable { // for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only // way to tell if it's new is the lack of an attachment. if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) { - reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory); + reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory, readSingleDatagram); socketChannelKey.attach(reader); final ScheduledFuture readerFuture = executor.scheduleWithFixedDelay(reader, 10L, channelReaderFrequencyMilliseconds.get(), TimeUnit.MILLISECONDS); diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java index 7cbf589a92..ab770636eb 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java @@ -75,14 +75,14 @@ public final class ChannelListener { private volatile long channelReaderFrequencyMSecs = 50; public ChannelListener(final int threadPoolSize, final StreamConsumerFactory consumerFactory, final BufferPool bufferPool, int timeout, - TimeUnit unit) throws IOException { + TimeUnit unit, final boolean readSingleDatagram) throws IOException { this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1); // need to allow for long running ChannelDispatcher thread this.serverSocketSelector = Selector.open(); this.socketChannelSelector = Selector.open(); this.bufferPool = bufferPool; this.initialBufferPoolSize = bufferPool.size(); channelDispatcher = new ChannelDispatcher(serverSocketSelector, socketChannelSelector, executor, consumerFactory, bufferPool, - timeout, unit); + timeout, unit, readSingleDatagram); executor.schedule(channelDispatcher, 50, TimeUnit.MILLISECONDS); } diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java index db76279921..a4670b97c9 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java @@ -27,8 +27,12 @@ public final class DatagramChannelReader extends AbstractChannelReader { public static final int MAX_UDP_PACKET_SIZE = 65507; - public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) { + private final boolean readSingleDatagram; + + public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory, + final boolean readSingleDatagram) { super(id, key, empties, consumerFactory); + this.readSingleDatagram = readSingleDatagram; } /** @@ -45,7 +49,7 @@ public final class DatagramChannelReader extends AbstractChannelReader { final DatagramChannel dChannel = (DatagramChannel) key.channel(); final int initialBufferPosition = buffer.position(); while (buffer.remaining() > MAX_UDP_PACKET_SIZE && key.isValid() && key.isReadable()) { - if (dChannel.receive(buffer) == null) { + if (dChannel.receive(buffer) == null || readSingleDatagram) { break; } } diff --git a/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java b/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java index 27d5ccc7ee..a266ade5f6 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java @@ -52,7 +52,7 @@ public final class ServerMain { ChannelListener listener = null; try { executor.scheduleWithFixedDelay(bufferPool, 0L, 5L, TimeUnit.SECONDS); - listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS); + listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS, false); listener.setChannelReaderSchedulingPeriod(50L, TimeUnit.MILLISECONDS); listener.addDatagramChannel(null, 20000, 32 << 20); LOGGER.info("Listening for UDP data on port 20000"); diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java index fa60d6b229..6a88bd4869 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java @@ -147,6 +147,14 @@ public class ListenUDP extends AbstractSessionFactoryProcessor { .required(true) .build(); + public static final PropertyDescriptor FLOW_FILE_PER_DATAGRAM = new PropertyDescriptor.Builder() + .name("FlowFile Per Datagram") + .description("Determines if this processor emits each datagram as a FlowFile, or if multiple datagrams can be placed in a single FlowFile.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() .name("Max Buffer Size") .description("Determines the size each receive buffer may be") @@ -273,6 +281,7 @@ public class ListenUDP extends AbstractSessionFactoryProcessor { props.add(RECV_BUFFER_COUNT); props.add(FLOW_FILES_PER_SESSION); props.add(RECV_TIMEOUT); + props.add(FLOW_FILE_PER_DATAGRAM); properties = Collections.unmodifiableList(props); } // defaults @@ -429,18 +438,19 @@ public class ListenUDP extends AbstractSessionFactoryProcessor { final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); final Double flowFileSizeTrigger = context.getProperty(FLOW_FILE_SIZE_TRIGGER).asDataSize(DataUnit.B); final int recvTimeoutMS = context.getProperty(RECV_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final boolean flowFilePerDatagram = context.getProperty(FLOW_FILE_PER_DATAGRAM).asBoolean(); final StreamConsumerFactory consumerFactory = new StreamConsumerFactory() { @Override public StreamConsumer newInstance(final String streamId) { - final UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), getLogger()); + final UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), getLogger(), flowFilePerDatagram); consumerRef.set(consumer); return consumer; } }; final int readerMilliseconds = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final BufferPool bufferPool = new BufferPool(bufferCount, bufferSize.intValue(), false, Integer.MAX_VALUE); - channelListener = new ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory, bufferPool, recvTimeoutMS, TimeUnit.MILLISECONDS); + channelListener = new ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory, bufferPool, recvTimeoutMS, TimeUnit.MILLISECONDS, flowFilePerDatagram); // specifying a sufficiently low number for each stream to be fast enough though very efficient channelListener.setChannelReaderSchedulingPeriod(readerMilliseconds, TimeUnit.MILLISECONDS); InetAddress nicIPAddress = null; diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java index 617050949a..38f8add557 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java @@ -54,11 +54,12 @@ public class UDPStreamConsumer implements StreamConsumer { private ProcessSession session; private final UDPConsumerCallback udpCallback; - public UDPStreamConsumer(final String streamId, final List newFlowFiles, final long fileSizeTrigger, final ProcessorLog logger) { + public UDPStreamConsumer(final String streamId, final List newFlowFiles, final long fileSizeTrigger, final ProcessorLog logger, + final boolean flowFilePerDatagram) { this.uniqueId = streamId; this.newFlowFileQueue = newFlowFiles; this.logger = logger; - this.udpCallback = new UDPConsumerCallback(filledBuffers, fileSizeTrigger); + this.udpCallback = new UDPConsumerCallback(filledBuffers, fileSizeTrigger, flowFilePerDatagram); } @Override @@ -173,10 +174,12 @@ public class UDPStreamConsumer implements StreamConsumer { BufferPool bufferPool; final BlockingQueue filledBuffers; final long fileSizeTrigger; + final boolean flowFilePerDatagram; - public UDPConsumerCallback(final BlockingQueue filledBuffers, final long fileSizeTrigger) { + public UDPConsumerCallback(final BlockingQueue filledBuffers, final long fileSizeTrigger, final boolean flowFilePerDatagram) { this.filledBuffers = filledBuffers; this.fileSizeTrigger = fileSizeTrigger; + this.flowFilePerDatagram = flowFilePerDatagram; } public void setBufferPool(BufferPool pool) { @@ -196,7 +199,7 @@ public class UDPStreamConsumer implements StreamConsumer { bytesWrittenThisPass += wbc.write(buffer); } totalBytes += bytesWrittenThisPass; - if (totalBytes > fileSizeTrigger) { + if (totalBytes > fileSizeTrigger || flowFilePerDatagram) { break;// this is enough data } } finally {