NIFI-899 Rewrite of ListenUDP to use new listener framework, includes the following changes:

- Adding Network Interface property to AbstractListenEventProcessor and ListenSyslog
- Adding sending host and sending port to DatagramChannelDispatcher
- Creation of common base class AbstractListenEventBatchingProcessor
- Refactor of ListenUDP, ListenTCP, and ListenRELP to all extend from AbstractListenEventBatchingProcessor
- Changing DatagramChannelDispatcher, socket handlers, and RELP handler to use offer() when queueing instead of put(), and log an error if the offer failed

This closes #266
This commit is contained in:
Bryan Bende 2016-03-10 09:19:54 -05:00
parent 8f0116544a
commit 1373bf6725
23 changed files with 1048 additions and 976 deletions

View File

@ -0,0 +1,269 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen;
import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.event.Event;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* An abstract processor that extends from AbstractListenEventProcessor and adds common functionality for
* batching events into a single FlowFile.
*
* @param <E> the type of Event
*/
public abstract class AbstractListenEventBatchingProcessor<E extends Event> extends AbstractListenEventProcessor<E> {
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Max Batch Size")
.description(
"The maximum number of messages to add to a single FlowFile. If multiple messages are available, they will be concatenated along with "
+ "the <Message Delimiter> up to this configured maximum number of messages")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("1")
.required(true)
.build();
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter")
.displayName("Batching Message Delimiter")
.description("Specifies the delimiter to place between messages when multiple messages are bundled together (see <Max Batch Size> property).")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("\\n")
.required(true)
.build();
// it is only the array reference that is volatile - not the contents.
protected volatile byte[] messageDemarcatorBytes;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(NETWORK_INTF_NAME);
descriptors.add(PORT);
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
descriptors.add(CHARSET);
descriptors.add(MAX_BATCH_SIZE);
descriptors.add(MESSAGE_DELIMITER);
descriptors.addAll(getAdditionalProperties());
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.addAll(getAdditionalRelationships());
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
super.onScheduled(context);
final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize, messageDemarcatorBytes);
// if the size is 0 then there was nothing to process so return
// we don't need to yield here because we have a long poll in side of getBatches
if (batches.size() == 0) {
return;
}
final List<E> allEvents = new ArrayList<>();
for (Map.Entry<String,FlowFileEventBatch> entry : batches.entrySet()) {
FlowFile flowFile = entry.getValue().getFlowFile();
final List<E> events = entry.getValue().getEvents();
if (flowFile.getSize() == 0L || events.size() == 0) {
session.remove(flowFile);
getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});
continue;
}
final Map<String,String> attributes = getAttributes(entry.getValue());
flowFile = session.putAllAttributes(flowFile, attributes);
getLogger().debug("Transferring {} to success", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
// the sender and command will be the same for all events based on the batch key
final String transitUri = getTransitUri(entry.getValue());
session.getProvenanceReporter().receive(flowFile, transitUri);
allEvents.addAll(events);
}
// let sub-classes take any additional actions
postProcess(context, session, allEvents);
}
/**
* Creates the attributes for the FlowFile of the given batch.
*
* @param batch the current batch
* @return the Map of FlowFile attributes
*/
protected abstract Map<String,String> getAttributes(final FlowFileEventBatch batch);
/**
* Creates the transit uri to be used when reporting a provenance receive event for the given batch.
*
* @param batch the current batch
* @return the transit uri string
*/
protected abstract String getTransitUri(final FlowFileEventBatch batch);
/**
* Called at the end of onTrigger to allow sub-classes to take post processing action on the events
*
* @param context the current context
* @param session the current session
* @param events the list of all events processed by the current execution of onTrigger
*/
protected void postProcess(ProcessContext context, ProcessSession session, final List<E> events) {
// empty implementation so sub-classes only have to override if necessary
}
/**
* Batches together up to the batchSize events. Events are grouped together based on a batch key which
* by default is the sender of the event, but can be override by sub-classes.
*
* This method will return when batchSize has been reached, or when no more events are available on the queue.
*
* @param session the current session
* @param totalBatchSize the total number of events to process
* @param messageDemarcatorBytes the demarcator to put between messages when writing to a FlowFile
*
* @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all
* the batches will be <= batchSize
*/
protected Map<String,FlowFileEventBatch> getBatches(final ProcessSession session, final int totalBatchSize,
final byte[] messageDemarcatorBytes) {
final Map<String,FlowFileEventBatch> batches = new HashMap<>();
for (int i=0; i < totalBatchSize; i++) {
final E event = getMessage(true, true, session);
if (event == null) {
break;
}
final String batchKey = getBatchKey(event);
FlowFileEventBatch batch = batches.get(batchKey);
// if we don't have a batch for this key then create a new one
if (batch == null) {
batch = new FlowFileEventBatch(session.create(), new ArrayList<E>());
batches.put(batchKey, batch);
}
// add the current event to the batch
batch.getEvents().add(event);
// append the event's data to the FlowFile, write the demarcator first if not on the first event
final boolean writeDemarcator = (i > 0);
try {
final byte[] rawMessage = event.getData();
FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
if (writeDemarcator) {
out.write(messageDemarcatorBytes);
}
out.write(rawMessage);
}
});
// update the FlowFile reference in the batch object
batch.setFlowFile(appendedFlowFile);
} catch (final Exception e) {
getLogger().error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again",
new Object[] {e.getMessage()}, e);
errorEvents.offer(event);
break;
}
}
return batches;
}
/**
* @param event an event that was pulled off the queue
*
* @return a key to use for batching events together, by default this uses the sender of the
* event, but sub-classes should override this to batch by something else
*/
protected String getBatchKey(final E event) {
return event.getSender();
}
/**
* Wrapper to hold a FlowFile and the events that have been appended to it.
*/
protected final class FlowFileEventBatch {
private FlowFile flowFile;
private List<E> events;
public FlowFileEventBatch(final FlowFile flowFile, final List<E> events) {
this.flowFile = flowFile;
this.events = events;
}
public FlowFile getFlowFile() {
return flowFile;
}
public List<E> getEvents() {
return events;
}
public void setFlowFile(FlowFile flowFile) {
this.flowFile = flowFile;
}
}
}

View File

