NIFI-548 - Listen UDP should support generation of a flowfile per datagram

This commit is contained in:
bbende 2015-05-08 23:18:05 -04:00
parent c4d1666a26
commit badf1018c1
6 changed files with 32 additions and 13 deletions

View File

@ -45,17 +45,19 @@ public final class ChannelDispatcher implements Runnable {
private final StreamConsumerFactory factory;
private final AtomicLong channelReaderFrequencyMilliseconds = new AtomicLong(DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS);
private final long timeout;
private final boolean readSingleDatagram;
private volatile boolean stop = false;
public static final long DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS = 100L;
public ChannelDispatcher(final Selector serverSocketSelector, final Selector socketChannelSelector, final ScheduledExecutorService service,
final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit) {
final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit, final boolean readSingleDatagram) {
this.serverSocketSelector = serverSocketSelector;
this.socketChannelSelector = socketChannelSelector;
this.executor = service;
this.factory = factory;
emptyBuffers = buffers;
this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
this.readSingleDatagram = readSingleDatagram;
}
public void setChannelReaderFrequency(final long period, final TimeUnit timeUnit) {
@ -136,7 +138,7 @@ public final class ChannelDispatcher implements Runnable {
// for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only
// way to tell if it's new is the lack of an attachment.
if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) {
reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory);
reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory, readSingleDatagram);
socketChannelKey.attach(reader);
final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L, channelReaderFrequencyMilliseconds.get(),
TimeUnit.MILLISECONDS);

View File

@ -75,14 +75,14 @@ public final class ChannelListener {
private volatile long channelReaderFrequencyMSecs = 50;
public ChannelListener(final int threadPoolSize, final StreamConsumerFactory consumerFactory, final BufferPool bufferPool, int timeout,
TimeUnit unit) throws IOException {
TimeUnit unit, final boolean readSingleDatagram) throws IOException {
this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1); // need to allow for long running ChannelDispatcher thread
this.serverSocketSelector = Selector.open();
this.socketChannelSelector = Selector.open();
this.bufferPool = bufferPool;
this.initialBufferPoolSize = bufferPool.size();
channelDispatcher = new ChannelDispatcher(serverSocketSelector, socketChannelSelector, executor, consumerFactory, bufferPool,
timeout, unit);
timeout, unit, readSingleDatagram);
executor.schedule(channelDispatcher, 50, TimeUnit.MILLISECONDS);
}

View File

@ -27,8 +27,12 @@ public final class DatagramChannelReader extends AbstractChannelReader {
public static final int MAX_UDP_PACKET_SIZE = 65507;
public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
private final boolean readSingleDatagram;
public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory,
final boolean readSingleDatagram) {
super(id, key, empties, consumerFactory);
this.readSingleDatagram = readSingleDatagram;
}
/**
@ -45,7 +49,7 @@ public final class DatagramChannelReader extends AbstractChannelReader {
final DatagramChannel dChannel = (DatagramChannel) key.channel();
final int initialBufferPosition = buffer.position();
while (buffer.remaining() > MAX_UDP_PACKET_SIZE && key.isValid() && key.isReadable()) {
if (dChannel.receive(buffer) == null) {
if (dChannel.receive(buffer) == null || readSingleDatagram) {
break;
}
}

View File

@ -52,7 +52,7 @@ public final class ServerMain {
ChannelListener listener = null;
try {
executor.scheduleWithFixedDelay(bufferPool, 0L, 5L, TimeUnit.SECONDS);
listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS);
listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS, false);
listener.setChannelReaderSchedulingPeriod(50L, TimeUnit.MILLISECONDS);
listener.addDatagramChannel(null, 20000, 32 << 20);
LOGGER.info("Listening for UDP data on port 20000");

View File

@ -147,6 +147,14 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
.required(true)
.build();
public static final PropertyDescriptor FLOW_FILE_PER_DATAGRAM = new PropertyDescriptor.Builder()
.name("FlowFile Per Datagram")
.description("Determines if this processor emits each datagram as a FlowFile, or if multiple datagrams can be placed in a single FlowFile.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Buffer Size")
.description("Determines the size each receive buffer may be")
@ -273,6 +281,7 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
props.add(RECV_BUFFER_COUNT);
props.add(FLOW_FILES_PER_SESSION);
props.add(RECV_TIMEOUT);
props.add(FLOW_FILE_PER_DATAGRAM);
properties = Collections.unmodifiableList(props);
}
// defaults
@ -429,18 +438,19 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final Double flowFileSizeTrigger = context.getProperty(FLOW_FILE_SIZE_TRIGGER).asDataSize(DataUnit.B);
final int recvTimeoutMS = context.getProperty(RECV_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final boolean flowFilePerDatagram = context.getProperty(FLOW_FILE_PER_DATAGRAM).asBoolean();
final StreamConsumerFactory consumerFactory = new StreamConsumerFactory() {
@Override
public StreamConsumer newInstance(final String streamId) {
final UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), getLogger());
final UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), getLogger(), flowFilePerDatagram);
consumerRef.set(consumer);
return consumer;
}
};
final int readerMilliseconds = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final BufferPool bufferPool = new BufferPool(bufferCount, bufferSize.intValue(), false, Integer.MAX_VALUE);
channelListener = new ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory, bufferPool, recvTimeoutMS, TimeUnit.MILLISECONDS);
channelListener = new ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory, bufferPool, recvTimeoutMS, TimeUnit.MILLISECONDS, flowFilePerDatagram);
// specifying a sufficiently low number for each stream to be fast enough though very efficient
channelListener.setChannelReaderSchedulingPeriod(readerMilliseconds, TimeUnit.MILLISECONDS);
InetAddress nicIPAddress = null;

View File

@ -54,11 +54,12 @@ public class UDPStreamConsumer implements StreamConsumer {
private ProcessSession session;
private final UDPConsumerCallback udpCallback;
public UDPStreamConsumer(final String streamId, final List<FlowFile> newFlowFiles, final long fileSizeTrigger, final ProcessorLog logger) {
public UDPStreamConsumer(final String streamId, final List<FlowFile> newFlowFiles, final long fileSizeTrigger, final ProcessorLog logger,
final boolean flowFilePerDatagram) {
this.uniqueId = streamId;
this.newFlowFileQueue = newFlowFiles;
this.logger = logger;
this.udpCallback = new UDPConsumerCallback(filledBuffers, fileSizeTrigger);
this.udpCallback = new UDPConsumerCallback(filledBuffers, fileSizeTrigger, flowFilePerDatagram);
}
@Override
@ -173,10 +174,12 @@ public class UDPStreamConsumer implements StreamConsumer {
BufferPool bufferPool;
final BlockingQueue<ByteBuffer> filledBuffers;
final long fileSizeTrigger;
final boolean flowFilePerDatagram;
public UDPConsumerCallback(final BlockingQueue<ByteBuffer> filledBuffers, final long fileSizeTrigger) {
public UDPConsumerCallback(final BlockingQueue<ByteBuffer> filledBuffers, final long fileSizeTrigger, final boolean flowFilePerDatagram) {
this.filledBuffers = filledBuffers;
this.fileSizeTrigger = fileSizeTrigger;
this.flowFilePerDatagram = flowFilePerDatagram;
}
public void setBufferPool(BufferPool pool) {
@ -196,7 +199,7 @@ public class UDPStreamConsumer implements StreamConsumer {
bytesWrittenThisPass += wbc.write(buffer);
}
totalBytes += bytesWrittenThisPass;
if (totalBytes > fileSizeTrigger) {
if (totalBytes > fileSizeTrigger || flowFilePerDatagram) {
break;// this is enough data
}
} finally {