NIFI-1579 Performance improvements for ListenSyslog: removing an unnecessary yield, exposing a configurable size for the internal queue used by the processor, changing the poll timeout to 20 ms, and using a long poll when batching. The same improvements are also applied to ListenRELP.

This commit is contained in:
Bryan Bende 2016-03-01 12:54:52 -05:00
parent 6776060ac8
commit 19e53962ca
4 changed files with 52 additions and 18 deletions

View File

@ -85,6 +85,15 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
.defaultValue("1 MB")
.required(true)
.build();
/**
 * Required property that sets the maximum size of the internal queue used to buffer
 * messages between the underlying channel and the processor.
 * The value must pass the positive-integer validator and defaults to 10000.
 */
public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder()
.name("Max Size of Message Queue")
.description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " +
"Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total " +
"memory used by the processor.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10000")
.required(true)
.build();
// Putting these properties here so sub-classes don't have to redefine them, but they are
// not added to the properties by default since not all processors may need them
@ -119,7 +128,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
.description("Messages received successfully will be sent out this relationship.")
.build();
public static final int POLL_TIMEOUT_MS = 100;
public static final int POLL_TIMEOUT_MS = 20;
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
@ -127,7 +136,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
protected volatile int port;
protected volatile Charset charset;
protected volatile ChannelDispatcher dispatcher;
protected volatile BlockingQueue<E> events = new LinkedBlockingQueue<>(10);
protected volatile BlockingQueue<E> events;
protected volatile BlockingQueue<E> errorEvents = new LinkedBlockingQueue<>();
@Override
@ -135,6 +144,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(PORT);
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
descriptors.add(CHARSET);
descriptors.addAll(getAdditionalProperties());
@ -178,6 +188,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
public void onScheduled(final ProcessContext context) throws IOException {
charset = Charset.forName(context.getProperty(CHARSET).getValue());
port = context.getProperty(PORT).asInteger();
events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger());
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
@ -230,7 +241,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
*
* @return an event from one of the queues, or null if none are available
*/
protected E getMessage(final boolean longPoll, final boolean pollErrorQueue) {
protected E getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) {
E event = null;
if (pollErrorQueue) {
event = errorEvents.poll();
@ -249,6 +260,10 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
}
}
if (event != null) {
session.adjustCounter("Messages Received", 1L, false);
}
return event;
}
@ -270,7 +285,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
final Map<String,FlowFileEventBatch> batches = new HashMap<>();
for (int i=0; i < totalBatchSize; i++) {
final E event = getMessage(true, true);
final E event = getMessage(true, true, session);
if (event == null) {
break;
}
@ -311,8 +326,6 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
errorEvents.offer(event);
break;
}
session.adjustCounter("Messages Received", 1L, false);
}
return batches;

View File

@ -133,9 +133,10 @@ public class ListenRELP extends AbstractListenEventProcessor<RELPEvent> {
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize, messageDemarcatorBytes);
// if the size is 0 then there was nothing to process so yield and return
// if the size is 0 then there was nothing to process so return
// we don't need to yield here because inside getBatches() we are polling a queue with a wait
// and yielding here could have a negative impact on performance
if (batches.size() == 0) {
context.yield();
return;
}
@ -170,6 +171,7 @@ public class ListenRELP extends AbstractListenEventProcessor<RELPEvent> {
getLogger().debug("Transferring {} to success", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
// create a provenance receive event
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;

View File

@ -98,6 +98,15 @@ import java.util.concurrent.TimeUnit;
@SeeAlso({PutSyslog.class, ParseSyslog.class})
public class ListenSyslog extends AbstractSyslogProcessor {
/**
 * Required property that sets the maximum size of the internal queue used to buffer
 * messages between the underlying channel and this processor.
 * The value must pass the positive-integer validator and defaults to 10000.
 */
public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder()
.name("Max Size of Message Queue")
.description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " +
"Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total " +
"memory used by the processor.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10000")
.required(true)
.build();
public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Receive Buffer Size")
.description("The size of each buffer used to receive Syslog messages. Adjust this value appropriately based on the expected size of the " +
@ -171,7 +180,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private volatile ChannelDispatcher channelDispatcher;
private volatile SyslogParser parser;
private volatile BlockingQueue<ByteBuffer> bufferPool;
private volatile BlockingQueue<RawSyslogEvent> syslogEvents = new LinkedBlockingQueue<>(10);
private volatile BlockingQueue<RawSyslogEvent> syslogEvents;
private volatile BlockingQueue<RawSyslogEvent> errorEvents = new LinkedBlockingQueue<>();
private volatile byte[] messageDemarcatorBytes; //it is only the array reference that is volatile - not the contents.
@ -182,6 +191,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
descriptors.add(PORT);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
descriptors.add(MAX_CONNECTIONS);
descriptors.add(MAX_BATCH_SIZE);
@ -245,6 +255,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
final int port = context.getProperty(PORT).asInteger();
final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int maxMessageQueueSize = context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger();
final String protocol = context.getProperty(PROTOCOL).getValue();
final String charSet = context.getProperty(CHARSET).getValue();
final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
@ -263,6 +274,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
parser = new SyslogParser(Charset.forName(charSet));
syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize);
// create either a UDP or TCP reader and call open() to bind to the given port
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
@ -313,7 +325,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
}
protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue) {
protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) {
RawSyslogEvent rawSyslogEvent = null;
if (pollErrorQueue) {
rawSyslogEvent = errorEvents.poll();
@ -322,7 +334,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
if (rawSyslogEvent == null) {
try {
if (longPoll) {
rawSyslogEvent = syslogEvents.poll(100, TimeUnit.MILLISECONDS);
rawSyslogEvent = syslogEvents.poll(20, TimeUnit.MILLISECONDS);
} else {
rawSyslogEvent = syslogEvents.poll();
}
@ -332,6 +344,10 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
}
if (rawSyslogEvent != null) {
session.adjustCounter("Messages Received", 1L, false);
}
return rawSyslogEvent;
}
@ -342,11 +358,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// poll the queue with a small timeout to avoid unnecessarily yielding below
RawSyslogEvent rawSyslogEvent = getMessage(true, true);
RawSyslogEvent rawSyslogEvent = getMessage(true, true, session);
// if nothing in the queue then yield and return
// if nothing in the queue just return, we don't want to yield here because yielding could adversely
// impact performance, and we already have a long poll in getMessage so there will be some built in
// throttling even when no data is available
if (rawSyslogEvent == null) {
context.yield();
return;
}
@ -372,7 +389,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// If this is our first iteration, we have already polled our queues. Otherwise, poll on each iteration.
if (i > 0) {
rawSyslogEvent = getMessage(false, false);
rawSyslogEvent = getMessage(true, false, session);
if (rawSyslogEvent == null) {
break;
@ -461,7 +478,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
break;
}
session.adjustCounter("Messages Received", 1L, false);
flowFilePerSender.put(sender, flowFile);
}
@ -483,6 +499,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
getLogger().debug("Transferring {} to success", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append(protocol.toLowerCase()).append("://").append(senderHost).append(":").append(port).toString();
session.getProvenanceReporter().receive(flowFile, transitUri);

View File

@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
@ -602,11 +603,11 @@ public class TestListenSyslog {
}
@Override
protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue) {
protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) {
if (eventItr.hasNext()) {
return eventItr.next();
}
return super.getMessage(longPoll, pollErrorQueue);
return super.getMessage(longPoll, pollErrorQueue, session);
}
}
}