@ -16,30 +16,31 @@
*/
package org.apache.nifi.processor.util.listen;
import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -55,6 +56,8 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class AbstractListenEventProcessor<E extends Event> extends AbstractProcessor {
public static final PropertyDescriptor PORT = new PropertyDescriptor
.Builder().name("Port")
.description("The port to listen on for communication.")
@ -105,23 +108,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
.defaultValue("2")
.required(true)
.build();
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Max Batch Size")
.description(
"The maximum number of messages to add to a single FlowFile. If multiple messages are available, they will be concatenated along with "
+ "the <Message Delimiter> up to this configured maximum number of messages")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("1")
.required(true)
.build();
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter")
.description("Specifies the delimiter to place between messages when multiple messages are bundled together (see <Max Batch Size> property).")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("\\n")
.required(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -130,8 +117,8 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
public static final int POLL_TIMEOUT_MS = 20;
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
protected Set<Relationship> relationships;
protected List<PropertyDescriptor> descriptors;
protected volatile int port;
protected volatile Charset charset;
@ -142,6 +129,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(NETWORK_INTF_NAME);
descriptors.add(PORT);
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
@ -190,11 +178,18 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
port = context.getProperty(PORT).asInteger();
events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger());
final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
InetAddress nicIPAddress = null;
if (!StringUtils.isEmpty(nicIPAddressStr)) {
NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr);
nicIPAddress = netIF.getInetAddresses().nextElement();
}
// create the dispatcher and call open() to bind to the given port
dispatcher = createDispatcher(context, events);
dispatcher.open(port, maxChannelBufferSize);
dispatcher.open(nicIPAddress, port, maxChannelBufferSize);
// start a thread to run the dispatcher
final Thread readerThread = new Thread(dispatcher);
@ -231,6 +226,21 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
}
}
/**
* Creates a pool of ByteBuffers with the given size.
*
* @param poolSize the number of buffers to initialize the pool with
* @param bufferSize the size of each buffer
* @return a blocking queue with size equal to poolSize and each buffer equal to bufferSize
*/
protected BlockingQueue<ByteBuffer> createBufferPool(final int poolSize, final int bufferSize) {
final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
bufferPool.offer(ByteBuffer.allocate(bufferSize));
}
return bufferPool;
}
/**
* If pollErrorQueue is true, the error queue will be checked first and event will be
* returned from the error queue if available.
@ -271,104 +281,4 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
return event;
}
/**
* Batches together up to the batchSize events. Events are grouped together based on a batch key which
* by default is the sender of the event, but can be override by sub-classes.
*
* This method will return when batchSize has been reached, or when no more events are available on the queue.
*
* @param session the current session
* @param totalBatchSize the total number of events to process
* @param messageDemarcatorBytes the demarcator to put between messages when writing to a FlowFile
*
* @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all
* the batches will be <= batchSize
*/
protected Map<String,FlowFileEventBatch> getBatches(final ProcessSession session, final int totalBatchSize,
final byte[] messageDemarcatorBytes) {
final Map<String,FlowFileEventBatch> batches = new HashMap<>();
for (int i=0; i < totalBatchSize; i++) {
final E event = getMessage(true, true, session);
if (event == null) {
break;
}
final String batchKey = getBatchKey(event);
FlowFileEventBatch batch = batches.get(batchKey);
// if we don't have a batch for this key then create a new one
if (batch == null) {
batch = new FlowFileEventBatch(session.create(), new ArrayList<E>());
batches.put(batchKey, batch);
}
// add the current event to the batch
batch.getEvents().add(event);
// append the event's data to the FlowFile, write the demarcator first if not on the first event
final boolean writeDemarcator = (i > 0);
try {
final byte[] rawMessage = event.getData();
FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
if (writeDemarcator) {
out.write(messageDemarcatorBytes);
}
out.write(rawMessage);
}
});
// update the FlowFile reference in the batch object
batch.setFlowFile(appendedFlowFile);
} catch (final Exception e) {
getLogger().error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again",
new Object[] {e.getMessage()}, e);
errorEvents.offer(event);
break;
}
}
return batches;
}
/**
* @param event an event that was pulled off the queue
*
* @return a key to use for batching events together, by default this uses the sender of the
* event, but sub-classes should override this to batch by something else
*/
protected String getBatchKey(final E event) {
return event.getSender();
}
/**
* Wrapper to hold a FlowFile and the events that have been appended to it.
*/
protected final class FlowFileEventBatch {
private FlowFile flowFile;
private List<E> events;
public FlowFileEventBatch(final FlowFile flowFile, final List<E> events) {
this.flowFile = flowFile;
this.events = events;
}
public FlowFile getFlowFile() {
return flowFile;
}
public List<E> getEvents() {
return events;
}
public void setFlowFile(FlowFile flowFile) {
this.flowFile = flowFile;
}
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Set;
/**
* Shared properties.
*/
public class ListenerProperties {
private static final Set<String> interfaceSet = new HashSet<>();
static {
try {
final Enumeration<NetworkInterface> interfaceEnum = NetworkInterface.getNetworkInterfaces();
while (interfaceEnum.hasMoreElements()) {
final NetworkInterface ifc = interfaceEnum.nextElement();
interfaceSet.add(ifc.getName());
}
} catch (SocketException e) {
}
}
public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder()
.name("Local Network Interface")
.description("The name of a local network interface to be used to restrict listening to a specific LAN.")
.addValidator(new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
ValidationResult result = new ValidationResult.Builder()
.subject("Local Network Interface").valid(true).input(input).build();
if (interfaceSet.contains(input.toLowerCase())) {
return result;
}
String message;
String realValue = input;
try {
if (context.isExpressionLanguagePresent(input)) {
AttributeExpression ae = context.newExpressionLanguageCompiler().compile(input);
realValue = ae.evaluate();
}
if (interfaceSet.contains(realValue.toLowerCase())) {
return result;
}
message = realValue + " is not a valid network name. Valid names are " + interfaceSet.toString();
} catch (IllegalArgumentException e) {
message = "Not a valid AttributeExpression: " + e.getMessage();
}
result = new ValidationResult.Builder().subject("Local Network Interface")
.valid(false).input(input).explanation(message).build();
return result;
}
})
.expressionLanguageSupported(true)
.build();
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processor.util.listen.dispatcher;
import java.io.IOException;
import java.net.InetAddress;
/**
* Dispatches handlers for a given channel.
@ -27,11 +28,16 @@ public interface ChannelDispatcher extends Runnable {
* Opens the dispatcher listening on the given port and attempts to set the
* OS socket buffer to maxBufferSize.
*
* @param nicAddress the local network interface to listen on, if null will listen on the wildcard address
* which means listening on all local network interfaces
*
* @param port the port to listen on
*
* @param maxBufferSize the size to set the OS socket buffer to
*
* @throws IOException if an error occurred listening on the given port
*/
void open(int port, int maxBufferSize) throws IOException;
void open(InetAddress nicAddress, int port, int maxBufferSize) throws IOException;
/**
* @return the port being listened to

View File

@ -21,8 +21,10 @@ import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processor.util.listen.event.EventQueue;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
@ -42,8 +44,10 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
private final EventFactory<E> eventFactory;
private final BlockingQueue<ByteBuffer> bufferPool;
private final BlockingQueue<E> events;
private final EventQueue<E> events;
private final ProcessorLog logger;
private final String sendingHost;
private final Integer sendingPort;
private Selector selector;
private DatagramChannel datagramChannel;
@ -53,10 +57,21 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
final BlockingQueue<ByteBuffer> bufferPool,
final BlockingQueue<E> events,
final ProcessorLog logger) {
this(eventFactory, bufferPool, events, logger, null, null);
}
public DatagramChannelDispatcher(final EventFactory<E> eventFactory,
final BlockingQueue<ByteBuffer> bufferPool,
final BlockingQueue<E> events,
final ProcessorLog logger,
final String sendingHost,
final Integer sendingPort) {
this.eventFactory = eventFactory;
this.bufferPool = bufferPool;
this.events = events;
this.logger = logger;
this.sendingHost = sendingHost;
this.sendingPort = sendingPort;
this.events = new EventQueue<>(events, logger);
if (bufferPool == null || bufferPool.size() == 0) {
throw new IllegalArgumentException("A pool of available ByteBuffers is required");
@ -64,7 +79,7 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
}
@Override
public void open(final int port, int maxBufferSize) throws IOException {
public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException {
stopped = false;
datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
@ -78,7 +93,17 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
+ "maximum receive buffer");
}
}
datagramChannel.socket().bind(new InetSocketAddress(port));
// we don't have to worry about nicAddress being null here because InetSocketAddress already handles it
datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
datagramChannel.socket().bind(new InetSocketAddress(nicAddress, port));
// if a sending host and port were provided then connect to that specific address to only receive
// datagrams from that host/port, otherwise we can receive datagrams from any host/port
if (sendingHost != null && sendingPort != null) {
datagramChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
}
selector = Selector.open();
datagramChannel.register(selector, SelectionKey.OP_READ);
}
@ -115,9 +140,8 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
final Map<String,String> metadata = EventFactoryUtil.createMapWithSender(sender);
final E event = eventFactory.create(bytes, metadata, null);
events.offer(event);
// queue the raw message with the sender, block until space is available
events.put(event);
buffer.clear();
}
}

View File

@ -27,6 +27,7 @@ import org.apache.nifi.security.util.SslContextFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
@ -104,7 +105,7 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
}
@Override
public void open(final int port, int maxBufferSize) throws IOException {
public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException {
stopped = false;
executor = Executors.newFixedThreadPool(maxConnections);
@ -119,7 +120,9 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
+ "maximum receive buffer");
}
}
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.socket().bind(new InetSocketAddress(nicAddress, port));
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.event;
import org.apache.commons.lang3.Validate;
import org.apache.nifi.logging.ProcessorLog;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Wraps a BlockingQueue to centralize logic for offering events across UDP, TCP, and SSL.
*
* @param <E> the type of event
*/
public class EventQueue<E extends Event> {
/**
* The default number of milliseconds to wait when offering new events to the queue.
*/
public static final long DEFAULT_OFFER_WAIT_MS = 100;
private final long offerWaitMs;
private final BlockingQueue<E> events;
private final ProcessorLog logger;
public EventQueue(final BlockingQueue<E> events, final ProcessorLog logger) {
this(events, DEFAULT_OFFER_WAIT_MS, logger);
}
public EventQueue(final BlockingQueue<E> events, final long offerWaitMs, final ProcessorLog logger) {
this.events = events;
this.offerWaitMs = offerWaitMs;
this.logger = logger;
Validate.notNull(this.events);
Validate.notNull(this.logger);
}
/**
* Offers the given event to the events queue with a wait time, if the offer fails the event
* is dropped an error is logged.
*
* @param event the event to offer
* @throws InterruptedException if interrupted while waiting to offer
*/
public void offer(final E event) throws InterruptedException {
boolean queued = events.offer(event, offerWaitMs, TimeUnit.MILLISECONDS);
if (!queued) {
logger.error("Internal queue at maximum capacity, could not queue event");
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.event;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import java.util.Map;
/**
* EventFactory to create StandardEvent instances.
*/
public class StandardEventFactory implements EventFactory<StandardEvent> {
@Override
public StandardEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
String sender = null;
if (metadata != null && metadata.containsKey(EventFactory.SENDER_KEY)) {
sender = metadata.get(EventFactory.SENDER_KEY);
}
return new StandardEvent(sender, data, responder);
}
}

View File

@ -20,6 +20,7 @@ import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventQueue;
import java.nio.channels.SelectionKey;
import java.nio.charset.Charset;
@ -34,7 +35,7 @@ public abstract class ChannelHandler<E extends Event, D extends ChannelDispatche
protected final D dispatcher;
protected final Charset charset;
protected final EventFactory<E> eventFactory;
protected final BlockingQueue<E> events;
protected final EventQueue<E> events;
protected final ProcessorLog logger;
@ -48,8 +49,8 @@ public abstract class ChannelHandler<E extends Event, D extends ChannelDispatche
this.dispatcher = dispatcher;
this.charset = charset;
this.eventFactory = eventFactory;
this.events = events;
this.logger = logger;
this.events = new EventQueue<E>(events, logger);
}
}

