mirror of https://github.com/apache/nifi.git
NIFI-274 Adding ListenSyslog and PutSyslog to standard processors.
- Refactoring connection handling on put side, removing number of buffers from properties and basing it off concurrent tasks for the processor. - Refactoring some of the TCP handling so it keeps reading from a connection until the client closes it - Adding an error queue - Adding a sender field on the syslog event to record the system that sent the message
This commit is contained in:
parent
201eac052b
commit
9c542432da
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
/**
|
||||
* Base class for Syslog processors.
|
||||
*/
|
||||
public abstract class AbstractSyslogProcessor extends AbstractProcessor {
|
||||
|
||||
public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
|
||||
public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
|
||||
|
||||
public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
|
||||
.Builder().name("Protocol")
|
||||
.description("The protocol for Syslog communication, either TCP or UDP.")
|
||||
.required(true)
|
||||
.allowableValues(TCP_VALUE, UDP_VALUE)
|
||||
.defaultValue(UDP_VALUE.getValue())
|
||||
.build();
|
||||
public static final PropertyDescriptor PORT = new PropertyDescriptor
|
||||
.Builder().name("Port")
|
||||
.description("The port for Syslog communication.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.PORT_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
|
||||
.name("Character Set")
|
||||
.description("Specifies which character set of the Syslog messages")
|
||||
.required(true)
|
||||
.defaultValue("UTF-8")
|
||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||
.build();
|
||||
|
||||
|
||||
/**
|
||||
* FlowFile Attributes for each Syslog message.
|
||||
*/
|
||||
public enum SyslogAttributes implements FlowFileAttributeKey {
|
||||
PRIORITY("syslog.priority"),
|
||||
SEVERITY("syslog.severity"),
|
||||
FACILITY("syslog.facility"),
|
||||
VERSION("syslog.version"),
|
||||
TIMESTAMP("syslog.timestamp"),
|
||||
HOSTNAME("syslog.hostname"),
|
||||
SENDER("syslog.sender"),
|
||||
BODY("syslog.body"),
|
||||
VALID("syslog.valid");
|
||||
|
||||
private String key;
|
||||
|
||||
SyslogAttributes(String key) {
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String key() {
|
||||
return key;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,527 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.io.nio.BufferPool;
|
||||
import org.apache.nifi.logging.ProcessorLog;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.standard.util.SyslogEvent;
|
||||
import org.apache.nifi.processors.standard.util.SyslogParser;
|
||||
import org.apache.nifi.stream.io.ByteArrayOutputStream;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.StandardSocketOptions;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedByInterruptException;
|
||||
import java.nio.channels.DatagramChannel;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
|
||||
@CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
|
||||
"expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " +
|
||||
"where version is optional. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " +
|
||||
"or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If an incoming messages matches one of these patterns, the message will be " +
|
||||
"parsed and the individual pieces will be placed in FlowFile attributes, with the original message in the content of the FlowFile. If an incoming " +
|
||||
"message does not match one of these patterns it will not be parsed and the syslog.valid attribute will be set to false with the original message " +
|
||||
"in the content of the FlowFile. Valid messages will be transferred on the success relationship, and invalid messages will be transferred on the " +
|
||||
"invalid relationship.")
|
||||
@WritesAttributes({ @WritesAttribute(attribute="syslog.priority", description="The priority of the Syslog message."),
|
||||
@WritesAttribute(attribute="syslog.severity", description="The severity of the Syslog message derived from the priority."),
|
||||
@WritesAttribute(attribute="syslog.facility", description="The facility of the Syslog message derived from the priority."),
|
||||
@WritesAttribute(attribute="syslog.version", description="The optional version from the Syslog message."),
|
||||
@WritesAttribute(attribute="syslog.timestamp", description="The timestamp of the Syslog message."),
|
||||
@WritesAttribute(attribute="syslog.hostname", description="The hostname of the Syslog message."),
|
||||
@WritesAttribute(attribute="syslog.sender", description="The hostname of the Syslog server that sent the message."),
|
||||
@WritesAttribute(attribute="syslog.body", description="The body of the Syslog message, everything after the hostname."),
|
||||
@WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. " +
|
||||
"If this value is false, the other attributes will be empty and only the original message will be available in the content."),
|
||||
@WritesAttribute(attribute="mime.type", description="The mime.type of the FlowFile which will be text/plain for Syslog messages.")})
|
||||
public class ListenSyslog extends AbstractSyslogProcessor {
|
||||
|
||||
public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Receive Buffer Size")
|
||||
.description("The size of each buffer used to receive Syslog messages. Adjust this value appropriately based on the expected size of the " +
|
||||
"incoming Syslog messages. When UDP is selected each buffer will hold one Syslog message. When TCP is selected messages are read " +
|
||||
"from an incoming connection until the buffer is full, or the connection is closed. ")
|
||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||
.defaultValue("65507 KB")
|
||||
.required(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Max Size of Socket Buffer")
|
||||
.description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " +
|
||||
"to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
|
||||
"the data can be read, and incoming data will be dropped.")
|
||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||
.defaultValue("1 MB")
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("Syslog messages that match one of the expected formats will be sent out this relationship as a FlowFile per message.")
|
||||
.build();
|
||||
public static final Relationship REL_INVALID = new Relationship.Builder()
|
||||
.name("invalid")
|
||||
.description("Syslog messages that do not match one of the expected formats will be sent out this relationship as a FlowFile per message.")
|
||||
.build();
|
||||
|
||||
private Set<Relationship> relationships;
|
||||
private List<PropertyDescriptor> descriptors;
|
||||
|
||||
private volatile BufferPool bufferPool;
|
||||
private volatile ChannelReader channelReader;
|
||||
private volatile SyslogParser parser;
|
||||
private volatile BlockingQueue<SyslogEvent> syslogEvents;
|
||||
private volatile BlockingQueue<SyslogEvent> errorEvents;
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(PROTOCOL);
|
||||
descriptors.add(PORT);
|
||||
descriptors.add(RECV_BUFFER_SIZE);
|
||||
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
|
||||
descriptors.add(CHARSET);
|
||||
this.descriptors = Collections.unmodifiableList(descriptors);
|
||||
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(REL_SUCCESS);
|
||||
relationships.add(REL_INVALID);
|
||||
this.relationships = Collections.unmodifiableSet(relationships);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return this.relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return descriptors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
|
||||
// since properties were changed, clear any events that were queued
|
||||
if (syslogEvents != null) {
|
||||
syslogEvents.clear();
|
||||
}
|
||||
if (errorEvents != null) {
|
||||
errorEvents.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) throws IOException {
|
||||
final int port = context.getProperty(PORT).asInteger();
|
||||
final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
||||
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
||||
final String protocol = context.getProperty(PROTOCOL).getValue();
|
||||
final String charSet = context.getProperty(CHARSET).getValue();
|
||||
|
||||
parser = new SyslogParser(Charset.forName(charSet));
|
||||
bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
|
||||
syslogEvents = new LinkedBlockingQueue<>(10);
|
||||
errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
|
||||
|
||||
// create either a UDP or TCP reader and call open() to bind to the given port
|
||||
channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents);
|
||||
channelReader.open(port, maxChannelBufferSize);
|
||||
|
||||
final Thread readerThread = new Thread(channelReader);
|
||||
readerThread.setName("ListenSyslog [" + getIdentifier() + "]");
|
||||
readerThread.setDaemon(true);
|
||||
readerThread.start();
|
||||
}
|
||||
|
||||
// visible for testing to be overridden and provide a mock ChannelReader if desired
|
||||
protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents)
|
||||
throws IOException {
|
||||
if (protocol.equals(UDP_VALUE.getValue())) {
|
||||
return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
|
||||
} else {
|
||||
return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
|
||||
}
|
||||
}
|
||||
|
||||
// used for testing to access the random port that was selected
|
||||
protected int getPort() {
|
||||
return channelReader == null ? 0 : channelReader.getPort();
|
||||
}
|
||||
|
||||
@OnUnscheduled
|
||||
public void onUnscheduled() {
|
||||
if (channelReader != null) {
|
||||
channelReader.stop();
|
||||
channelReader.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
// try to pull from the error queue first, if empty then pull from main queue
|
||||
SyslogEvent initialEvent = errorEvents.poll();
|
||||
if (initialEvent == null) {
|
||||
initialEvent = syslogEvents.poll();
|
||||
}
|
||||
|
||||
// if nothing in either queue then just return
|
||||
if (initialEvent == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final SyslogEvent event = initialEvent;
|
||||
|
||||
final Map<String,String> attributes = new HashMap<>();
|
||||
attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
|
||||
attributes.put(SyslogAttributes.SEVERITY.key(), event.getSeverity());
|
||||
attributes.put(SyslogAttributes.FACILITY.key(), event.getFacility());
|
||||
attributes.put(SyslogAttributes.VERSION.key(), event.getVersion());
|
||||
attributes.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp());
|
||||
attributes.put(SyslogAttributes.HOSTNAME.key(), event.getHostName());
|
||||
attributes.put(SyslogAttributes.SENDER.key(), event.getSender());
|
||||
attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
|
||||
attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid()));
|
||||
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
|
||||
|
||||
FlowFile flowFile = session.create();
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
|
||||
try {
|
||||
// write the raw bytes of the message as the FlowFile content
|
||||
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(OutputStream out) throws IOException {
|
||||
out.write(event.getRawMessage());
|
||||
}
|
||||
});
|
||||
|
||||
if (event.isValid()) {
|
||||
getLogger().info("Transferring {} to success", new Object[]{flowFile});
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
} else {
|
||||
getLogger().info("Transferring {} to invalid", new Object[]{flowFile});
|
||||
session.transfer(flowFile, REL_INVALID);
|
||||
}
|
||||
|
||||
} catch (ProcessException e) {
|
||||
getLogger().error("Error processing Syslog message", e);
|
||||
errorEvents.offer(event);
|
||||
session.remove(flowFile);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads messages from a channel until told to stop.
|
||||
*/
|
||||
public interface ChannelReader extends Runnable {
|
||||
|
||||
void open(int port, int maxBufferSize) throws IOException;
|
||||
|
||||
int getPort();
|
||||
|
||||
void stop();
|
||||
|
||||
void close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads from the Datagram channel into an available buffer. If data is read then the buffer is queued for
|
||||
* processing, otherwise the buffer is returned to the buffer pool.
|
||||
*/
|
||||
public static class DatagramChannelReader implements ChannelReader {
|
||||
|
||||
private final BufferPool bufferPool;
|
||||
private final SyslogParser syslogParser;
|
||||
private final BlockingQueue<SyslogEvent> syslogEvents;
|
||||
private final ProcessorLog logger;
|
||||
private DatagramChannel datagramChannel;
|
||||
private volatile boolean stopped = false;
|
||||
|
||||
public DatagramChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
|
||||
final ProcessorLog logger) {
|
||||
this.bufferPool = bufferPool;
|
||||
this.syslogParser = syslogParser;
|
||||
this.syslogEvents = syslogEvents;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(final int port, int maxBufferSize) throws IOException {
|
||||
datagramChannel = DatagramChannel.open();
|
||||
datagramChannel.configureBlocking(false);
|
||||
if (maxBufferSize > 0) {
|
||||
datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
|
||||
final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
|
||||
if (actualReceiveBufSize < maxBufferSize) {
|
||||
logMaxBufferWarning(logger, maxBufferSize, actualReceiveBufSize);
|
||||
}
|
||||
}
|
||||
datagramChannel.socket().bind(new InetSocketAddress(port));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (!stopped) {
|
||||
final ByteBuffer buffer = bufferPool.poll();
|
||||
try {
|
||||
if (buffer == null) {
|
||||
Thread.sleep(10L);
|
||||
logger.debug("no available buffers, continuing...");
|
||||
continue;
|
||||
}
|
||||
|
||||
final SocketAddress sender = datagramChannel.receive(buffer);
|
||||
if (sender == null) {
|
||||
Thread.sleep(1000L); // nothing to do so wait...
|
||||
} else {
|
||||
final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender?
|
||||
logger.trace(event.getFullMessage());
|
||||
syslogEvents.put(event); // block until space is available
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
stop();
|
||||
} catch (IOException e) {
|
||||
logger.error("Error reading from DatagramChannel", e);
|
||||
} finally {
|
||||
if (buffer != null) {
|
||||
bufferPool.returnBuffer(buffer, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPort() {
|
||||
return datagramChannel == null ? 0 : datagramChannel.socket().getLocalPort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
stopped = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
IOUtils.closeQuietly(datagramChannel);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Accepts Socket connections on the given port and creates a handler for each connection to
|
||||
* be executed by a thread pool.
|
||||
*/
|
||||
public static class SocketChannelReader implements ChannelReader {
|
||||
|
||||
private final BufferPool bufferPool;
|
||||
private final SyslogParser syslogParser;
|
||||
private final BlockingQueue<SyslogEvent> syslogEvents;
|
||||
private final ProcessorLog logger;
|
||||
private ServerSocketChannel serverSocketChannel;
|
||||
private ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||
private volatile boolean stopped = false;
|
||||
|
||||
public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
|
||||
final ProcessorLog logger) {
|
||||
this.bufferPool = bufferPool;
|
||||
this.syslogParser = syslogParser;
|
||||
this.syslogEvents = syslogEvents;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(final int port, int maxBufferSize) throws IOException {
|
||||
serverSocketChannel = ServerSocketChannel.open();
|
||||
serverSocketChannel.configureBlocking(false);
|
||||
if (maxBufferSize > 0) {
|
||||
serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
|
||||
final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
|
||||
if (actualReceiveBufSize < maxBufferSize) {
|
||||
logMaxBufferWarning(logger, maxBufferSize, actualReceiveBufSize);
|
||||
}
|
||||
}
|
||||
serverSocketChannel.socket().bind(new InetSocketAddress(port));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (!stopped) {
|
||||
try {
|
||||
final SocketChannel socketChannel = serverSocketChannel.accept();
|
||||
if (socketChannel == null) {
|
||||
Thread.sleep(1000L); // wait for an incoming connection...
|
||||
} else {
|
||||
final SocketChannelHandler handler = new SocketChannelHandler(
|
||||
bufferPool, socketChannel, syslogParser, syslogEvents, logger);
|
||||
logger.debug("Accepted incoming connection");
|
||||
executor.submit(handler);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("Error accepting connection from SocketChannel", e);
|
||||
} catch (InterruptedException e) {
|
||||
stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPort() {
|
||||
return serverSocketChannel == null ? 0 : serverSocketChannel.socket().getLocalPort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
stopped = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
IOUtils.closeQuietly(serverSocketChannel);
|
||||
executor.shutdown();
|
||||
try {
|
||||
// Wait a while for existing tasks to terminate
|
||||
if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
// (Re-)Cancel if current thread also interrupted
|
||||
executor.shutdownNow();
|
||||
// Preserve interrupt status
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads from the given SocketChannel into the provided buffer. If data is read then the buffer is queued for
|
||||
* processing, otherwise the buffer is returned to the buffer pool.
|
||||
*/
|
||||
public static class SocketChannelHandler implements Runnable {
|
||||
|
||||
private final BufferPool bufferPool;
|
||||
private final SocketChannel socketChannel;
|
||||
private final SyslogParser syslogParser;
|
||||
private final BlockingQueue<SyslogEvent> syslogEvents;
|
||||
private final ProcessorLog logger;
|
||||
private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
|
||||
|
||||
public SocketChannelHandler(final BufferPool bufferPool, final SocketChannel socketChannel, final SyslogParser syslogParser,
|
||||
final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) {
|
||||
this.bufferPool = bufferPool;
|
||||
this.socketChannel = socketChannel;
|
||||
this.syslogParser = syslogParser;
|
||||
this.syslogEvents = syslogEvents;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
int bytesRead = 0;
|
||||
while (bytesRead >= 0 && !Thread.interrupted()) {
|
||||
|
||||
final ByteBuffer buffer = bufferPool.poll();
|
||||
if (buffer == null) {
|
||||
Thread.sleep(10L);
|
||||
logger.debug("no available buffers, continuing...");
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
// read until the buffer is full
|
||||
bytesRead = socketChannel.read(buffer);
|
||||
while (bytesRead > 0) {
|
||||
bytesRead = socketChannel.read(buffer);
|
||||
}
|
||||
buffer.flip();
|
||||
|
||||
// go through the buffer looking for the end of each message
|
||||
int bufferLength = buffer.limit();
|
||||
for (int i = 0; i < bufferLength; i++) {
|
||||
byte currByte = buffer.get(i);
|
||||
currBytes.write(currByte);
|
||||
|
||||
// at the end of a message so parse an event, reset the buffer, and break out of the loop
|
||||
if (currByte == '\n') {
|
||||
final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
|
||||
socketChannel.socket().getInetAddress().toString());
|
||||
logger.trace(event.getFullMessage());
|
||||
syslogEvents.put(event); // block until space is available
|
||||
currBytes.reset();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
bufferPool.returnBuffer(buffer, 0);
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug("done handling SocketChannel");
|
||||
} catch (ClosedByInterruptException | InterruptedException e) {
|
||||
// nothing to do here
|
||||
} catch (IOException e) {
|
||||
logger.error("Error reading from channel", e);
|
||||
} finally {
|
||||
IOUtils.closeQuietly(socketChannel);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {
|
||||
logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
|
||||
+ actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's "
|
||||
+ "maximum receive buffer");
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,460 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.standard.util.SyslogParser;
|
||||
import org.apache.nifi.util.ObjectHolder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.DatagramChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@TriggerWhenEmpty
|
||||
@Tags({"syslog", "put", "udp", "tcp", "logs"})
|
||||
@CapabilityDescription("Sends Syslog messages to a given host and port over TCP or UDP. Messages are constructed from the \"Message ___\" properties of the processor " +
|
||||
"which can use expression language to generate messages from incoming FlowFiles. The properties are used to construct messages of the form: " +
|
||||
"(<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is optional. The constructed messages are checked against regular expressions for " +
|
||||
"RFC5424 and RFC3164 formatted messages. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " +
|
||||
"or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If a message is constructed that does not form a valid Syslog message according to the " +
|
||||
"above description, then it is routed to the invalid relationship. Valid messages are pushed to Syslog with successes routed to the success relationship, and " +
|
||||
"failures routed to the failure relationship.")
|
||||
public class PutSyslog extends AbstractSyslogProcessor {
|
||||
|
||||
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
|
||||
.name("Hostname")
|
||||
.description("The ip address or hostname of the Syslog server.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.defaultValue("localhost")
|
||||
.required(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Send Buffer Size")
|
||||
.description("The size of each buffer used to send a Syslog message. Adjust this value appropriately based on the expected size of the " +
|
||||
"Syslog messages being produced. Messages larger than this buffer size will still be sent, but will not make use of the buffer pool.")
|
||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||
.defaultValue("2048 B")
|
||||
.required(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
|
||||
.Builder().name("Batch Size")
|
||||
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
|
||||
.required(true)
|
||||
.defaultValue("25")
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
|
||||
.Builder().name("Idle Connection Expiration")
|
||||
.description("The amount of time a connection should be held open without being used before closing the connection.")
|
||||
.required(true)
|
||||
.defaultValue("5 seconds")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor MSG_PRIORITY = new PropertyDescriptor
|
||||
.Builder().name("Message Priority")
|
||||
.description("The priority for the Syslog messages, excluding < >.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor MSG_VERSION = new PropertyDescriptor
|
||||
.Builder().name("Message Version")
|
||||
.description("The version for the Syslog messages.")
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor MSG_TIMESTAMP = new PropertyDescriptor
|
||||
.Builder().name("Message Timestamp")
|
||||
.description("The timestamp for the Syslog messages. The timestamp can be an RFC5424 timestamp with a format of " +
|
||||
"\"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", \" or it can be an RFC3164 timestamp " +
|
||||
"with a format of \"MMM d HH:mm:ss\".")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor MSG_HOSTNAME = new PropertyDescriptor
|
||||
.Builder().name("Message Hostname")
|
||||
.description("The hostname for the Syslog messages.")
|
||||
.required(true)
|
||||
.defaultValue("${hostname(true)}")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor MSG_BODY = new PropertyDescriptor
|
||||
.Builder().name("Message Body")
|
||||
.description("The body for the Syslog messages.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("FlowFiles that are sent successfully to Syslog are sent out this relationship.")
|
||||
.build();
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.name("failure")
|
||||
.description("FlowFiles that failed to send to Syslog are sent out this relationship.")
|
||||
.build();
|
||||
public static final Relationship REL_INVALID = new Relationship.Builder()
|
||||
.name("invalid")
|
||||
.description("FlowFiles that do not form a valid Syslog message are sent out this relationship.")
|
||||
.build();
|
||||
|
||||
private Set<Relationship> relationships;
|
||||
private List<PropertyDescriptor> descriptors;
|
||||
|
||||
private volatile BlockingQueue<ByteBuffer> bufferPool;
|
||||
private volatile BlockingQueue<ChannelSender> senderPool;
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(HOSTNAME);
|
||||
descriptors.add(PROTOCOL);
|
||||
descriptors.add(PORT);
|
||||
descriptors.add(IDLE_EXPIRATION);
|
||||
descriptors.add(SEND_BUFFER_SIZE);
|
||||
descriptors.add(BATCH_SIZE);
|
||||
descriptors.add(CHARSET);
|
||||
descriptors.add(MSG_PRIORITY);
|
||||
descriptors.add(MSG_VERSION);
|
||||
descriptors.add(MSG_TIMESTAMP);
|
||||
descriptors.add(MSG_HOSTNAME);
|
||||
descriptors.add(MSG_BODY);
|
||||
this.descriptors = Collections.unmodifiableList(descriptors);
|
||||
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(REL_SUCCESS);
|
||||
relationships.add(REL_FAILURE);
|
||||
relationships.add(REL_INVALID);
|
||||
this.relationships = Collections.unmodifiableSet(relationships);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return this.relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return descriptors;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) throws IOException {
|
||||
final int bufferSize = context.getProperty(SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
||||
this.bufferPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
|
||||
for (int i=0; i < context.getMaxConcurrentTasks(); i++) {
|
||||
this.bufferPool.offer(ByteBuffer.allocate(bufferSize));
|
||||
}
|
||||
|
||||
// create a pool of senders based on the number of concurrent tasks for this processor
|
||||
this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
|
||||
for (int i=0; i < context.getMaxConcurrentTasks(); i++) {
|
||||
senderPool.offer(createSender(context, bufferPool));
|
||||
}
|
||||
}
|
||||
|
||||
protected ChannelSender createSender(final ProcessContext context, final BlockingQueue<ByteBuffer> bufferPool) throws IOException {
|
||||
final int port = context.getProperty(PORT).asInteger();
|
||||
final String host = context.getProperty(HOSTNAME).getValue();
|
||||
final String protocol = context.getProperty(PROTOCOL).getValue();
|
||||
final String charSet = context.getProperty(CHARSET).getValue();
|
||||
return createSender(protocol, host, port, Charset.forName(charSet), bufferPool);
|
||||
}
|
||||
|
||||
// visible for testing to override and provide a mock sender if desired
|
||||
protected ChannelSender createSender(final String protocol, final String host, final int port, final Charset charset, final BlockingQueue<ByteBuffer> bufferPool)
|
||||
throws IOException {
|
||||
if (protocol.equals(UDP_VALUE.getValue())) {
|
||||
return new DatagramChannelSender(host, port, bufferPool, charset);
|
||||
} else {
|
||||
return new SocketChannelSender(host, port, bufferPool, charset);
|
||||
}
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public void onStopped() {
|
||||
ChannelSender sender = senderPool.poll();
|
||||
while (sender != null) {
|
||||
sender.close();
|
||||
sender = senderPool.poll();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
final String protocol = context.getProperty(PROTOCOL).getValue();
|
||||
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
|
||||
|
||||
final List<FlowFile> flowFiles = session.get(batchSize);
|
||||
if (flowFiles == null || flowFiles.isEmpty()) {
|
||||
final List<ChannelSender> putBack = new ArrayList<>();
|
||||
final long expirationThreshold = context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
|
||||
|
||||
// if a connection hasn't been used with in the threshold then it gets closed
|
||||
ChannelSender sender;
|
||||
while ((sender = senderPool.poll()) != null) {
|
||||
if (System.currentTimeMillis() > (sender.lastUsed + expirationThreshold)) {
|
||||
getLogger().debug("Closing idle connection...");
|
||||
sender.close();
|
||||
} else {
|
||||
putBack.add(sender);
|
||||
}
|
||||
}
|
||||
|
||||
// re-queue senders that weren't idle, but if the queue is full then close the sender
|
||||
for (ChannelSender putBackSender : putBack) {
|
||||
boolean returned = senderPool.offer(putBackSender);
|
||||
if (!returned) {
|
||||
putBackSender.close();
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// get a sender from the pool, or create a new one if the pool is empty
|
||||
// if we can't create a new connection then route flow files to failure and yield
|
||||
ChannelSender sender = senderPool.poll();
|
||||
if (sender == null) {
|
||||
try {
|
||||
getLogger().debug("No available connections, creating a new one...");
|
||||
sender = createSender(context, bufferPool);
|
||||
} catch (IOException e) {
|
||||
for (final FlowFile flowFile : flowFiles) {
|
||||
getLogger().error("No available connections, and unable to create a new one, transferring {} to failure",
|
||||
new Object[]{flowFile}, e);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
}
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
final ObjectHolder<IOException> exceptionHolder = new ObjectHolder<>(null);
|
||||
try {
|
||||
for (FlowFile flowFile : flowFiles) {
|
||||
final String priority = context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String version = context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String timestamp = context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String hostname = context.getProperty(MSG_HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String body = context.getProperty(MSG_BODY).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
||||
final StringBuilder messageBuilder = new StringBuilder();
|
||||
messageBuilder.append("<").append(priority).append(">");
|
||||
if (version != null) {
|
||||
messageBuilder.append(version).append(" ");
|
||||
}
|
||||
messageBuilder.append(timestamp).append(" ").append(hostname).append(" ").append(body);
|
||||
|
||||
final String fullMessage = messageBuilder.toString();
|
||||
getLogger().debug(fullMessage);
|
||||
|
||||
if (isValid(fullMessage)) {
|
||||
try {
|
||||
// now that we validated, add a new line if doing TCP
|
||||
if (protocol.equals(TCP_VALUE.getValue())) {
|
||||
messageBuilder.append('\n');
|
||||
}
|
||||
|
||||
sender.send(messageBuilder.toString());
|
||||
getLogger().info("Transferring {} to success", new Object[]{flowFile});
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
} catch (IOException e) {
|
||||
getLogger().error("Transferring {} to failure", new Object[]{flowFile}, e);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
exceptionHolder.set(e);
|
||||
}
|
||||
} else {
|
||||
getLogger().info("Transferring {} to invalid", new Object[]{flowFile});
|
||||
session.transfer(flowFile, REL_INVALID);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// if the connection is still open and no IO errors happened then try to return, if pool is full then close
|
||||
if (sender.isConnected() && exceptionHolder.get() == null) {
|
||||
boolean returned = senderPool.offer(sender);
|
||||
if (!returned) {
|
||||
sender.close();
|
||||
}
|
||||
} else {
|
||||
// probably already closed here, but quietly close anyway to be safe
|
||||
sender.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private boolean isValid(final String message) {
|
||||
for (Pattern pattern : SyslogParser.MESSAGE_PATTERNS) {
|
||||
Matcher matcher = pattern.matcher(message);
|
||||
if (matcher.matches()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Base class for sending messages over a channel.
|
||||
*/
|
||||
public static abstract class ChannelSender {
|
||||
|
||||
final int port;
|
||||
final String host;
|
||||
final BlockingQueue<ByteBuffer> bufferPool;
|
||||
final Charset charset;
|
||||
volatile long lastUsed;
|
||||
|
||||
ChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
|
||||
this.port = port;
|
||||
this.host = host;
|
||||
this.bufferPool = bufferPool;
|
||||
this.charset = charset;
|
||||
}
|
||||
|
||||
public void send(final String message) throws IOException {
|
||||
final byte[] bytes = message.getBytes(charset);
|
||||
|
||||
boolean shouldReturn = true;
|
||||
ByteBuffer buffer = bufferPool.poll();
|
||||
if (buffer == null) {
|
||||
buffer = ByteBuffer.allocate(bytes.length);
|
||||
shouldReturn = false;
|
||||
} else if (buffer.limit() < bytes.length) {
|
||||
// we need a large buffer so return the one we got and create a new bigger one
|
||||
bufferPool.offer(buffer);
|
||||
buffer = ByteBuffer.allocate(bytes.length);
|
||||
shouldReturn = false;
|
||||
}
|
||||
|
||||
try {
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
buffer.flip();
|
||||
write(buffer);
|
||||
lastUsed = System.currentTimeMillis();
|
||||
} finally {
|
||||
if (shouldReturn) {
|
||||
bufferPool.offer(buffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// write the given buffer to the underlying channel
|
||||
abstract void write(ByteBuffer buffer) throws IOException;
|
||||
|
||||
// returns true if the underlying channel is connected
|
||||
abstract boolean isConnected();
|
||||
|
||||
// close the underlying channel
|
||||
abstract void close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends messages over a DatagramChannel.
|
||||
*/
|
||||
static class DatagramChannelSender extends ChannelSender {
|
||||
|
||||
final DatagramChannel channel;
|
||||
|
||||
DatagramChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
|
||||
super(host, port, bufferPool, charset);
|
||||
this.channel = DatagramChannel.open();
|
||||
this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ByteBuffer buffer) throws IOException {
|
||||
while (buffer.hasRemaining()) {
|
||||
channel.write(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isConnected() {
|
||||
return channel != null && channel.isConnected();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
IOUtils.closeQuietly(channel);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends messages over a SocketChannel.
|
||||
*/
|
||||
static class SocketChannelSender extends ChannelSender {
|
||||
|
||||
final SocketChannel channel;
|
||||
|
||||
SocketChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
|
||||
super(host, port, bufferPool, charset);
|
||||
this.channel = SocketChannel.open();
|
||||
this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ByteBuffer buffer) throws IOException {
|
||||
while (buffer.hasRemaining()) {
|
||||
channel.write(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isConnected() {
|
||||
return channel != null && channel.isConnected();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
IOUtils.closeQuietly(channel);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,180 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
/**
|
||||
* Encapsulates the parsed information for a single Syslog event.
|
||||
*/
|
||||
public class SyslogEvent {
|
||||
|
||||
private final String priority;
|
||||
private final String severity;
|
||||
private final String facility;
|
||||
private final String version;
|
||||
private final String timeStamp;
|
||||
private final String hostName;
|
||||
private final String sender;
|
||||
private final String msgBody;
|
||||
private final String fullMessage;
|
||||
private final byte[] rawMessage;
|
||||
private final boolean valid;
|
||||
|
||||
private SyslogEvent(final Builder builder) {
|
||||
this.priority = builder.priority;
|
||||
this.severity = builder.severity;
|
||||
this.facility = builder.facility;
|
||||
this.version = builder.version;
|
||||
this.timeStamp = builder.timeStamp;
|
||||
this.hostName = builder.hostName;
|
||||
this.sender = builder.sender;
|
||||
this.msgBody = builder.msgBody;
|
||||
this.fullMessage = builder.fullMessage;
|
||||
this.rawMessage = builder.rawMessage;
|
||||
this.valid = builder.valid;
|
||||
}
|
||||
|
||||
public String getPriority() {
|
||||
return priority;
|
||||
}
|
||||
|
||||
public String getSeverity() {
|
||||
return severity;
|
||||
}
|
||||
|
||||
public String getFacility() {
|
||||
return facility;
|
||||
}
|
||||
|
||||
public String getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public String getTimeStamp() {
|
||||
return timeStamp;
|
||||
}
|
||||
|
||||
public String getHostName() {
|
||||
return hostName;
|
||||
}
|
||||
|
||||
public String getSender() {
|
||||
return sender;
|
||||
}
|
||||
|
||||
public String getMsgBody() {
|
||||
return msgBody;
|
||||
}
|
||||
|
||||
public String getFullMessage() {
|
||||
return fullMessage;
|
||||
}
|
||||
|
||||
public byte[] getRawMessage() {
|
||||
return rawMessage;
|
||||
}
|
||||
|
||||
public boolean isValid() {
|
||||
return valid;
|
||||
}
|
||||
|
||||
public static final class Builder {
|
||||
private String priority;
|
||||
private String severity;
|
||||
private String facility;
|
||||
private String version;
|
||||
private String timeStamp;
|
||||
private String hostName;
|
||||
private String sender;
|
||||
private String msgBody;
|
||||
private String fullMessage;
|
||||
private byte[] rawMessage;
|
||||
private boolean valid;
|
||||
|
||||
public void reset() {
|
||||
this.priority = null;
|
||||
this.severity = null;
|
||||
this.facility = null;
|
||||
this.version = null;
|
||||
this.timeStamp = null;
|
||||
this.hostName = null;
|
||||
this.sender = null;
|
||||
this.msgBody = null;
|
||||
this.fullMessage = null;
|
||||
this.valid = false;
|
||||
}
|
||||
|
||||
public Builder priority(String priority) {
|
||||
this.priority = priority;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder severity(String severity) {
|
||||
this.severity = severity;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder facility(String facility) {
|
||||
this.facility = facility;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder version(String version) {
|
||||
this.version = version;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder timestamp(String timestamp) {
|
||||
this.timeStamp = timestamp;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder hostname(String hostName) {
|
||||
this.hostName = hostName;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder sender(String sender) {
|
||||
this.sender = sender;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder msgBody(String msgBody) {
|
||||
this.msgBody = msgBody;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder fullMessage(String fullMessage) {
|
||||
this.fullMessage = fullMessage;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder rawMessage(byte[] rawMessage) {
|
||||
this.rawMessage = rawMessage;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder valid(boolean valid) {
|
||||
this.valid = valid;
|
||||
return this;
|
||||
}
|
||||
|
||||
public SyslogEvent build() {
|
||||
return new SyslogEvent(this);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,165 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.regex.MatchResult;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Parses a Syslog message from a ByteBuffer into a SyslogEvent instance.
|
||||
*
|
||||
* The Syslog regular expressions below were adapted from the Apache Flume project.
|
||||
*/
|
||||
public class SyslogParser {
|
||||
|
||||
public static final String SYSLOG_MSG_RFC5424_0 =
|
||||
"(?:\\<(\\d{1,3})\\>)" + // priority
|
||||
"(?:(\\d)?\\s?)" + // version
|
||||
/* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) */
|
||||
"(?:" +
|
||||
"(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" +
|
||||
"(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp
|
||||
"\\s" + // separator
|
||||
"(?:([\\w][\\w\\d\\.@\\-]*)|-)" + // host name or - (null)
|
||||
"\\s" + // separator
|
||||
"(.*)$"; // body
|
||||
|
||||
public static final String SYSLOG_MSG_RFC3164_0 =
|
||||
"(?:\\<(\\d{1,3})\\>)" +
|
||||
"(?:(\\d)?\\s?)" + // version
|
||||
// stamp MMM d HH:mm:ss, single digit date has two spaces
|
||||
"([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" +
|
||||
"\\s" + // separator
|
||||
"([\\w][\\w\\d\\.@-]*)" + // host
|
||||
"\\s(.*)$"; // body
|
||||
|
||||
public static final Collection<Pattern> MESSAGE_PATTERNS;
|
||||
static {
|
||||
List<Pattern> patterns = new ArrayList<>();
|
||||
patterns.add(Pattern.compile(SYSLOG_MSG_RFC5424_0));
|
||||
patterns.add(Pattern.compile(SYSLOG_MSG_RFC3164_0));
|
||||
MESSAGE_PATTERNS = Collections.unmodifiableList(patterns);
|
||||
}
|
||||
|
||||
// capture group positions from the above message patterns
|
||||
public static final int SYSLOG_PRIORITY_POS = 1;
|
||||
public static final int SYSLOG_VERSION_POS = 2;
|
||||
public static final int SYSLOG_TIMESTAMP_POS = 3;
|
||||
public static final int SYSLOG_HOSTNAME_POS = 4;
|
||||
public static final int SYSLOG_BODY_POS = 5;
|
||||
|
||||
private Charset charset;
|
||||
|
||||
public SyslogParser(final Charset charset) {
|
||||
this.charset = charset;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses a SyslogEvent from a byte buffer.
|
||||
*
|
||||
* @param buffer a byte buffer containing a syslog message
|
||||
* @return a SyslogEvent parsed from the byte array
|
||||
*/
|
||||
public SyslogEvent parseEvent(final ByteBuffer buffer) {
|
||||
return parseEvent(buffer, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses a SyslogEvent from a byte buffer.
|
||||
*
|
||||
* @param buffer a byte buffer containing a syslog message
|
||||
* @param sender the hostname of the syslog server that sent the message
|
||||
* @return a SyslogEvent parsed from the byte array
|
||||
*/
|
||||
public SyslogEvent parseEvent(final ByteBuffer buffer, final String sender) {
|
||||
if (buffer == null) {
|
||||
return null;
|
||||
}
|
||||
if (buffer.position() != 0) {
|
||||
buffer.flip();
|
||||
}
|
||||
byte bytes[] = new byte[buffer.limit()];
|
||||
buffer.get(bytes, 0, buffer.limit());
|
||||
return parseEvent(bytes, sender);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses a SyslogEvent from a byte array.
|
||||
*
|
||||
* @param bytes a byte array containing a syslog message
|
||||
* @param sender the hostname of the syslog server that sent the message
|
||||
* @return a SyslogEvent parsed from the byte array
|
||||
*/
|
||||
public SyslogEvent parseEvent(final byte[] bytes, final String sender) {
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// remove trailing new line before parsing
|
||||
int length = bytes.length;
|
||||
if (bytes[length - 1] == '\n') {
|
||||
length = length - 1;
|
||||
}
|
||||
|
||||
final String message = new String(bytes, 0, length, charset);
|
||||
|
||||
final SyslogEvent.Builder builder = new SyslogEvent.Builder()
|
||||
.valid(false).fullMessage(message).rawMessage(bytes).sender(sender);
|
||||
|
||||
for (Pattern pattern : MESSAGE_PATTERNS) {
|
||||
final Matcher matcher = pattern.matcher(message);
|
||||
if (!matcher.matches()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final MatchResult res = matcher.toMatchResult();
|
||||
for (int grp = 1; grp <= res.groupCount(); grp++) {
|
||||
String value = res.group(grp);
|
||||
if (grp == SYSLOG_TIMESTAMP_POS) {
|
||||
builder.timestamp(value);
|
||||
} else if (grp == SYSLOG_HOSTNAME_POS) {
|
||||
builder.hostname(value);
|
||||
} else if (grp == SYSLOG_PRIORITY_POS) {
|
||||
int pri = Integer.parseInt(value);
|
||||
int sev = pri % 8;
|
||||
int facility = pri / 8;
|
||||
builder.priority(value);
|
||||
builder.severity(String.valueOf(sev));
|
||||
builder.facility(String.valueOf(facility));
|
||||
} else if (grp == SYSLOG_VERSION_POS) {
|
||||
builder.version(value);
|
||||
} else if (grp == SYSLOG_BODY_POS) {
|
||||
builder.msgBody(value);
|
||||
}
|
||||
}
|
||||
|
||||
builder.valid(true);
|
||||
break;
|
||||
}
|
||||
|
||||
// either invalid w/original msg, or fully parsed event
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
}
|
|
@ -44,6 +44,7 @@ org.apache.nifi.processors.standard.InvokeHTTP
|
|||
org.apache.nifi.processors.standard.GetJMSQueue
|
||||
org.apache.nifi.processors.standard.GetJMSTopic
|
||||
org.apache.nifi.processors.standard.ListenHTTP
|
||||
org.apache.nifi.processors.standard.ListenSyslog
|
||||
org.apache.nifi.processors.standard.ListenUDP
|
||||
org.apache.nifi.processors.standard.ListSFTP
|
||||
org.apache.nifi.processors.standard.LogAttribute
|
||||
|
@ -58,6 +59,7 @@ org.apache.nifi.processors.standard.PutFTP
|
|||
org.apache.nifi.processors.standard.PutJMS
|
||||
org.apache.nifi.processors.standard.PutSFTP
|
||||
org.apache.nifi.processors.standard.PutSQL
|
||||
org.apache.nifi.processors.standard.PutSyslog
|
||||
org.apache.nifi.processors.standard.ReplaceText
|
||||
org.apache.nifi.processors.standard.ReplaceTextWithMapping
|
||||
org.apache.nifi.processors.standard.RouteOnAttribute
|
||||
|
|
|
@ -0,0 +1,426 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.io.nio.BufferPool;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSessionFactory;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processors.standard.util.SyslogEvent;
|
||||
import org.apache.nifi.processors.standard.util.SyslogParser;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.DatagramChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
public class TestListenSyslog {
|
||||
|
||||
static final Logger LOGGER = LoggerFactory.getLogger(TestListenSyslog.class);
|
||||
|
||||
static final String PRI = "34";
|
||||
static final String SEV = "2";
|
||||
static final String FAC = "4";
|
||||
static final String TIME = "Oct 13 15:43:23";
|
||||
static final String HOST = "localhost.home";
|
||||
static final String BODY = "some message";
|
||||
|
||||
static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";
|
||||
static final String INVALID_MESSAGE = "this is not valid\n";
|
||||
|
||||
@Test
|
||||
public void testUDP() throws IOException, InterruptedException {
|
||||
final ListenSyslog proc = new ListenSyslog();
|
||||
final TestRunner runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
|
||||
runner.setProperty(ListenSyslog.PORT, "0");
|
||||
|
||||
// schedule to start listening on a random port
|
||||
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
|
||||
final ProcessContext context = runner.getProcessContext();
|
||||
proc.onScheduled(context);
|
||||
|
||||
final int numMessages = 20;
|
||||
final int port = proc.getPort();
|
||||
Assert.assertTrue(port > 0);
|
||||
|
||||
// write some UDP messages to the port in the background
|
||||
final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE));
|
||||
sender.setDaemon(true);
|
||||
sender.start();
|
||||
|
||||
// call onTrigger until we read all datagrams, or 30 seconds passed
|
||||
try {
|
||||
int numTransfered = 0;
|
||||
long timeout = System.currentTimeMillis() + 30000;
|
||||
|
||||
while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(10);
|
||||
proc.onTrigger(context, processSessionFactory);
|
||||
numTransfered = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size();
|
||||
}
|
||||
Assert.assertEquals("Did not process all the datagrams", numMessages, numTransfered);
|
||||
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
|
||||
checkFlowFile(flowFile);
|
||||
|
||||
} finally {
|
||||
// unschedule to close connections
|
||||
proc.onUnscheduled();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTCPSingleConnection() throws IOException, InterruptedException {
|
||||
final ListenSyslog proc = new ListenSyslog();
|
||||
final TestRunner runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
|
||||
runner.setProperty(ListenSyslog.PORT, "0");
|
||||
|
||||
// schedule to start listening on a random port
|
||||
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
|
||||
final ProcessContext context = runner.getProcessContext();
|
||||
proc.onScheduled(context);
|
||||
|
||||
final int numMessages = 20;
|
||||
final int port = proc.getPort();
|
||||
Assert.assertTrue(port > 0);
|
||||
|
||||
// write some TCP messages to the port in the background
|
||||
final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE));
|
||||
sender.setDaemon(true);
|
||||
sender.start();
|
||||
|
||||
// call onTrigger until we read all messages, or 30 seconds passed
|
||||
try {
|
||||
int numTransfered = 0;
|
||||
long timeout = System.currentTimeMillis() + 30000;
|
||||
|
||||
while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(10);
|
||||
proc.onTrigger(context, processSessionFactory);
|
||||
numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
|
||||
}
|
||||
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
|
||||
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
|
||||
checkFlowFile(flowFile);
|
||||
} finally {
|
||||
// unschedule to close connections
|
||||
proc.onUnscheduled();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTCPMultipleConnection() throws IOException, InterruptedException {
|
||||
final ListenSyslog proc = new ListenSyslog();
|
||||
final TestRunner runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
|
||||
runner.setProperty(ListenSyslog.PORT, "0");
|
||||
|
||||
// schedule to start listening on a random port
|
||||
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
|
||||
final ProcessContext context = runner.getProcessContext();
|
||||
proc.onScheduled(context);
|
||||
|
||||
final int numMessages = 20;
|
||||
final int port = proc.getPort();
|
||||
Assert.assertTrue(port > 0);
|
||||
|
||||
// write some TCP messages to the port in the background
|
||||
final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE));
|
||||
sender.setDaemon(true);
|
||||
sender.start();
|
||||
|
||||
// call onTrigger until we read all messages, or 30 seconds passed
|
||||
try {
|
||||
int numTransfered = 0;
|
||||
long timeout = System.currentTimeMillis() + 30000;
|
||||
|
||||
while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(10);
|
||||
proc.onTrigger(context, processSessionFactory);
|
||||
numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
|
||||
}
|
||||
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
|
||||
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
|
||||
checkFlowFile(flowFile);
|
||||
} finally {
|
||||
// unschedule to close connections
|
||||
proc.onUnscheduled();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalid() throws IOException, InterruptedException {
|
||||
final ListenSyslog proc = new ListenSyslog();
|
||||
final TestRunner runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
|
||||
runner.setProperty(ListenSyslog.PORT, "0");
|
||||
|
||||
// schedule to start listening on a random port
|
||||
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
|
||||
final ProcessContext context = runner.getProcessContext();
|
||||
proc.onScheduled(context);
|
||||
|
||||
final int numMessages = 10;
|
||||
final int port = proc.getPort();
|
||||
Assert.assertTrue(port > 0);
|
||||
|
||||
// write some TCP messages to the port in the background
|
||||
final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 100, INVALID_MESSAGE));
|
||||
sender.setDaemon(true);
|
||||
sender.start();
|
||||
|
||||
// call onTrigger until we read all messages, or 30 seconds passed
|
||||
try {
|
||||
int numTransfered = 0;
|
||||
long timeout = System.currentTimeMillis() + 30000;
|
||||
|
||||
while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(50);
|
||||
proc.onTrigger(context, processSessionFactory);
|
||||
numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
|
||||
}
|
||||
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
|
||||
|
||||
} finally {
|
||||
// unschedule to close connections
|
||||
proc.onUnscheduled();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testErrorQueue() {
|
||||
final SyslogEvent event1 = Mockito.mock(SyslogEvent.class);
|
||||
Mockito.when(event1.getRawMessage()).thenThrow(new ProcessException("ERROR"));
|
||||
|
||||
final SyslogEvent event2 = new SyslogEvent.Builder()
|
||||
.facility("fac").severity("sev")
|
||||
.fullMessage("abc").hostname("host")
|
||||
.msgBody("body").timestamp("123").valid(true)
|
||||
.rawMessage("abc".getBytes(Charset.forName("UTF-8")))
|
||||
.build();
|
||||
|
||||
final MockProcessor proc = new MockProcessor(Arrays.asList(event1, event2));
|
||||
final TestRunner runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(ListenSyslog.PORT, "12345");
|
||||
|
||||
// should keep re-processing event1 from the error queue
|
||||
runner.run(3);
|
||||
runner.assertTransferCount(ListenSyslog.REL_INVALID, 0);
|
||||
runner.assertTransferCount(ListenSyslog.REL_SUCCESS, 0);
|
||||
}
|
||||
|
||||
|
||||
private void checkFlowFile(MockFlowFile flowFile) {
|
||||
flowFile.assertContentEquals(VALID_MESSAGE);
|
||||
Assert.assertEquals(PRI, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key()));
|
||||
Assert.assertEquals(SEV, flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key()));
|
||||
Assert.assertEquals(FAC, flowFile.getAttribute(ListenSyslog.SyslogAttributes.FACILITY.key()));
|
||||
Assert.assertEquals(TIME, flowFile.getAttribute(ListenSyslog.SyslogAttributes.TIMESTAMP.key()));
|
||||
Assert.assertEquals(HOST, flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key()));
|
||||
Assert.assertEquals(BODY, flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key()));
|
||||
Assert.assertEquals("true", flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a given number of datagrams to the given port.
|
||||
*/
|
||||
public static final class DatagramSender implements Runnable {
|
||||
|
||||
final int port;
|
||||
final int numMessages;
|
||||
final long delay;
|
||||
final String message;
|
||||
|
||||
public DatagramSender(int port, int numMessages, long delay, String message) {
|
||||
this.port = port;
|
||||
this.numMessages = numMessages;
|
||||
this.delay = delay;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
|
||||
try (DatagramChannel channel = DatagramChannel.open()) {
|
||||
channel.connect(new InetSocketAddress("localhost", port));
|
||||
for (int i=0; i < numMessages; i++) {
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
buffer.flip();
|
||||
|
||||
while(buffer.hasRemaining()) {
|
||||
channel.write(buffer);
|
||||
}
|
||||
|
||||
Thread.sleep(delay);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a given number of datagrams to the given port.
|
||||
*/
|
||||
public static final class SingleConnectionSocketSender implements Runnable {
|
||||
|
||||
final int port;
|
||||
final int numMessages;
|
||||
final long delay;
|
||||
final String message;
|
||||
|
||||
public SingleConnectionSocketSender(int port, int numMessages, long delay, String message) {
|
||||
this.port = port;
|
||||
this.numMessages = numMessages;
|
||||
this.delay = delay;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
|
||||
try (SocketChannel channel = SocketChannel.open()) {
|
||||
channel.connect(new InetSocketAddress("localhost", port));
|
||||
|
||||
for (int i=0; i < numMessages; i++) {
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
buffer.flip();
|
||||
|
||||
while (buffer.hasRemaining()) {
|
||||
channel.write(buffer);
|
||||
}
|
||||
Thread.sleep(delay);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a given number of datagrams to the given port.
|
||||
*/
|
||||
public static final class MultiConnectionSocketSender implements Runnable {
|
||||
|
||||
final int port;
|
||||
final int numMessages;
|
||||
final long delay;
|
||||
final String message;
|
||||
|
||||
public MultiConnectionSocketSender(int port, int numMessages, long delay, String message) {
|
||||
this.port = port;
|
||||
this.numMessages = numMessages;
|
||||
this.delay = delay;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
|
||||
for (int i=0; i < numMessages; i++) {
|
||||
try (SocketChannel channel = SocketChannel.open()) {
|
||||
channel.connect(new InetSocketAddress("localhost", port));
|
||||
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
buffer.flip();
|
||||
|
||||
while (buffer.hasRemaining()) {
|
||||
channel.write(buffer);
|
||||
}
|
||||
Thread.sleep(delay);
|
||||
} catch (IOException e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A mock version of ListenSyslog that will queue the provided events
|
||||
private static class MockProcessor extends ListenSyslog {
|
||||
|
||||
private List<SyslogEvent> eventList;
|
||||
|
||||
public MockProcessor(List<SyslogEvent> eventList) {
|
||||
this.eventList = eventList;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelReader createChannelReader(String protocol, BufferPool bufferPool, SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents) throws IOException {
|
||||
return new ChannelReader() {
|
||||
@Override
|
||||
public void open(int port, int maxBufferSize) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPort() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (SyslogEvent event : eventList) {
|
||||
syslogEvents.offer(event);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,398 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
public class TestPutSyslog {
|
||||
|
||||
private MockCollectingSender sender;
|
||||
private MockPutSyslog proc;
|
||||
private TestRunner runner;
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
sender = new MockCollectingSender();
|
||||
proc = new MockPutSyslog(sender);
|
||||
runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(PutSyslog.HOSTNAME, "localhost");
|
||||
runner.setProperty(PutSyslog.PORT, "12345");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidMessageStaticPropertiesUdp() {
|
||||
final String pri = "34";
|
||||
final String version = "1";
|
||||
final String stamp = "2003-10-11T22:14:15.003Z";
|
||||
final String host = "mymachine.example.com";
|
||||
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
|
||||
|
||||
final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
|
||||
|
||||
runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
|
||||
runner.setProperty(PutSyslog.MSG_VERSION, version);
|
||||
runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
|
||||
runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
|
||||
runner.setProperty(PutSyslog.MSG_BODY, body);
|
||||
|
||||
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
|
||||
Assert.assertEquals(1, sender.messages.size());
|
||||
Assert.assertEquals(expectedMessage, sender.messages.get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidMessageStaticPropertiesTcp() {
|
||||
final String pri = "34";
|
||||
final String version = "1";
|
||||
final String stamp = "2003-10-11T22:14:15.003Z";
|
||||
final String host = "mymachine.example.com";
|
||||
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
|
||||
|
||||
final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
|
||||
|
||||
runner.setProperty(PutSyslog.PROTOCOL, PutSyslog.TCP_VALUE);
|
||||
runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
|
||||
runner.setProperty(PutSyslog.MSG_VERSION, version);
|
||||
runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
|
||||
runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
|
||||
runner.setProperty(PutSyslog.MSG_BODY, body);
|
||||
|
||||
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
|
||||
Assert.assertEquals(1, sender.messages.size());
|
||||
Assert.assertEquals(expectedMessage, sender.messages.get(0).replace("\n", ""));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidMessageStaticPropertiesNoVersion() {
|
||||
final String pri = "34";
|
||||
final String stamp = "2003-10-11T22:14:15.003Z";
|
||||
final String host = "mymachine.example.com";
|
||||
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
|
||||
|
||||
final String expectedMessage = "<" + pri + ">" + stamp + " " + host + " " + body;
|
||||
|
||||
runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
|
||||
runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
|
||||
runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
|
||||
runner.setProperty(PutSyslog.MSG_BODY, body);
|
||||
|
||||
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
|
||||
Assert.assertEquals(1, sender.messages.size());
|
||||
Assert.assertEquals(expectedMessage, sender.messages.get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidMessageELProperties() {
|
||||
final String pri = "34";
|
||||
final String stamp = "2003-10-11T22:14:15.003Z";
|
||||
final String host = "mymachine.example.com";
|
||||
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
|
||||
|
||||
final String expectedMessage = "<" + pri + ">" + stamp + " " + host + " " + body;
|
||||
|
||||
runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
|
||||
runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
|
||||
runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
|
||||
runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
|
||||
|
||||
final Map<String,String> attributes = new HashMap<>();
|
||||
attributes.put("syslog.priority", pri);
|
||||
attributes.put("syslog.timestamp", stamp);
|
||||
attributes.put("syslog.hostname", host);
|
||||
attributes.put("syslog.body", body);
|
||||
|
||||
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
|
||||
Assert.assertEquals(1, sender.messages.size());
|
||||
Assert.assertEquals(expectedMessage, sender.messages.get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidMessageELProperties() {
|
||||
final String pri = "34";
|
||||
final String stamp = "not-a-timestamp";
|
||||
final String host = "mymachine.example.com";
|
||||
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
|
||||
|
||||
runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
|
||||
runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
|
||||
runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
|
||||
runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
|
||||
|
||||
final Map<String,String> attributes = new HashMap<>();
|
||||
attributes.put("syslog.priority", pri);
|
||||
attributes.put("syslog.timestamp", stamp);
|
||||
attributes.put("syslog.hostname", host);
|
||||
attributes.put("syslog.body", body);
|
||||
|
||||
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutSyslog.REL_INVALID, 1);
|
||||
Assert.assertEquals(0, sender.messages.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIOExceptionOnSend() throws IOException {
|
||||
final String pri = "34";
|
||||
final String version = "1";
|
||||
final String stamp = "2003-10-11T22:14:15.003Z";
|
||||
final String host = "mymachine.example.com";
|
||||
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
|
||||
|
||||
proc = new MockPutSyslog(new MockErrorSender());
|
||||
runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(PutSyslog.HOSTNAME, "localhost");
|
||||
runner.setProperty(PutSyslog.PORT, "12345");
|
||||
runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
|
||||
runner.setProperty(PutSyslog.MSG_VERSION, version);
|
||||
runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
|
||||
runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
|
||||
runner.setProperty(PutSyslog.MSG_BODY, body);
|
||||
|
||||
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 1);
|
||||
Assert.assertEquals(0, sender.messages.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIOExceptionCreatingConnection() throws IOException {
|
||||
final String pri = "34";
|
||||
final String version = "1";
|
||||
final String stamp = "2003-10-11T22:14:15.003Z";
|
||||
final String host = "mymachine.example.com";
|
||||
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
|
||||
|
||||
Processor proc = new MockCreationErrorPutSyslog(new MockErrorSender(), 1);
|
||||
runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(PutSyslog.HOSTNAME, "localhost");
|
||||
runner.setProperty(PutSyslog.PORT, "12345");
|
||||
runner.setProperty(PutSyslog.BATCH_SIZE, "1");
|
||||
runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
|
||||
runner.setProperty(PutSyslog.MSG_VERSION, version);
|
||||
runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
|
||||
runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
|
||||
runner.setProperty(PutSyslog.MSG_BODY, body);
|
||||
|
||||
// the first run will throw IOException when calling send so the connection won't be re-qeued
|
||||
// the second run will try to create a new connection but throw an exception which should be caught and route files to failure
|
||||
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
|
||||
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
|
||||
runner.run(2);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 2);
|
||||
Assert.assertEquals(0, sender.messages.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLargeMessageFailure() {
|
||||
final String pri = "34";
|
||||
final String stamp = "2015-10-15T22:14:15.003Z";
|
||||
final String host = "mymachine.example.com";
|
||||
|
||||
final StringBuilder bodyBuilder = new StringBuilder(4096);
|
||||
for (int i=0; i < 4096; i++) {
|
||||
bodyBuilder.append("a");
|
||||
}
|
||||
|
||||
runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
|
||||
runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
|
||||
runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
|
||||
runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
|
||||
|
||||
final Map<String,String> attributes = new HashMap<>();
|
||||
attributes.put("syslog.priority", pri);
|
||||
attributes.put("syslog.timestamp", stamp);
|
||||
attributes.put("syslog.hostname", host);
|
||||
attributes.put("syslog.body", bodyBuilder.toString());
|
||||
|
||||
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
|
||||
runner.run();
|
||||
|
||||
// should have dynamically created a larger buffer
|
||||
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
|
||||
Assert.assertEquals(1, sender.messages.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoIncomingData() {
|
||||
runner.setProperty(PutSyslog.MSG_PRIORITY, "10");
|
||||
runner.setProperty(PutSyslog.MSG_VERSION, "1");
|
||||
runner.setProperty(PutSyslog.MSG_TIMESTAMP, "2003-10-11T22:14:15.003Z");
|
||||
runner.setProperty(PutSyslog.MSG_HOSTNAME, "localhost");
|
||||
runner.setProperty(PutSyslog.MSG_BODY, "test");
|
||||
|
||||
// queue one file but run several times to test no incoming data
|
||||
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
|
||||
runner.run(5);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchingFlowFiles() {
|
||||
runner.setProperty(PutSyslog.BATCH_SIZE, "10");
|
||||
runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
|
||||
runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
|
||||
runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
|
||||
runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
|
||||
|
||||
final Map<String,String> attributes = new HashMap<>();
|
||||
attributes.put("syslog.priority", "10");
|
||||
attributes.put("syslog.timestamp", "2015-10-11T22:14:15.003Z");
|
||||
attributes.put("syslog.hostname", "my.host.name");
|
||||
attributes.put("syslog.body", "blah blah blah");
|
||||
|
||||
for (int i=0; i < 15; i++) {
|
||||
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
|
||||
}
|
||||
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 10);
|
||||
Assert.assertEquals(10, sender.messages.size());
|
||||
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 15);
|
||||
Assert.assertEquals(15, sender.messages.size());
|
||||
}
|
||||
|
||||
// Mock processor to return a MockCollectingSender
|
||||
static class MockPutSyslog extends PutSyslog {
|
||||
|
||||
ChannelSender mockSender;
|
||||
|
||||
public MockPutSyslog(ChannelSender sender) {
|
||||
this.mockSender = sender;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
|
||||
return mockSender;
|
||||
}
|
||||
}
|
||||
|
||||
// Mock processor to test exception when creating new senders
|
||||
static class MockCreationErrorPutSyslog extends PutSyslog {
|
||||
|
||||
int numSendersCreated;
|
||||
int numSendersAllowed;
|
||||
ChannelSender mockSender;
|
||||
|
||||
public MockCreationErrorPutSyslog(ChannelSender sender, int numSendersAllowed) {
|
||||
this.mockSender = sender;
|
||||
this.numSendersAllowed = numSendersAllowed;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
|
||||
if (numSendersCreated >= numSendersAllowed) {
|
||||
throw new IOException("too many senders");
|
||||
}
|
||||
numSendersCreated++;
|
||||
return mockSender;
|
||||
}
|
||||
}
|
||||
|
||||
// Mock sender that saves any messages passed to send()
|
||||
static class MockCollectingSender extends PutSyslog.ChannelSender {
|
||||
|
||||
List<String> messages = new ArrayList<>();
|
||||
|
||||
public MockCollectingSender() throws IOException {
|
||||
super("myhost", 0, new LinkedBlockingQueue<ByteBuffer>(1), Charset.forName("UTF-8"));
|
||||
this.bufferPool.offer(ByteBuffer.allocate(1024));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(String message) throws IOException {
|
||||
messages.add(message);
|
||||
super.send(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
void write(ByteBuffer buffer) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isConnected() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
void close() {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// Mock sender that throws IOException on calls to write() or send()
|
||||
static class MockErrorSender extends PutSyslog.ChannelSender {
|
||||
|
||||
public MockErrorSender() throws IOException {
|
||||
super(null, 0, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(String message) throws IOException {
|
||||
throw new IOException("error");
|
||||
}
|
||||
|
||||
@Override
|
||||
void write(ByteBuffer buffer) throws IOException {
|
||||
throw new IOException("error");
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isConnected() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
void close() {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,253 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class TestSyslogParser {
|
||||
|
||||
static final Charset CHARSET = Charset.forName("UTF-8");
|
||||
|
||||
private SyslogParser parser;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
parser = new SyslogParser(CHARSET);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRFC3164SingleDigitDay() {
|
||||
final String pri = "10";
|
||||
final String stamp = "Oct 1 13:14:04";
|
||||
final String host = "my.host.com";
|
||||
final String body = "some body message";
|
||||
final String message = "<" + pri + ">" + stamp + " " + host + " " + body;
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final SyslogEvent event = parser.parseEvent(buffer);
|
||||
Assert.assertNotNull(event);
|
||||
Assert.assertEquals(pri, event.getPriority());
|
||||
Assert.assertEquals("2", event.getSeverity());
|
||||
Assert.assertEquals("1", event.getFacility());
|
||||
Assert.assertNull(event.getVersion());
|
||||
Assert.assertEquals(stamp, event.getTimeStamp());
|
||||
Assert.assertEquals(host, event.getHostName());
|
||||
Assert.assertEquals(body, event.getMsgBody());
|
||||
Assert.assertEquals(message, event.getFullMessage());
|
||||
Assert.assertTrue(event.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRFC3164DoubleDigitDay() {
|
||||
final String pri = "31";
|
||||
final String stamp = "Oct 13 14:14:43";
|
||||
final String host = "localhost";
|
||||
final String body = "AppleCameraAssistant[470]: DeviceMessageNotificationCallback: kIOPMMessageSystemPowerEventOccurred: 0x00000000";
|
||||
final String message = "<" + pri + ">" + stamp + " " + host + " " + body;
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final SyslogEvent event = parser.parseEvent(buffer);
|
||||
Assert.assertNotNull(event);
|
||||
Assert.assertEquals(pri, event.getPriority());
|
||||
Assert.assertEquals("7", event.getSeverity());
|
||||
Assert.assertEquals("3", event.getFacility());
|
||||
Assert.assertNull(event.getVersion());
|
||||
Assert.assertEquals(stamp, event.getTimeStamp());
|
||||
Assert.assertEquals(host, event.getHostName());
|
||||
Assert.assertEquals(body, event.getMsgBody());
|
||||
Assert.assertEquals(message, event.getFullMessage());
|
||||
Assert.assertTrue(event.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRFC3164WithVersion() {
|
||||
final String pri = "31";
|
||||
final String version = "1";
|
||||
final String stamp = "Oct 13 14:14:43";
|
||||
final String host = "localhost";
|
||||
final String body = "AppleCameraAssistant[470]: DeviceMessageNotificationCallback: kIOPMMessageSystemPowerEventOccurred: 0x00000000";
|
||||
final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final SyslogEvent event = parser.parseEvent(buffer);
|
||||
Assert.assertNotNull(event);
|
||||
Assert.assertEquals(pri, event.getPriority());
|
||||
Assert.assertEquals("7", event.getSeverity());
|
||||
Assert.assertEquals("3", event.getFacility());
|
||||
Assert.assertEquals(version, event.getVersion());
|
||||
Assert.assertEquals(stamp, event.getTimeStamp());
|
||||
Assert.assertEquals(host, event.getHostName());
|
||||
Assert.assertEquals(body, event.getMsgBody());
|
||||
Assert.assertEquals(message, event.getFullMessage());
|
||||
Assert.assertTrue(event.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRFC5424WithVersion() {
|
||||
final String pri = "34";
|
||||
final String version = "1";
|
||||
final String stamp = "2003-10-11T22:14:15.003Z";
|
||||
final String host = "mymachine.example.com";
|
||||
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
|
||||
|
||||
final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final SyslogEvent event = parser.parseEvent(buffer);
|
||||
Assert.assertNotNull(event);
|
||||
Assert.assertEquals(pri, event.getPriority());
|
||||
Assert.assertEquals("2", event.getSeverity());
|
||||
Assert.assertEquals("4", event.getFacility());
|
||||
Assert.assertEquals(version, event.getVersion());
|
||||
Assert.assertEquals(stamp, event.getTimeStamp());
|
||||
Assert.assertEquals(host, event.getHostName());
|
||||
Assert.assertEquals(body, event.getMsgBody());
|
||||
Assert.assertEquals(message, event.getFullMessage());
|
||||
Assert.assertTrue(event.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRFC5424WithoutVersion() {
|
||||
final String pri = "34";
|
||||
final String stamp = "2003-10-11T22:14:15.003Z";
|
||||
final String host = "mymachine.example.com";
|
||||
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
|
||||
|
||||
final String message = "<" + pri + ">" + stamp + " " + host + " " + body;
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final SyslogEvent event = parser.parseEvent(buffer);
|
||||
Assert.assertNotNull(event);
|
||||
Assert.assertEquals(pri, event.getPriority());
|
||||
Assert.assertEquals("2", event.getSeverity());
|
||||
Assert.assertEquals("4", event.getFacility());
|
||||
Assert.assertNull(event.getVersion());
|
||||
Assert.assertEquals(stamp, event.getTimeStamp());
|
||||
Assert.assertEquals(host, event.getHostName());
|
||||
Assert.assertEquals(body, event.getMsgBody());
|
||||
Assert.assertEquals(message, event.getFullMessage());
|
||||
Assert.assertTrue(event.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTrailingNewLine() {
|
||||
final String message = "<31>Oct 13 15:43:23 localhost.home some message\n";
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final SyslogEvent event = parser.parseEvent(buffer);
|
||||
Assert.assertNotNull(event);
|
||||
Assert.assertTrue(event.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVariety() {
|
||||
final List<String> messages = new ArrayList<>();
|
||||
|
||||
// supported examples from RFC 3164
|
||||
messages.add("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for " +
|
||||
"lonvick on /dev/pts/8");
|
||||
messages.add("<13>Feb 5 17:32:18 10.0.0.99 Use the BFG!");
|
||||
messages.add("<165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% " +
|
||||
"It's time to make the do-nuts. %% Ingredients: Mix=OK, Jelly=OK # " +
|
||||
"Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport: " +
|
||||
"Conveyer1=OK, Conveyer2=OK # %%");
|
||||
messages.add("<0>Oct 22 10:52:12 scapegoat 1990 Oct 22 10:52:01 TZ-6 " +
|
||||
"scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All Folks!");
|
||||
|
||||
// supported examples from RFC 5424
|
||||
messages.add("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " +
|
||||
"ID47 - BOM'su root' failed for lonvick on /dev/pts/8");
|
||||
messages.add("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc " +
|
||||
"8710 - - %% It's time to make the do-nuts.");
|
||||
|
||||
// non-standard (but common) messages (RFC3339 dates, no version digit)
|
||||
messages.add("<13>2003-08-24T05:14:15Z localhost snarf?");
|
||||
messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!");
|
||||
|
||||
for (final String message : messages) {
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final SyslogEvent event = parser.parseEvent(buffer);
|
||||
Assert.assertTrue(event.isValid());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidPriority() {
|
||||
final String message = "10 Oct 13 14:14:43 localhost some body of the message";
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final SyslogEvent event = parser.parseEvent(buffer);
|
||||
Assert.assertNotNull(event);
|
||||
Assert.assertFalse(event.isValid());
|
||||
Assert.assertEquals(message, event.getFullMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseWithSender() {
|
||||
final String sender = "127.0.0.1";
|
||||
final String message = "<31>Oct 13 15:43:23 localhost.home some message\n";
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final SyslogEvent event = parser.parseEvent(buffer, sender);
|
||||
Assert.assertNotNull(event);
|
||||
Assert.assertTrue(event.isValid());
|
||||
Assert.assertEquals(sender, event.getSender());
|
||||
}
|
||||
}
|
|
@ -46,7 +46,7 @@
|
|||
<module>nifi-image-bundle</module>
|
||||
<module>nifi-avro-bundle</module>
|
||||
<module>nifi-couchbase-bundle</module>
|
||||
</modules>
|
||||
</modules>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
|
@ -131,4 +131,4 @@
|
|||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
</project>
|
||||
</project>
|
Loading…
Reference in New Issue