View File

@ -135,10 +135,8 @@ public class SSLSocketChannelHandler<E extends Event<SocketChannel>> extends Soc
if (currBytes.size() > 0) {
final SSLSocketChannelResponder response = new SSLSocketChannelResponder(socketChannel, sslSocketChannel);
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
// queue the raw event blocking until space is available, reset the temporary buffer
final E event = eventFactory.create(currBytes.toByteArray(), metadata, response);
events.put(event);
events.offer(event);
currBytes.reset();
}
} else {

View File

@ -137,10 +137,8 @@ public class StandardSocketChannelHandler<E extends Event<SocketChannel>> extend
if (currBytes.size() > 0) {
final SocketChannelResponder response = new SocketChannelResponder(socketChannel);
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
// queue the raw event blocking until space is available, reset the buffer
final E event = eventFactory.create(currBytes.toByteArray(), metadata, response);
events.put(event);
events.offer(event);
currBytes.reset();
// Mark this as the start of the next message

View File

@ -24,14 +24,12 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
@ -56,7 +54,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "relp", "tcp", "logs"})
@ -72,7 +69,7 @@ import java.util.concurrent.LinkedBlockingQueue;
@WritesAttribute(attribute="mime.type", description="The mime.type of the content which is text/plain")
})
@SeeAlso({ParseSyslog.class})
public class ListenRELP extends AbstractListenEventProcessor<RELPEvent> {
public class ListenRELP extends AbstractListenEventBatchingProcessor<RELPEvent> {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
@ -83,11 +80,10 @@ public class ListenRELP extends AbstractListenEventProcessor<RELPEvent> {
.build();
private volatile RELPEncoder relpEncoder;
private volatile byte[] messageDemarcatorBytes; //it is only the array reference that is volatile - not the contents.
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(MAX_CONNECTIONS, MAX_BATCH_SIZE, MESSAGE_DELIMITER, SSL_CONTEXT_SERVICE);
return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE);
}
@Override
@ -96,9 +92,6 @@ public class ListenRELP extends AbstractListenEventProcessor<RELPEvent> {
super.onScheduled(context);
// wanted to ensure charset was already populated here
relpEncoder = new RELPEncoder(charset);
final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
}
@Override
@ -111,10 +104,7 @@ public class ListenRELP extends AbstractListenEventProcessor<RELPEvent> {
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and the buffer size
final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(maxConnections);
for (int i = 0; i < maxConnections; i++) {
bufferPool.offer(ByteBuffer.allocate(bufferSize));
}
final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
@ -129,69 +119,20 @@ public class ListenRELP extends AbstractListenEventProcessor<RELPEvent> {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize, messageDemarcatorBytes);
// if the size is 0 then there was nothing to process so return
// we don't need to yield here because inside getBatches() we are polling a queue with a wait
// and yielding here could have a negative impact on performance
if (batches.size() == 0) {
return;
}
for (Map.Entry<String,FlowFileEventBatch> entry : batches.entrySet()) {
FlowFile flowFile = entry.getValue().getFlowFile();
final List<RELPEvent> events = entry.getValue().getEvents();
if (flowFile.getSize() == 0L || events.size() == 0) {
session.remove(flowFile);
getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});
continue;
}
// the sender and command will be the same for all events based on the batch key
final String sender = events.get(0).getSender();
final String command = events.get(0).getCommand();
final int numAttributes = events.size() == 1 ? 5 : 4;
final Map<String,String> attributes = new HashMap<>(numAttributes);
attributes.put(RELPAttributes.COMMAND.key(), command);
attributes.put(RELPAttributes.SENDER.key(), sender);
attributes.put(RELPAttributes.PORT.key(), String.valueOf(port));
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
// if there was only one event then we can pass on the transaction
// NOTE: we could pass on all the transaction ids joined together
if (events.size() == 1) {
attributes.put(RELPAttributes.TXNR.key(), String.valueOf(events.get(0).getTxnr()));
}
flowFile = session.putAllAttributes(flowFile, attributes);
getLogger().debug("Transferring {} to success", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
// create a provenance receive event
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
.append(port).toString();
session.getProvenanceReporter().receive(flowFile, transitUri);
// commit the session to guarantee the data has been delivered
session.commit();
// respond to each event to acknowledge successful receipt
for (final RELPEvent event : events) {
respond(event, RELPResponse.ok(event.getTxnr()));
}
}
protected String getBatchKey(RELPEvent event) {
return event.getSender() + "_" + event.getCommand();
}
@Override
protected String getBatchKey(RELPEvent event) {
return event.getSender() + "_" + event.getCommand();
protected void postProcess(final ProcessContext context, final ProcessSession session, final List<RELPEvent> events) {
// first commit the session so we guarantee we have all the events successfully
// written to FlowFiles and transferred to the success relationship
session.commit();
// respond to each event to acknowledge successful receipt
for (final RELPEvent event : events) {
respond(event, RELPResponse.ok(event.getTxnr()));
}
}
protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
@ -207,6 +148,39 @@ public class ListenRELP extends AbstractListenEventProcessor<RELPEvent> {
}
}
@Override
protected Map<String, String> getAttributes(FlowFileEventBatch batch) {
final List<RELPEvent> events = batch.getEvents();
// the sender and command will be the same for all events based on the batch key
final String sender = events.get(0).getSender();
final String command = events.get(0).getCommand();
final int numAttributes = events.size() == 1 ? 5 : 4;
final Map<String,String> attributes = new HashMap<>(numAttributes);
attributes.put(RELPAttributes.COMMAND.key(), command);
attributes.put(RELPAttributes.SENDER.key(), sender);
attributes.put(RELPAttributes.PORT.key(), String.valueOf(port));
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
// if there was only one event then we can pass on the transaction
// NOTE: we could pass on all the transaction ids joined together
if (events.size() == 1) {
attributes.put(RELPAttributes.TXNR.key(), String.valueOf(events.get(0).getTxnr()));
}
return attributes;
}
@Override
protected String getTransitUri(FlowFileEventBatch batch) {
final String sender = batch.getEvents().get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
.append(port).toString();
return transitUri;
}
public enum RELPAttributes implements FlowFileAttributeKey {
TXNR("relp.txnr"),
COMMAND("relp.command"),

View File

@ -16,6 +16,9 @@
*/
package org.apache.nifi.processors.standard;
import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -55,6 +58,8 @@ import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
@ -189,6 +194,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(PROTOCOL);
descriptors.add(PORT);
descriptors.add(NETWORK_INTF_NAME);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
@ -257,6 +263,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int maxMessageQueueSize = context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger();
final String protocol = context.getProperty(PROTOCOL).getValue();
final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final String charSet = context.getProperty(CHARSET).getValue();
final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
messageDemarcatorBytes = msgDemarcator.getBytes(Charset.forName(charSet));
@ -276,10 +283,16 @@ public class ListenSyslog extends AbstractSyslogProcessor {
parser = new SyslogParser(Charset.forName(charSet));
syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize);
InetAddress nicIPAddress = null;
if (!StringUtils.isEmpty(nicIPAddressStr)) {
NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr);
nicIPAddress = netIF.getInetAddresses().nextElement();
}
// create either a UDP or TCP reader and call open() to bind to the given port
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
channelDispatcher = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections, sslContextService, Charset.forName(charSet));
channelDispatcher.open(port, maxChannelBufferSize);
channelDispatcher.open(nicIPAddress, port, maxChannelBufferSize);
final Thread readerThread = new Thread(channelDispatcher);
readerThread.setName("ListenSyslog [" + getIdentifier() + "]");

View File

@ -23,31 +23,26 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.event.StandardEventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
@ -57,7 +52,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@ -71,7 +65,7 @@ import java.util.concurrent.LinkedBlockingQueue;
@WritesAttribute(attribute="tcp.sender", description="The sending host of the messages."),
@WritesAttribute(attribute="tcp.port", description="The sending port the messages were received.")
})
public class ListenTCP extends AbstractListenEventProcessor<ListenTCP.TCPEvent> {
public class ListenTCP extends AbstractListenEventBatchingProcessor<StandardEvent> {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
@ -89,15 +83,10 @@ public class ListenTCP extends AbstractListenEventProcessor<ListenTCP.TCPEvent>
.defaultValue(SSLContextService.ClientAuth.REQUIRED.name())
.build();
// it is only the array reference that is volatile - not the contents.
private volatile byte[] messageDemarcatorBytes;
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(
MAX_CONNECTIONS,
MAX_BATCH_SIZE,
MESSAGE_DELIMITER,
SSL_CONTEXT_SERVICE,
CLIENT_AUTH
);
@ -120,15 +109,7 @@ public class ListenTCP extends AbstractListenEventProcessor<ListenTCP.TCPEvent>
}
@Override
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
super.onScheduled(context);
final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
}
@Override
protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<TCPEvent> events)
protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<StandardEvent> events)
throws IOException {
final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
@ -136,12 +117,7 @@ public class ListenTCP extends AbstractListenEventProcessor<ListenTCP.TCPEvent>
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and the buffer size
final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(maxConnections);
for (int i = 0; i < maxConnections; i++) {
bufferPool.offer(ByteBuffer.allocate(bufferSize));
}
final EventFactory<TCPEvent> eventFactory = new TCPEventFactory();
final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
@ -154,73 +130,27 @@ public class ListenTCP extends AbstractListenEventProcessor<ListenTCP.TCPEvent>
clientAuth = SslContextFactory.ClientAuth.valueOf(clientAuthValue);
}
final ChannelHandlerFactory<TCPEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
final EventFactory<StandardEvent> eventFactory = new StandardEventFactory();
final ChannelHandlerFactory<StandardEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, charSet);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize, messageDemarcatorBytes);
// if the size is 0 then there was nothing to process so return
// we don't need to yield here because we have a long poll in side of getBatches
if (batches.size() == 0) {
return;
}
for (Map.Entry<String,FlowFileEventBatch> entry : batches.entrySet()) {
FlowFile flowFile = entry.getValue().getFlowFile();
final List<TCPEvent> events = entry.getValue().getEvents();
if (flowFile.getSize() == 0L || events.size() == 0) {
session.remove(flowFile);
getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});
continue;
}
// the sender and command will be the same for all events based on the batch key
final String sender = events.get(0).getSender();
final Map<String,String> attributes = new HashMap<>(3);
attributes.put("tcp.sender", sender);
attributes.put("tcp.port", String.valueOf(port));
flowFile = session.putAllAttributes(flowFile, attributes);
getLogger().debug("Transferring {} to success", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
// create a provenance receive event
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append("tcp").append("://").append(senderHost).append(":")
.append(port).toString();
session.getProvenanceReporter().receive(flowFile, transitUri);
}
protected Map<String, String> getAttributes(final FlowFileEventBatch batch) {
final String sender = batch.getEvents().get(0).getSender();
final Map<String,String> attributes = new HashMap<>(3);
attributes.put("tcp.sender", sender);
attributes.put("tcp.port", String.valueOf(port));
return attributes;
}
/**
* Event implementation for TCP.
*/
static class TCPEvent<C extends SelectableChannel> extends StandardEvent<C> {
public TCPEvent(String sender, byte[] data, ChannelResponder<C> responder) {
super(sender, data, responder);
}
@Override
protected String getTransitUri(FlowFileEventBatch batch) {
final String sender = batch.getEvents().get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append("tcp").append("://").append(senderHost).append(":")
.append(port).toString();
return transitUri;
}
/**
* Factory implementation for TCPEvents.
*/
static final class TCPEventFactory implements EventFactory<TCPEvent> {
@Override
public TCPEvent create(byte[] data, Map<String, String> metadata, ChannelResponder responder) {
String sender = null;
if (metadata != null && metadata.containsKey(EventFactory.SENDER_KEY)) {
sender = metadata.get(EventFactory.SENDER_KEY);
}
return new TCPEvent(sender, data, responder);
}
}
}

View File

@ -16,198 +16,54 @@
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.io.nio.BufferPool;
import org.apache.nifi.io.nio.ChannelListener;
import org.apache.nifi.io.nio.consumer.StreamConsumer;
import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.UDPStreamConsumer;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.event.StandardEventFactory;
/**
* <p>
* This processor listens for Datagram Packets on a given port and concatenates the contents of those packets together generating flow files roughly as often as the internal buffer fills up or until
* no more data is currently available.
* </p>
*
* <p>
* This processor has the following required properties:
* <ul>
* <li><b>Port</b> - The port to listen on for data packets. Must be known by senders of Datagrams.</li>
* <li><b>Receive Timeout</b> - The time out period when waiting to receive data from the socket. Specify units. Default is 5 secs.</li>
* <li><b>Max Buffer Size</b> - Determines the size each receive buffer may be. Specify units. Default is 1 MB.</li>
* <li><b>FlowFile Size Trigger</b> - Determines the (almost) upper bound size at which a flow file would be generated. A flow file will get made even if this value isn't reached if there is no more
* data streaming in and this value may be exceeded by the size of a single packet. Specify units. Default is 1 MB.</li>
* <li><b>Max size of UDP Buffer</b> - The maximum UDP buffer size that should be used. This is a suggestion to the Operating System to indicate how big the udp socket buffer should be. Specify units.
* Default is 1 MB.")</li>
* <li><b>Receive Buffer Count</b> - Number of receiving buffers to be used to accept data from the socket. Higher numbers means more ram is allocated but can allow better throughput. Default is
* 4.</li>
* <li><b>Channel Reader Interval</b> - Scheduling interval for each read channel. Specify units. Default is 50 millisecs.</li>
* <li><b>FlowFiles Per Session</b> - The number of flow files per session. Higher number is more efficient, but will lose more data if a problem occurs that causes a rollback of a session. Default is
* 10</li>
* </ul>
* </p>
*
* This processor has the following optional properties:
* <ul>
* <li><b>Sending Host</b> - IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will be accepted. Improves Performance. May be a system property or an
* environment variable.</li>
* <li><b>Sending Host Port</b> - Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and this port will be accepted. Improves Performance. May be a system
* property or an environment variable.</li>
* </ul>
*
* <p>
* The following relationships are required:
* <ul>
* <li><b>success</b> - Where to route newly created flow files.</li>
* </ul>
* </p>
*
*/
@TriggerWhenEmpty
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@SupportsBatching
@Tags({"ingest", "udp", "listen", "source"})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Listens for Datagram Packets on a given port and concatenates the contents of those packets "
+ "together generating flow files")
public class ListenUDP extends AbstractSessionFactoryProcessor {
@CapabilityDescription("Listens for Datagram Packets on a given port. The default behavior produces a FlowFile " +
"per datagram, however for higher throughput the Max Batch Size property may be increased to specify the number of " +
"datagrams to batch together in a single FlowFile. This processor can be restricted to listening for datagrams from a " +
"specific remote host and port by specifying the Sending Host and Sending Host Port properties, otherwise it will listen " +
"for datagrams from all hosts and ports.")
@WritesAttributes({
@WritesAttribute(attribute="udp.sender", description="The sending host of the messages."),
@WritesAttribute(attribute="udp.port", description="The sending port the messages were received.")
})
public class ListenUDP extends AbstractListenEventBatchingProcessor<StandardEvent> {
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> properties;
// relationships.
public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
.name("success")
.description("Connection which contains concatenated Datagram Packets")
.build();
static {
final Set<Relationship> rels = new HashSet<>();
rels.add(RELATIONSHIP_SUCCESS);
relationships = Collections.unmodifiableSet(rels);
}
// required properties.
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("Port")
.description("Port to listen on. Must be known by senders of Datagrams.")
.addValidator(StandardValidators.PORT_VALIDATOR)
.required(true)
.build();
public static final PropertyDescriptor RECV_TIMEOUT = new PropertyDescriptor.Builder()
.name("Receive Timeout")
.description("The time out period when waiting to receive data from the socket. Specify units.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("5 secs")
.required(true)
.build();
public static final PropertyDescriptor FLOW_FILE_PER_DATAGRAM = new PropertyDescriptor.Builder()
.name("FlowFile Per Datagram")
.description("Determines if this processor emits each datagram as a FlowFile, or if multiple datagrams can be placed in a single FlowFile.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Buffer Size")
.description("Determines the size each receive buffer may be")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.required(true)
.build();
public static final PropertyDescriptor FLOW_FILE_SIZE_TRIGGER = new PropertyDescriptor.Builder()
.name("FlowFile Size Trigger")
.description("Determines the (almost) upper bound size at which a flow file would be generated.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.required(true)
.build();
public static final PropertyDescriptor MAX_UDP_BUFFER = new PropertyDescriptor.Builder()
.name("Max size of UDP Buffer")
.description("The maximum UDP buffer size that should be used. This is a suggestion to the Operating System "
+ "to indicate how big the udp socket buffer should be.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.required(true)
.build();
public static final PropertyDescriptor RECV_BUFFER_COUNT = new PropertyDescriptor.Builder()
.name("Receive Buffer Count")
.description("Number of receiving buffers to be used to accept data from the socket. Higher numbers "
+ "means more ram is allocated but can allow better throughput.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("4")
.required(true)
.build();
public static final PropertyDescriptor CHANNEL_READER_PERIOD = new PropertyDescriptor.Builder()
.name("Channel Reader Interval")
.description("Scheduling interval for each read channel.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("50 ms")
.required(true)
.build();
public static final PropertyDescriptor FLOW_FILES_PER_SESSION = new PropertyDescriptor.Builder()
.name("FlowFiles Per Session")
.description("The number of flow files per session.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10")
.build();
// optional properties.
public static final PropertyDescriptor SENDING_HOST = new PropertyDescriptor.Builder()
.name("Sending Host")
.description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will "
@ -224,359 +80,70 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
.expressionLanguageSupported(true)
.build();
private static final Set<String> interfaceSet = new HashSet<>();
static {
try {
final Enumeration<NetworkInterface> interfaceEnum = NetworkInterface.getNetworkInterfaces();
while (interfaceEnum.hasMoreElements()) {
final NetworkInterface ifc = interfaceEnum.nextElement();
interfaceSet.add(ifc.getName());
}
} catch (SocketException e) {
}
}
public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder()
.name("Local Network Interface")
.description("The name of a local network interface to be used to restrict listening for UDP Datagrams to a specific LAN."
+ "May be a system property or an environment variable.")
.addValidator(new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
ValidationResult result = new ValidationResult.Builder()
.subject("Local Network Interface").valid(true).input(input).build();
if (interfaceSet.contains(input.toLowerCase())) {
return result;
}
String message;
try {
AttributeExpression ae = context.newExpressionLanguageCompiler().compile(input);
String realValue = ae.evaluate();
if (interfaceSet.contains(realValue.toLowerCase())) {
return result;
}
message = realValue + " is not a valid network name. Valid names are " + interfaceSet.toString();
} catch (IllegalArgumentException e) {
message = "Not a valid AttributeExpression: " + e.getMessage();
}
result = new ValidationResult.Builder().subject("Local Network Interface")
.valid(false).input(input).explanation(message).build();
return result;
}
})
.expressionLanguageSupported(true).build();
static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(SENDING_HOST);
props.add(SENDING_HOST_PORT);
props.add(NETWORK_INTF_NAME);
props.add(CHANNEL_READER_PERIOD);
props.add(FLOW_FILE_SIZE_TRIGGER);
props.add(MAX_BUFFER_SIZE);
props.add(MAX_UDP_BUFFER);
props.add(PORT);
props.add(RECV_BUFFER_COUNT);
props.add(FLOW_FILES_PER_SESSION);
props.add(RECV_TIMEOUT);
props.add(FLOW_FILE_PER_DATAGRAM);
properties = Collections.unmodifiableList(props);
}
// defaults
public static final int DEFAULT_LISTENING_THREADS = 2;
// lock used to protect channelListener
private final Lock lock = new ReentrantLock();
private volatile ChannelListener channelListener = null;
private final BlockingQueue<Tuple<ProcessSession, List<FlowFile>>> flowFilesPerSessionQueue = new LinkedBlockingQueue<>();
private final List<FlowFile> newFlowFiles = new ArrayList<>();
private final AtomicReference<UDPStreamConsumer> consumerRef = new AtomicReference<>();
private final AtomicBoolean stopping = new AtomicBoolean(false);
private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>();
private final ExecutorService consumerExecutorService = Executors.newSingleThreadExecutor();
private final AtomicReference<Future<Tuple<ProcessSession, List<FlowFile>>>> consumerFutureRef = new AtomicReference<>();
private final AtomicBoolean resetChannelListener = new AtomicBoolean(false);
// instance attribute for provenance receive event generation
private volatile String sendingHost;
public static final String UDP_PORT_ATTR = "udp.port";
public static final String UDP_SENDER_ATTR = "udp.sender";
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
/**
* Create the ChannelListener and a thread that causes the Consumer to create flow files.
*
* @param context context
* @throws IOException ex
*/
@OnScheduled
public void initializeChannelListenerAndConsumerProcessing(final ProcessContext context) throws IOException {
getChannelListener(context);
stopping.set(false);
Future<Tuple<ProcessSession, List<FlowFile>>> consumerFuture = consumerExecutorService.submit(new Callable<Tuple<ProcessSession, List<FlowFile>>>() {
@Override
public Tuple<ProcessSession, List<FlowFile>> call() {
final int maxFlowFilesPerSession = context.getProperty(FLOW_FILES_PER_SESSION).asInteger();
final long channelReaderIntervalMSecs = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
// number of waits in 5 secs, or 1
final int maxWaits = (int) (channelReaderIntervalMSecs <= 1000 ? 5000 / channelReaderIntervalMSecs : 1);
final ProcessorLog logger = getLogger();
int flowFileCount = maxFlowFilesPerSession;
ProcessSession session = null;
int numWaits = 0;
while (!stopping.get()) {
UDPStreamConsumer consumer = consumerRef.get();
if (consumer == null || sessionFactoryRef.get() == null) {
try {
Thread.sleep(100L);
} catch (InterruptedException swallow) {
}
} else {
try {
// first time through, flowFileCount is maxFlowFilesPerSession so that a session
// is created and the consumer is updated with it.
if (flowFileCount == maxFlowFilesPerSession || numWaits == maxWaits) {
logger.debug("Have waited {} times", new Object[]{numWaits});
numWaits = 0;
if (session != null) {
Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(session, new ArrayList<>(newFlowFiles));
newFlowFiles.clear();
flowFilesPerSessionQueue.add(flowFilesPerSession);
}
session = sessionFactoryRef.get().createSession();
consumer.setSession(session);
flowFileCount = 0;
}
// this will throttle the processing of the received datagrams. If there are no more
// buffers to read into because none have been returned to the pool via consumer.process(),
// then the desired back pressure on the channel is created.
if (context.getAvailableRelationships().size() > 0) {
consumer.process();
if (flowFileCount == newFlowFiles.size()) {
// no new datagrams received, need to throttle this thread back so it does
// not consume all cpu...but don't want to cause back pressure on the channel
// so the sleep time is same as the reader interval
// If have done this for approx. 5 secs, assume datagram sender is down. So, push
// out the remaining flow files (see numWaits == maxWaits above)
Thread.sleep(channelReaderIntervalMSecs);
if (flowFileCount > 0) {
numWaits++;
}
} else {
flowFileCount = newFlowFiles.size();
}
} else {
logger.debug("Creating back pressure...no available destinations");
Thread.sleep(1000L);
}
} catch (final IOException ioe) {
logger.error("Unable to fully process consumer {}", new Object[]{consumer}, ioe);
} catch (InterruptedException e) {
// don't care
} finally {
if (consumer.isConsumerFinished()) {
logger.info("Consumer {} was closed and is finished", new Object[]{consumer});
consumerRef.set(null);
disconnect();
if (!stopping.get()) {
resetChannelListener.set(true);
}
}
}
}
}
// when shutting down, need consumer to drain rest of cached buffers and clean up.
// prior to getting here, the channelListener was shutdown
UDPStreamConsumer consumer;
while ((consumer = consumerRef.get()) != null && !consumer.isConsumerFinished()) {
try {
consumer.process();
} catch (IOException swallow) {
// if this is blown...consumer.isConsumerFinished will be true
}
}
Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(session, new ArrayList<>(newFlowFiles));
return flowFilesPerSession;
}
});
consumerFutureRef.set(consumerFuture);
}
private void disconnect() {
if (lock.tryLock()) {
try {
if (channelListener != null) {
getLogger().debug("Shutting down channel listener {}", new Object[]{channelListener});
channelListener.shutdown(500L, TimeUnit.MILLISECONDS);
channelListener = null;
}
} finally {
lock.unlock();
}
}
}
private void getChannelListener(final ProcessContext context) throws IOException {
if (lock.tryLock()) {
try {
ProcessorLog logger = getLogger();
logger.debug("Instantiating a new channel listener");
final int port = context.getProperty(PORT).asInteger();
final int bufferCount = context.getProperty(RECV_BUFFER_COUNT).asInteger();
final Double bufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B);
final Double rcvBufferSize = context.getProperty(MAX_UDP_BUFFER).asDataSize(DataUnit.B);
sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final Double flowFileSizeTrigger = context.getProperty(FLOW_FILE_SIZE_TRIGGER).asDataSize(DataUnit.B);
final int recvTimeoutMS = context.getProperty(RECV_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final boolean flowFilePerDatagram = context.getProperty(FLOW_FILE_PER_DATAGRAM).asBoolean();
final StreamConsumerFactory consumerFactory = new StreamConsumerFactory() {
@Override
public StreamConsumer newInstance(final String streamId) {
final UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), getLogger(), flowFilePerDatagram);
consumerRef.set(consumer);
return consumer;
}
};
final int readerMilliseconds = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final BufferPool bufferPool = new BufferPool(bufferCount, bufferSize.intValue(), false, Integer.MAX_VALUE);
channelListener = new ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory, bufferPool, recvTimeoutMS, TimeUnit.MILLISECONDS, flowFilePerDatagram);
// specifying a sufficiently low number for each stream to be fast enough though very efficient
channelListener.setChannelReaderSchedulingPeriod(readerMilliseconds, TimeUnit.MILLISECONDS);
InetAddress nicIPAddress = null;
if (null != nicIPAddressStr) {
NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr);
nicIPAddress = netIF.getInetAddresses().nextElement();
}
channelListener.addDatagramChannel(nicIPAddress, port, rcvBufferSize.intValue(), sendingHost, sendingHostPort);
logger.info("Registered service and initialized UDP socket listener. Now listening on port " + port + "...");
} finally {
lock.unlock();
}
}
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(
SENDING_HOST,
SENDING_HOST_PORT
);
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
Collection<ValidationResult> result = new ArrayList<>();
String sendingHost = validationContext.getProperty(SENDING_HOST).getValue();
String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue();
final Collection<ValidationResult> result = new ArrayList<>();
final String sendingHost = validationContext.getProperty(SENDING_HOST).getValue();
final String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue();
if (StringUtils.isBlank(sendingHost) && StringUtils.isNotBlank(sendingPort)) {
result.add(new ValidationResult.Builder().subject(SENDING_HOST.getName()).valid(false)
.explanation("Must specify Sending Host when specifying Sending Host Port").build());
result.add(
new ValidationResult.Builder()
.subject(SENDING_HOST.getName())
.valid(false)
.explanation("Must specify Sending Host when specifying Sending Host Port")
.build());
} else if (StringUtils.isBlank(sendingPort) && StringUtils.isNotBlank(sendingHost)) {
result.add(
new ValidationResult.Builder().subject(SENDING_HOST_PORT.getName()).valid(false).explanation("Must specify Sending Host Port when specifying Sending Host").build());
new ValidationResult.Builder()
.subject(SENDING_HOST_PORT.getName())
.valid(false)
.explanation("Must specify Sending Host Port when specifying Sending Host")
.build());
}
return result;
}
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
final ProcessorLog logger = getLogger();
sessionFactoryRef.compareAndSet(null, sessionFactory);
if (resetChannelListener.getAndSet(false) && !stopping.get()) {
try {
getChannelListener(context);
} catch (IOException e) {
logger.error("Tried to reset Channel Listener and failed due to:", e);
resetChannelListener.set(true);
}
}
transferFlowFiles();
protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<StandardEvent> events)
throws IOException {
final String sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
final Integer bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(context.getMaxConcurrentTasks(), bufferSize);
final EventFactory<StandardEvent> eventFactory = new StandardEventFactory();
return new DatagramChannelDispatcher<>(eventFactory, bufferPool, events, getLogger(), sendingHost, sendingHostPort);
}
private boolean transferFlowFiles() {
final ProcessorLog logger = getLogger();
ProcessSession session;
Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = null;
boolean transferred = false;
try {
flowFilesPerSession = flowFilesPerSessionQueue.poll(100L, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (flowFilesPerSession != null) {
session = flowFilesPerSession.getKey();
List<FlowFile> flowFiles = flowFilesPerSession.getValue();
String sourceSystem = sendingHost == null ? "Unknown" : sendingHost;
try {
for (FlowFile flowFile : flowFiles) {
session.getProvenanceReporter().receive(flowFile, sourceSystem);
session.transfer(flowFile, RELATIONSHIP_SUCCESS);
}
logger.info("Transferred flow files {} to success", new Object[]{flowFiles});
transferred = true;
// need to check for erroneous flow files in input queue
List<FlowFile> existingFlowFiles = session.get(10);
for (FlowFile existingFlowFile : existingFlowFiles) {
if (existingFlowFile != null && existingFlowFile.getSize() > 0) {
session.transfer(existingFlowFile, RELATIONSHIP_SUCCESS);
logger.warn("Found flow file in input queue (shouldn't have). Transferred flow file {} to success",
new Object[]{existingFlowFile});
} else if (existingFlowFile != null) {
session.remove(existingFlowFile);
logger.warn("Found empty flow file in input queue (shouldn't have). Removed flow file {}",
new Object[]{existingFlowFile});
}
}
session.commit();
} catch (Throwable t) {
session.rollback();
logger.error("Failed to transfer flow files or commit session...rolled back", t);
throw t;
}
}
return transferred;
@Override
protected Map<String, String> getAttributes(final FlowFileEventBatch batch) {
final String sender = batch.getEvents().get(0).getSender();
final Map<String,String> attributes = new HashMap<>(3);
attributes.put(UDP_SENDER_ATTR, sender);
attributes.put(UDP_PORT_ATTR, String.valueOf(port));
return attributes;
}
@OnUnscheduled
public void stopping() {
getLogger().debug("Stopping Processor");
disconnect();
stopping.set(true);
Future<Tuple<ProcessSession, List<FlowFile>>> future;
Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession;
if ((future = consumerFutureRef.getAndSet(null)) != null) {
try {
flowFilesPerSession = future.get();
if (flowFilesPerSession.getValue().size() > 0) {
getLogger().debug("Draining remaining flow Files when stopping");
flowFilesPerSessionQueue.add(flowFilesPerSession);
} else {
// need to close out the session that has no flow files
flowFilesPerSession.getKey().commit();
}
} catch (InterruptedException | ExecutionException e) {
getLogger().error("Failure in cleaning up!", e);
}
boolean moreFiles = true;
while (moreFiles) {
try {
moreFiles = transferFlowFiles();
} catch (Throwable t) {
getLogger().error("Problem transferring cached flowfiles", t);
}
}
}
}
@OnStopped
public void stopped() {
sessionFactoryRef.set(null);
@Override
protected String getTransitUri(FlowFileEventBatch batch) {
final String sender = batch.getEvents().get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append("udp").append("://").append(senderHost).append(":")
.append(port).toString();
return transitUri;
}
public static class HostValidator implements Validator {

View File

@ -16,10 +16,12 @@
*/
package org.apache.nifi.processors.standard.relp.handler;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processor.util.listen.event.EventQueue;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
@ -45,21 +47,24 @@ public class RELPFrameHandler<E extends Event<SocketChannel>> {
private final Charset charset;
private final EventFactory<E> eventFactory;
private final BlockingQueue<E> events;
private final EventQueue<E> events;
private final SelectionKey key;
private final AsyncChannelDispatcher dispatcher;
private final ProcessorLog logger;
private final RELPEncoder encoder;
public RELPFrameHandler(final SelectionKey selectionKey,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final AsyncChannelDispatcher dispatcher) {
final AsyncChannelDispatcher dispatcher,
final ProcessorLog logger) {
this.key = selectionKey;
this.charset = charset;
this.eventFactory = eventFactory;
this.events = events;
this.dispatcher = dispatcher;
this.logger = logger;
this.events = new EventQueue<>(events, logger);
this.encoder = new RELPEncoder(charset);
}
@ -82,9 +87,8 @@ public class RELPFrameHandler<E extends Event<SocketChannel>> {
metadata.put(RELPMetadata.TXNR_KEY, String.valueOf(frame.getTxnr()));
metadata.put(RELPMetadata.COMMAND_KEY, frame.getCommand());
// queue the raw event blocking until space is available, reset the buffer
final E event = eventFactory.create(frame.getData(), metadata, responder);
events.put(event);
events.offer(event);
}
}

View File

@ -50,7 +50,7 @@ public class RELPSSLSocketChannelHandler<E extends Event<SocketChannel>> extends
final ProcessorLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
this.decoder = new RELPDecoder(charset);
this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher);
this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Override

View File

@ -50,7 +50,7 @@ public class RELPSocketChannelHandler<E extends Event<SocketChannel>> extends St
final ProcessorLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
this.decoder = new RELPDecoder(charset);
this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher);
this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Override

View File

@ -18,8 +18,11 @@ package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.standard.relp.event.RELPEvent;
import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
@ -35,13 +38,16 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
public class TestListenRELP {
@ -78,7 +84,7 @@ public class TestListenRELP {
encoder = new RELPEncoder(StandardCharsets.UTF_8);
proc = new ResponseCapturingListenRELP();
runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenSyslog.PORT, "0");
runner.setProperty(ListenRELP.PORT, "0");
}
@Test
@ -169,6 +175,38 @@ public class TestListenRELP {
run(frames, 5, 5, sslContextService);
}
@Test
public void testNoEventsAvailable() throws IOException, InterruptedException {
MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<RELPEvent>());
runner = TestRunners.newTestRunner(mockListenRELP);
runner.setProperty(ListenRELP.PORT, "1");
runner.run();
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
}
@Test
public void testBatchingWithDifferentSenders() throws IOException, InterruptedException {
final String sender1 = "sender1";
final String sender2 = "sender2";
final ChannelResponder<SocketChannel> responder = Mockito.mock(ChannelResponder.class);
final List<RELPEvent> mockEvents = new ArrayList<>();
mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
runner = TestRunners.newTestRunner(mockListenRELP);
runner.setProperty(ListenRELP.PORT, "1");
runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
runner.run();
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
}
protected void run(final List<RELPFrame> frames, final int expectedTransferred, final int expectedResponses, final SSLContextService sslContextService)
throws IOException, InterruptedException {
@ -250,4 +288,27 @@ public class TestListenRELP {
}
}
// Extend ListenRELP to mock the ChannelDispatcher and allow us to return staged events
private static class MockListenRELP extends ListenRELP {
private List<RELPEvent> mockEvents;
public MockListenRELP(List<RELPEvent> mockEvents) {
this.mockEvents = mockEvents;
}
@OnScheduled
@Override
public void onScheduled(ProcessContext context) throws IOException {
super.onScheduled(context);
events.addAll(mockEvents);
}
@Override
protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<RELPEvent> events) throws IOException {
return Mockito.mock(ChannelDispatcher.class);
}
}
}

View File

@ -95,7 +95,7 @@ public class TestListenSyslog {
while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
Thread.sleep(10);
proc.onTrigger(context, processSessionFactory);
numTransfered = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size();
numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
}
Assert.assertEquals("Did not process all the datagrams", numMessages, numTransfered);

View File

@ -16,34 +16,40 @@
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Ignore
public class TestListenUDP {
private int port = 0;
private ListenUDP proc;
private TestRunner runner;
private static Logger LOGGER;
private DatagramSocket socket;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@ -52,7 +58,6 @@ public class TestListenUDP {
System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.ListenUDP", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestListenUDP", "debug");
LOGGER = LoggerFactory.getLogger(TestListenUDP.class);
}
@AfterClass
@ -62,152 +67,267 @@ public class TestListenUDP {
@Before
public void setUp() throws Exception {
runner = TestRunners.newTestRunner(ListenUDP.class);
socket = new DatagramSocket(20001);
}
@After
public void tearDown() throws Exception {
socket.close();
proc = new ListenUDP();
runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenUDP.PORT, String.valueOf(port));
}
@Test
public void testWithStopProcessor() throws IOException, InterruptedException {
LOGGER.info("Running testWithStopProcessor....");
runner.setProperty(ListenUDP.PORT, "20000");
runner.setProperty(ListenUDP.FLOW_FILE_SIZE_TRIGGER, "1 MB");
runner.setProperty(ListenUDP.FLOW_FILES_PER_SESSION, "50");
runner.setProperty(ListenUDP.CHANNEL_READER_PERIOD, "1 ms");
runner.setProperty(ListenUDP.SENDING_HOST, "localhost");
runner.setProperty(ListenUDP.SENDING_HOST_PORT, "20001");
runner.setProperty(ListenUDP.RECV_TIMEOUT, "1 sec");
Thread udpSender = new Thread(new DatagramSender(socket, false));
public void testCustomValidation() {
runner.assertNotValid();
runner.setProperty(ListenUDP.PORT, "1");
runner.assertValid();
ProcessContext context = runner.getProcessContext();
ListenUDP processor = (ListenUDP) runner.getProcessor();
ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
processor.initializeChannelListenerAndConsumerProcessing(context);
udpSender.start();
boolean transferred = false;
long timeOut = System.currentTimeMillis() + 30000;
while (!transferred && System.currentTimeMillis() < timeOut) {
Thread.sleep(200);
processor.onTrigger(context, processSessionFactory);
transferred = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size() > 0;
}
assertTrue("Didn't process the datagrams", transferred);
Thread.sleep(7000);
processor.stopping();
processor.stopped();
socket.close();
assertTrue(runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size() >= 60);
runner.setProperty(ListenUDP.SENDING_HOST, "localhost");
runner.assertNotValid();
runner.setProperty(ListenUDP.SENDING_HOST_PORT, "1234");
runner.assertValid();
runner.setProperty(ListenUDP.SENDING_HOST, "");
runner.assertNotValid();
}
@Test
public void testWithSlowRate() throws IOException, InterruptedException {
LOGGER.info("Running testWithSlowRate....");
runner.setProperty(ListenUDP.PORT, "20000");
runner.setProperty(ListenUDP.FLOW_FILE_SIZE_TRIGGER, "1 B");
runner.setProperty(ListenUDP.FLOW_FILES_PER_SESSION, "1");
runner.setProperty(ListenUDP.CHANNEL_READER_PERIOD, "50 ms");
runner.setProperty(ListenUDP.MAX_BUFFER_SIZE, "2 MB");
runner.setProperty(ListenUDP.SENDING_HOST, "localhost");
runner.setProperty(ListenUDP.SENDING_HOST_PORT, "20001");
runner.setProperty(ListenUDP.RECV_TIMEOUT, "5 sec");
final DatagramSender sender = new DatagramSender(socket, false);
sender.throttle = 5000;
sender.numpackets = 10;
Thread udpSender = new Thread(sender);
public void testDefaultBehavior() throws IOException, InterruptedException {
final List<String> messages = getMessages(15);
final int expectedQueued = messages.size();
final int expectedTransferred = messages.size();
ProcessContext context = runner.getProcessContext();
ListenUDP processor = (ListenUDP) runner.getProcessor();
ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
processor.initializeChannelListenerAndConsumerProcessing(context);
udpSender.start();
boolean transferred = false;
long timeOut = System.currentTimeMillis() + 60000;
while (!transferred && System.currentTimeMillis() < timeOut) {
Thread.sleep(1000);
processor.onTrigger(context, processSessionFactory);
transferred = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size() > 0;
}
assertTrue("Didn't process the datagrams", transferred);
Thread.sleep(7000);
processor.stopping();
processor.stopped();
socket.close();
assertTrue(runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size() >= 2);
// default behavior should produce a FlowFile per message sent
run(new DatagramSocket(), messages, expectedQueued, expectedTransferred);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size());
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
verifyFlowFiles(mockFlowFiles);
verifyProvenance(expectedTransferred);
}
@Test
public void testWithCloseSenderAndNoStopping() throws Exception {
LOGGER.info("Running testWithCloseSenderAndNoStopping....");
runner.setProperty(ListenUDP.PORT, "20000");
runner.setProperty(ListenUDP.FLOW_FILE_SIZE_TRIGGER, "1 MB");
runner.setProperty(ListenUDP.FLOW_FILES_PER_SESSION, "50");
runner.setProperty(ListenUDP.CHANNEL_READER_PERIOD, "1 ms");
runner.setProperty(ListenUDP.SENDING_HOST, "localhost");
runner.setProperty(ListenUDP.SENDING_HOST_PORT, "20001");
runner.setProperty(ListenUDP.RECV_TIMEOUT, "1 sec");
Thread udpSender = new Thread(new DatagramSender(socket, false));
public void testSendingMoreThanQueueSize() throws IOException, InterruptedException {
final int maxQueueSize = 3;
runner.setProperty(ListenUDP.MAX_MESSAGE_QUEUE_SIZE, String.valueOf(maxQueueSize));
ProcessContext context = runner.getProcessContext();
ListenUDP processor = (ListenUDP) runner.getProcessor();
ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
processor.initializeChannelListenerAndConsumerProcessing(context);
udpSender.start();
int numTransfered = 0;
long timeout = System.currentTimeMillis() + 22000;
while (numTransfered <= 80 && System.currentTimeMillis() < timeout) {
Thread.sleep(200);
processor.onTrigger(context, processSessionFactory);
numTransfered = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size();
}
assertFalse("Did not process all the datagrams", numTransfered < 80);
processor.stopping();
processor.stopped();
socket.close();
final List<String> messages = getMessages(20);
final int expectedQueued = maxQueueSize;
final int expectedTransferred = maxQueueSize;
run(new DatagramSocket(), messages, expectedQueued, expectedTransferred);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, maxQueueSize);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
verifyFlowFiles(mockFlowFiles);
verifyProvenance(expectedTransferred);
}
private static final class DatagramSender implements Runnable {
@Test
public void testBatchingSingleSender() throws IOException, InterruptedException {
final String delimiter = "NN";
runner.setProperty(ListenUDP.MESSAGE_DELIMITER, delimiter);
runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "3");
private final DatagramSocket socket;
private final boolean closeSocket;
long throttle = 0;
int numpackets = 819200;
final List<String> messages = getMessages(5);
final int expectedQueued = messages.size();
final int expectedTransferred = 2;
private DatagramSender(DatagramSocket socket, boolean closeSocket) {
this.socket = socket;
this.closeSocket = closeSocket;
run(new DatagramSocket(), messages, expectedQueued, expectedTransferred);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, expectedTransferred);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
MockFlowFile mockFlowFile1 = mockFlowFiles.get(0);
mockFlowFile1.assertContentEquals("This is message 1" + delimiter + "This is message 2" + delimiter + "This is message 3");
MockFlowFile mockFlowFile2 = mockFlowFiles.get(1);
mockFlowFile2.assertContentEquals("This is message 4" + delimiter + "This is message 5");
verifyProvenance(expectedTransferred);
}
@Test
public void testBatchingWithDifferentSenders() throws IOException, InterruptedException {
final String sender1 = "sender1";
final String sender2 = "sender2";
final ChannelResponder responder = Mockito.mock(ChannelResponder.class);
final byte[] message = "test message".getBytes(StandardCharsets.UTF_8);
final List<StandardEvent> mockEvents = new ArrayList<>();
mockEvents.add(new StandardEvent(sender1, message, responder));
mockEvents.add(new StandardEvent(sender1, message, responder));
mockEvents.add(new StandardEvent(sender2, message, responder));
mockEvents.add(new StandardEvent(sender2, message, responder));
MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
runner = TestRunners.newTestRunner(mockListenUDP);
runner.setProperty(ListenRELP.PORT, "1");
runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
// sending 4 messages with a batch size of 10, but should get 2 FlowFiles because of different senders
runner.run();
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
verifyProvenance(2);
}
@Test
public void testRunWhenNoEventsAvailale() throws IOException, InterruptedException {
final List<StandardEvent> mockEvents = new ArrayList<>();
MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
runner = TestRunners.newTestRunner(mockListenUDP);
runner.setProperty(ListenRELP.PORT, "1");
runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
runner.run(5);
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
}
@Test
public void testWithSendingHostAndPortSameAsSender() throws IOException, InterruptedException {
final String sendingHost = "localhost";
final Integer sendingPort = 21001;
runner.setProperty(ListenUDP.SENDING_HOST, sendingHost);
runner.setProperty(ListenUDP.SENDING_HOST_PORT, String.valueOf(sendingPort));
// bind to the same sending port that processor has for Sending Host Port
final DatagramSocket socket = new DatagramSocket(sendingPort);
final List<String> messages = getMessages(6);
final int expectedQueued = messages.size();
final int expectedTransferred = messages.size();
run(socket, messages, expectedQueued, expectedTransferred);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size());
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
verifyFlowFiles(mockFlowFiles);
verifyProvenance(expectedTransferred);
}
@Test
public void testWithSendingHostAndPortDifferentThanSender() throws IOException, InterruptedException {
final String sendingHost = "localhost";
final Integer sendingPort = 21001;
runner.setProperty(ListenUDP.SENDING_HOST, sendingHost);
runner.setProperty(ListenUDP.SENDING_HOST_PORT, String.valueOf(sendingPort));
// bind to a different sending port than the processor has for Sending Host Port
final DatagramSocket socket = new DatagramSocket(21002);
// no messages should come through since we are listening for 21001 and sending from 21002
final List<String> messages = getMessages(6);
final int expectedQueued = 0;
final int expectedTransferred = 0;
run(socket, messages, expectedQueued, expectedTransferred);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 0);
}
private List<String> getMessages(int numMessages) {
final List<String> messages = new ArrayList<>();
for (int i=0; i < numMessages; i++) {
messages.add("This is message " + (i + 1));
}
return messages;
}
private void verifyFlowFiles(List<MockFlowFile> mockFlowFiles) {
for (int i = 0; i < mockFlowFiles.size(); i++) {
MockFlowFile flowFile = mockFlowFiles.get(i);
flowFile.assertContentEquals("This is message " + (i + 1));
Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(ListenUDP.UDP_PORT_ATTR));
Assert.assertTrue(StringUtils.isNotEmpty(flowFile.getAttribute(ListenUDP.UDP_SENDER_ATTR)));
}
}
private void verifyProvenance(int expectedNumEvents) {
List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
Assert.assertEquals(expectedNumEvents, provEvents.size());
for (ProvenanceEventRecord event : provEvents) {
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertTrue(event.getTransitUri().startsWith("udp://"));
}
}
protected void run(final DatagramSocket socket, final List<String> messages, final int expectedQueueSize, final int expectedTransferred)
throws IOException, InterruptedException {
try {
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
Thread.sleep(100);
// get the real port the dispatcher is listening on
final int destPort = proc.getDispatcherPort();
final InetSocketAddress destination = new InetSocketAddress("localhost", destPort);
// send the messages to the port the processors is listening on
for (final String message : messages) {
final byte[] buffer = message.getBytes(StandardCharsets.UTF_8);
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, destination);
socket.send(packet);
Thread.sleep(10);
}
long responseTimeout = 10000;
// this first loop waits until the internal queue of the processor has the expected
// number of messages ready before proceeding, we want to guarantee they are all there
// before onTrigger gets a chance to run
long startTimeQueueSizeCheck = System.currentTimeMillis();
while (proc.getQueueSize() < expectedQueueSize
&& (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) {
Thread.sleep(100);
}
// want to fail here if the queue size isn't what we expect
Assert.assertEquals(expectedQueueSize, proc.getQueueSize());
// call onTrigger until we processed all the messages, or a certain amount of time passes
int numTransferred = 0;
long startTime = System.currentTimeMillis();
while (numTransferred < expectedTransferred && (System.currentTimeMillis() - startTime < responseTimeout)) {
proc.onTrigger(context, processSessionFactory);
numTransferred = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS).size();
Thread.sleep(100);
}
// should have transferred the expected events
runner.assertTransferCount(ListenUDP.REL_SUCCESS, expectedTransferred);
} finally {
// unschedule to close connections
proc.onUnscheduled();
IOUtils.closeQuietly(socket);
}
}
// Extend ListenUDP to mock the ChannelDispatcher and allow us to return staged events
private static class MockListenUDP extends ListenUDP {
private List<StandardEvent> mockEvents;
public MockListenUDP(List<StandardEvent> mockEvents) {
this.mockEvents = mockEvents;
}
@OnScheduled
@Override
public void onScheduled(ProcessContext context) throws IOException {
super.onScheduled(context);
events.addAll(mockEvents);
}
@Override
public void run() {
final byte[] buffer = new byte[128];
try {
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, new InetSocketAddress("localhost", 20000));
final long startTime = System.nanoTime();
for (int i = 0; i < numpackets; i++) { // 100 MB
socket.send(packet);
if (throttle > 0) {
try {
Thread.sleep(throttle);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
final long endTime = System.nanoTime();
final long durationMillis = (endTime - startTime) / 1000000;
LOGGER.info("Sent all UDP packets without any obvious errors | duration ms= " + durationMillis);
} catch (IOException e) {
LOGGER.error("", e);
} finally {
if (closeSocket) {
socket.close();
LOGGER.info("Socket closed");
}
}
protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) throws IOException {
return Mockito.mock(ChannelDispatcher.class);
}
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.standard.relp.handler;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
@ -45,6 +46,7 @@ public class TestRELPFrameHandler {
private BlockingQueue<RELPEvent> events;
private SelectionKey key;
private AsyncChannelDispatcher dispatcher;
private ProcessorLog logger;
private RELPFrameHandler<RELPEvent> frameHandler;
@ -55,8 +57,9 @@ public class TestRELPFrameHandler {
this.events = new LinkedBlockingQueue<>();
this.key = Mockito.mock(SelectionKey.class);
this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class);
this.logger = Mockito.mock(ProcessorLog.class);
this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher);
this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Test

View File

@ -127,7 +127,7 @@ public class TestRELPSocketChannelHandler {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
// starts the dispatcher listening on port 0 so it selects a random port
dispatcher.open(0, 4096);
dispatcher.open(null, 0, 4096);
// starts a thread to run the dispatcher which will accept/read connections
Thread dispatcherThread = new Thread(dispatcher);