NIFI-1420 Adding Splunk bundle containing PutSplunk and GetSplunk, and adding a ListenTCP processor to standard processors. Refactored internal code from PutSyslog to create a generic AbstractPutEventProcessor that PutSplunk extends.

This closes #233
Bryan Bende 2016-03-07 18:07:06 -05:00
parent 4ce7b679e1
commit 6f5fb59479
33 changed files with 3710 additions and 241 deletions

View File

@@ -275,6 +275,11 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-amqp-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-splunk-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>

View File

@@ -220,6 +220,10 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
return errorEvents.size();
}
public int getQueueSize() {
return events == null ? 0 : events.size();
}
@OnUnscheduled
public void onUnscheduled() {
if (dispatcher != null) {

View File

@@ -22,6 +22,7 @@ import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.security.util.SslContextFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -56,6 +57,7 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
private final ProcessorLog logger;
private final int maxConnections;
private final SSLContext sslContext;
private final SslContextFactory.ClientAuth clientAuth;
private final Charset charset;
private ExecutorService executor;
@@ -64,7 +66,6 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
private final BlockingQueue<SelectionKey> keyQueue;
private final AtomicInteger currentConnections = new AtomicInteger(0);
public SocketChannelDispatcher(final EventFactory<E> eventFactory,
final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory,
final BlockingQueue<ByteBuffer> bufferPool,
@@ -73,6 +74,18 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
final int maxConnections,
final SSLContext sslContext,
final Charset charset) {
this(eventFactory, handlerFactory, bufferPool, events, logger, maxConnections, sslContext, SslContextFactory.ClientAuth.REQUIRED, charset);
}
public SocketChannelDispatcher(final EventFactory<E> eventFactory,
final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory,
final BlockingQueue<ByteBuffer> bufferPool,
final BlockingQueue<E> events,
final ProcessorLog logger,
final int maxConnections,
final SSLContext sslContext,
final SslContextFactory.ClientAuth clientAuth,
final Charset charset) {
this.eventFactory = eventFactory;
this.handlerFactory = handlerFactory;
this.bufferPool = bufferPool;
@@ -81,6 +94,7 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
this.maxConnections = maxConnections;
this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
this.sslContext = sslContext;
this.clientAuth = clientAuth;
this.charset = charset;
if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() != maxConnections) {
@@ -152,7 +166,22 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
SSLSocketChannel sslSocketChannel = null;
if (sslContext != null) {
final SSLEngine sslEngine = sslContext.createSSLEngine();
sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel, false);
sslEngine.setUseClientMode(false);
switch (clientAuth) {
case REQUIRED:
sslEngine.setNeedClientAuth(true);
break;
case WANT:
sslEngine.setWantClientAuth(true);
break;
case NONE:
sslEngine.setNeedClientAuth(false);
sslEngine.setWantClientAuth(false);
break;
}
sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel);
}
// Attach the buffer and SSLSocketChannel to the key

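The new clientAuth parameter above controls how strictly the server-side SSLEngine verifies client certificates during the TLS handshake. Below is a minimal standalone sketch of that mapping, assuming only JDK classes; the local ClientAuth enum is a stand-in for SslContextFactory.ClientAuth, and the class name is illustrative.

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.security.NoSuchAlgorithmException;

public class ClientAuthDemo {

    // Stand-in for org.apache.nifi.security.util.SslContextFactory.ClientAuth
    enum ClientAuth { REQUIRED, WANT, NONE }

    // Configure a server-side SSLEngine the same way the dispatcher's switch does.
    static SSLEngine serverEngine(final SSLContext ctx, final ClientAuth clientAuth) {
        final SSLEngine engine = ctx.createSSLEngine();
        engine.setUseClientMode(false); // this end acts as the TLS server
        switch (clientAuth) {
            case REQUIRED:
                engine.setNeedClientAuth(true);  // handshake fails without a client cert
                break;
            case WANT:
                engine.setWantClientAuth(true);  // request a cert, but proceed without one
                break;
            case NONE:
                engine.setNeedClientAuth(false); // never ask for a client cert
                engine.setWantClientAuth(false);
                break;
        }
        return engine;
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        final SSLEngine engine = serverEngine(SSLContext.getDefault(), ClientAuth.WANT);
        System.out.println("needClientAuth=" + engine.getNeedClientAuth()
                + " wantClientAuth=" + engine.getWantClientAuth());
    }
}
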
View File

@@ -129,17 +129,20 @@ public class SSLSocketChannelHandler<E extends Event<SocketChannel>> extends Soc
// go through the buffer looking for the end of each message
for (int i = 0; i < bytesRead; i++) {
final byte currByte = buffer[i];
currBytes.write(currByte);
// check if at end of a message
if (currByte == getDelimiter()) {
final SSLSocketChannelResponder response = new SSLSocketChannelResponder(socketChannel, sslSocketChannel);
final Map<String,String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
if (currBytes.size() > 0) {
final SSLSocketChannelResponder response = new SSLSocketChannelResponder(socketChannel, sslSocketChannel);
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
// queue the raw event blocking until space is available, reset the temporary buffer
final E event = eventFactory.create(currBytes.toByteArray(), metadata, response);
events.put(event);
currBytes.reset();
// queue the raw event blocking until space is available, reset the temporary buffer
final E event = eventFactory.create(currBytes.toByteArray(), metadata, response);
events.put(event);
currBytes.reset();
}
} else {
currBytes.write(currByte);
}
}
}

View File

@@ -131,20 +131,23 @@ public class StandardSocketChannelHandler<E extends Event<SocketChannel>> extend
// NOTE: For higher throughput, the looking for \n and copying into the byte stream could be improved
// Pull data out of buffer and cram into byte array
byte currByte = socketBuffer.get();
currBytes.write(currByte);
// check if at end of a message
if (currByte == getDelimiter()) {
final SocketChannelResponder response = new SocketChannelResponder(socketChannel);
final Map<String,String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
if (currBytes.size() > 0) {
final SocketChannelResponder response = new SocketChannelResponder(socketChannel);
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
// queue the raw event blocking until space is available, reset the buffer
final E event = eventFactory.create(currBytes.toByteArray(), metadata, response);
events.put(event);
currBytes.reset();
// queue the raw event blocking until space is available, reset the buffer
final E event = eventFactory.create(currBytes.toByteArray(), metadata, response);
events.put(event);
currBytes.reset();
// Mark this as the start of the next message
socketBuffer.mark();
// Mark this as the start of the next message
socketBuffer.mark();
}
} else {
currBytes.write(currByte);
}
}
}

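Both handler diffs above make the same two behavioral fixes: the delimiter byte is no longer written into the message buffer, and empty messages produced by consecutive delimiters are skipped rather than queued. A self-contained sketch of that framing logic over a plain byte array (class and method names are illustrative, not from this commit):

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DelimiterFramingDemo {

    // Split 'data' on 'delimiter', dropping the delimiter itself and any empty
    // messages, mirroring the updated handler loops above. A trailing partial
    // message stays buffered in 'currBytes' until more data arrives.
    static List<byte[]> frame(byte[] data, byte delimiter, ByteArrayOutputStream currBytes) {
        final List<byte[]> messages = new ArrayList<>();
        for (final byte currByte : data) {
            if (currByte == delimiter) {
                if (currBytes.size() > 0) {        // skip empty messages ("\n\n")
                    messages.add(currBytes.toByteArray());
                    currBytes.reset();
                }
            } else {
                currBytes.write(currByte);         // delimiter is never part of the message
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        for (byte[] msg : frame("one\n\ntwo\npart".getBytes(StandardCharsets.UTF_8), (byte) '\n', buffer)) {
            System.out.println(new String(msg, StandardCharsets.UTF_8)); // prints "one", "two"
        }
        // "part" remains in 'buffer' awaiting more data
    }
}
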
View File

@@ -0,0 +1,476 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.put;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* A base class for processors that send data to an external system using TCP or UDP.
*/
public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryProcessor {
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The ip address or hostname of the destination.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("localhost")
.required(true)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor
.Builder().name("Port")
.description("The port on the destination.")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Size of Socket Send Buffer")
.description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " +
"to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
"the data can be read, and incoming data will be dropped.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.required(true)
.build();
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies the character set of the data being sent.")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Timeout")
.description("The timeout for connecting to and communicating with the destination. Does not apply to UDP")
.required(false)
.defaultValue("10 seconds")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
.Builder().name("Idle Connection Expiration")
.description("The amount of time a connection should be held open without being used before closing the connection.")
.required(true)
.defaultValue("5 seconds")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
// Putting these properties here so sub-classes don't have to redefine them, but they are
// not added to the properties by default since not all processors may need them
public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
.Builder().name("Protocol")
.description("The protocol for communication.")
.required(true)
.allowableValues(TCP_VALUE, UDP_VALUE)
.defaultValue(UDP_VALUE.getValue())
.build();
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter")
.description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+ "If not specified, the entire content of the FlowFile will be used as a single message. "
+ "If specified, the contents of the FlowFile will be split on this delimiter and each section "
+ "sent as a separate message. Note that if messages are delimited and some messages for a given FlowFile "
+ "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those "
+ "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' "
+ "relationship.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are sent successfully to the destination are sent out this relationship.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed to send to the destination are sent out this relationship.")
.build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
protected volatile String transitUri;
protected volatile BlockingQueue<ChannelSender> senderPool;
protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>();
protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(HOSTNAME);
descriptors.add(PORT);
descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
descriptors.add(CHARSET);
descriptors.add(TIMEOUT);
descriptors.add(IDLE_EXPIRATION);
descriptors.addAll(getAdditionalProperties());
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.addAll(getAdditionalRelationships());
this.relationships = Collections.unmodifiableSet(relationships);
}
/**
* Override to provide additional relationships for the processor.
*
* @return a list of relationships
*/
protected List<Relationship> getAdditionalRelationships() {
return Collections.EMPTY_LIST;
}
/**
* Override to provide additional properties for the processor.
*
* @return a list of properties
*/
protected List<PropertyDescriptor> getAdditionalProperties() {
return Collections.EMPTY_LIST;
}
@Override
public final Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
// initialize the queue of senders, one per task, senders will get created on the fly in onTrigger
this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
this.transitUri = createTransitUri(context);
}
@OnStopped
public void closeSenders() {
if (senderPool != null) {
ChannelSender sender = senderPool.poll();
while (sender != null) {
sender.close();
sender = senderPool.poll();
}
}
}
/**
* Sub-classes construct a transit uri for provenance events. Called from @OnScheduled
* method of this class.
*
* @param context the current context
*
* @return the transit uri
*/
protected abstract String createTransitUri(final ProcessContext context);
/**
* Sub-classes create a ChannelSender given a context.
*
* @param context the current context
* @return an implementation of ChannelSender
* @throws IOException if an error occurs creating the ChannelSender
*/
protected abstract ChannelSender createSender(final ProcessContext context) throws IOException;
/**
* Close any senders that haven't been active within the given threshold.
*
* @param idleThreshold the threshold to consider a sender as idle
*/
protected void pruneIdleSenders(final long idleThreshold) {
long currentTime = System.currentTimeMillis();
final List<ChannelSender> putBack = new ArrayList<>();
// if a connection hasn't been used within the threshold then it gets closed
ChannelSender sender;
while ((sender = senderPool.poll()) != null) {
if (currentTime > (sender.getLastUsed() + idleThreshold)) {
getLogger().debug("Closing idle connection...");
sender.close();
} else {
putBack.add(sender);
}
}
// re-queue senders that weren't idle, but if the queue is full then close the sender
for (ChannelSender putBackSender : putBack) {
boolean returned = senderPool.offer(putBackSender);
if (!returned) {
putBackSender.close();
}
}
}
/**
* Helper for sub-classes to create a sender.
*
* @param protocol the protocol for the sender
* @param host the host to send to
* @param port the port to send to
* @param timeout the timeout for connecting and communicating over the channel
* @param maxSendBufferSize the maximum size of the socket send buffer
* @param sslContext an SSLContext, or null if not using SSL
*
* @return a ChannelSender based on the given properties
*
* @throws IOException if an error occurs creating the sender
*/
protected ChannelSender createSender(final String protocol,
final String host,
final int port,
final int timeout,
final int maxSendBufferSize,
final SSLContext sslContext) throws IOException {
ChannelSender sender;
if (protocol.equals(UDP_VALUE.getValue())) {
sender = new DatagramChannelSender(host, port, maxSendBufferSize, getLogger());
} else {
// if an SSLContextService is provided then we make a secure sender
if (sslContext != null) {
sender = new SSLSocketChannelSender(host, port, maxSendBufferSize, sslContext, getLogger());
} else {
sender = new SocketChannelSender(host, port, maxSendBufferSize, getLogger());
}
}
sender.setTimeout(timeout);
sender.open();
return sender;
}
/**
* Represents a range of messages from a FlowFile.
*/
protected static class Range {
private final long start;
private final long end;
public Range(final long start, final long end) {
this.start = start;
this.end = end;
}
public long getStart() {
return start;
}
public long getEnd() {
return end;
}
@Override
public String toString() {
return "Range[" + start + "-" + end + "]";
}
}
/**
* A wrapper to hold the ranges of a FlowFile that were successful and ranges that failed, and then
* transfer those ranges appropriately.
*/
protected class FlowFileMessageBatch {
private final ProcessSession session;
private final FlowFile flowFile;
private final long startTime = System.nanoTime();
private final List<Range> successfulRanges = new ArrayList<>();
private final List<Range> failedRanges = new ArrayList<>();
private Exception lastFailureReason;
private long numMessages = -1L;
private long completeTime = 0L;
private boolean canceled = false;
public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile) {
this.session = session;
this.flowFile = flowFile;
}
public synchronized void cancelOrComplete() {
if (isComplete()) {
completeSession();
return;
}
this.canceled = true;
session.rollback();
successfulRanges.clear();
failedRanges.clear();
}
public synchronized void addSuccessfulRange(final long start, final long end) {
if (canceled) {
return;
}
successfulRanges.add(new Range(start, end));
if (isComplete()) {
activeBatches.remove(this);
completeBatches.add(this);
completeTime = System.nanoTime();
}
}
public synchronized void addFailedRange(final long start, final long end, final Exception e) {
if (canceled) {
return;
}
failedRanges.add(new Range(start, end));
lastFailureReason = e;
if (isComplete()) {
activeBatches.remove(this);
completeBatches.add(this);
completeTime = System.nanoTime();
}
}
private boolean isComplete() {
return !canceled && (numMessages > -1) && (successfulRanges.size() + failedRanges.size() >= numMessages);
}
public synchronized void setNumMessages(final long msgCount) {
this.numMessages = msgCount;
if (isComplete()) {
activeBatches.remove(this);
completeBatches.add(this);
completeTime = System.nanoTime();
}
}
private void transferRanges(final List<Range> ranges, final Relationship relationship) {
Collections.sort(ranges, new Comparator<Range>() {
@Override
public int compare(final Range o1, final Range o2) {
return Long.compare(o1.getStart(), o2.getStart());
}
});
for (int i = 0; i < ranges.size(); i++) {
Range range = ranges.get(i);
int count = 1;
while (i + 1 < ranges.size()) {
// Check if the next range in the List continues where this one left off.
final Range nextRange = ranges.get(i + 1);
if (nextRange.getStart() == range.getEnd()) {
// We have two ranges in a row that are contiguous; combine them into a single Range.
range = new Range(range.getStart(), nextRange.getEnd());
count++;
i++;
} else {
break;
}
}
// Create a FlowFile for this range.
FlowFile child = session.clone(flowFile, range.getStart(), range.getEnd() - range.getStart());
if (relationship == REL_SUCCESS) {
session.getProvenanceReporter().send(child, transitUri, "Sent " + count + " messages");
session.transfer(child, relationship);
} else {
child = session.penalize(child);
session.transfer(child, relationship);
}
}
}
public synchronized void completeSession() {
if (canceled) {
return;
}
if (successfulRanges.isEmpty() && failedRanges.isEmpty()) {
getLogger().info("Completed processing {} but sent 0 FlowFiles", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
session.commit();
return;
}
if (successfulRanges.isEmpty()) {
getLogger().error("Failed to send {}; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason});
final FlowFile penalizedFlowFile = session.penalize(flowFile);
session.transfer(penalizedFlowFile, REL_FAILURE);
session.commit();
return;
}
if (failedRanges.isEmpty()) {
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime);
session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + successfulRanges.size() + " messages;", transferMillis);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} messages for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis});
session.commit();
return;
}
// At this point, the successful ranges is not empty and the failed ranges is not empty. This indicates that some messages made their way
// successfully and some failed. We will address this by splitting apart the source FlowFile into children and sending the successful messages to 'success'
// and the failed messages to 'failure'.
transferRanges(successfulRanges, REL_SUCCESS);
transferRanges(failedRanges, REL_FAILURE);
session.remove(flowFile);
getLogger().error("Successfully sent {} messages, but failed to send {} messages; the last error received was {}",
new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason});
session.commit();
}
}
}

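transferRanges in FlowFileMessageBatch above coalesces contiguous byte ranges before cloning child FlowFiles, so a run of consecutive successful messages becomes one child rather than many. A sketch of just the merge step, assuming half-open [start, end) ranges as the clone offsets above imply (names are illustrative):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class RangeMergeDemo {

    static class Range {
        final long start, end; // half-open: [start, end)
        Range(long start, long end) { this.start = start; this.end = end; }
        @Override public String toString() { return "Range[" + start + "-" + end + "]"; }
    }

    // Sort by start offset, then combine runs where one range ends exactly where
    // the next begins -- the same coalescing transferRanges performs before
    // cloning one child FlowFile per merged range.
    static List<Range> merge(List<Range> ranges) {
        ranges.sort(Comparator.comparingLong(r -> r.start));
        final List<Range> merged = new ArrayList<>();
        for (final Range r : ranges) {
            final Range last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && r.start == last.end) {
                merged.set(merged.size() - 1, new Range(last.start, r.end)); // contiguous
            } else {
                merged.add(r);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        final List<Range> ranges = new ArrayList<>(List.of(
                new Range(10, 20), new Range(0, 10), new Range(25, 30)));
        System.out.println(merge(ranges)); // [Range[0-20], Range[25-30]]
    }
}
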
View File

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.put.sender;
import org.apache.nifi.logging.ProcessorLog;
import java.io.IOException;
import java.nio.charset.Charset;
/**
* Base class for sending messages over a channel.
*/
public abstract class ChannelSender {
protected final int port;
protected final String host;
protected final int maxSendBufferSize;
protected final ProcessorLog logger;
protected volatile int timeout = 10000;
protected volatile long lastUsed;
public ChannelSender(final String host, final int port, final int maxSendBufferSize, final ProcessorLog logger) {
this.port = port;
this.host = host;
this.maxSendBufferSize = maxSendBufferSize;
this.logger = logger;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public int getTimeout() {
return timeout;
}
/**
* @return the last time data was sent over this channel
*/
public long getLastUsed() {
return lastUsed;
}
/**
* Opens the connection to the destination.
*
* @throws IOException if an error occurred opening the connection.
*/
public abstract void open() throws IOException;
/**
* Sends the given string over the channel.
*
* @param message the message to send over the channel
* @param charset the character set used to encode the message
* @throws IOException if there was an error communicating over the channel
*/
public void send(final String message, final Charset charset) throws IOException {
final byte[] bytes = message.getBytes(charset);
send(bytes);
}
/**
* Sends the given data over the channel.
*
* @param data the data to send over the channel
* @throws IOException if there was an error communicating over the channel
*/
public void send(final byte[] data) throws IOException {
write(data);
lastUsed = System.currentTimeMillis();
}
/**
* Write the given data to the underlying channel.
*
* @param data the bytes to write
* @throws IOException if there was an error writing to the channel
*/
protected abstract void write(byte[] data) throws IOException;
/**
* @return true if the underlying channel is connected
*/
public abstract boolean isConnected();
/**
* Close the underlying channel
*/
public abstract void close();
}

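ChannelSender centralizes the encode-then-write path: send(String, Charset) delegates to send(byte[]), which writes the data and stamps lastUsed so that pruneIdleSenders in AbstractPutEventProcessor can close idle connections. A toy stand-in with the same shape, writing to memory instead of a socket (everything here is illustrative, not NiFi API):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class InMemorySenderDemo {

    // Toy stand-in for ChannelSender: same send()/write()/lastUsed shape,
    // minus the ProcessorLog and socket plumbing.
    static abstract class Sender {
        protected volatile long lastUsed;

        public void send(final String message, final Charset charset) throws IOException {
            send(message.getBytes(charset));
        }

        public void send(final byte[] data) throws IOException {
            write(data);
            lastUsed = System.currentTimeMillis(); // idle pruning compares against this
        }

        protected abstract void write(byte[] data) throws IOException;
    }

    public static void main(String[] args) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        final Sender sender = new Sender() {
            @Override protected void write(byte[] data) { out.write(data, 0, data.length); }
        };
        sender.send("hello splunk\n", StandardCharsets.UTF_8);
        System.out.println(out.size() + " bytes written, lastUsed=" + sender.lastUsed);
    }
}
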
View File

@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.put.sender;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.logging.ProcessorLog;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
/**
* Sends messages over a DatagramChannel.
*/
public class DatagramChannelSender extends ChannelSender {
private DatagramChannel channel;
public DatagramChannelSender(final String host, final int port, final int maxSendBufferSize, final ProcessorLog logger) {
super(host, port, maxSendBufferSize, logger);
}
@Override
public void open() throws IOException {
if (channel == null) {
channel = DatagramChannel.open();
if (maxSendBufferSize > 0) {
channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize);
final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF);
if (actualSendBufSize < maxSendBufferSize) {
logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize
+ " bytes but could only set to " + actualSendBufSize + "bytes. You may want to "
+ "consider changing the Operating System's maximum receive buffer");
}
}
}
if (!channel.isConnected()) {
channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
}
}
@Override
protected void write(byte[] data) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(data);
while (buffer.hasRemaining()) {
channel.write(buffer);
}
}
@Override
public boolean isConnected() {
return channel != null && channel.isConnected();
}
@Override
public void close() {
IOUtils.closeQuietly(channel);
channel = null;
}
}

View File

@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.put.sender;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import javax.net.ssl.SSLContext;
import java.io.IOException;
/**
* Sends messages over an SSLSocketChannel.
*/
public class SSLSocketChannelSender extends SocketChannelSender {
private SSLSocketChannel sslChannel;
private SSLContext sslContext;
public SSLSocketChannelSender(final String host,
final int port,
final int maxSendBufferSize,
final SSLContext sslContext,
final ProcessorLog logger) {
super(host, port, maxSendBufferSize, logger);
this.sslContext = sslContext;
}
@Override
public void open() throws IOException {
if (sslChannel == null) {
super.open();
sslChannel = new SSLSocketChannel(sslContext, channel, true);
}
sslChannel.setTimeout(timeout);
// SSLSocketChannel will check if already connected so we can safely call this
sslChannel.connect();
}
@Override
protected void write(byte[] data) throws IOException {
sslChannel.write(data);
}
@Override
public boolean isConnected() {
return sslChannel != null && !sslChannel.isClosed();
}
@Override
public void close() {
super.close();
IOUtils.closeQuietly(sslChannel);
sslChannel = null;
}
}

View File

@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.put.sender;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;
/**
* Sends messages over a SocketChannel.
*/
public class SocketChannelSender extends ChannelSender {
protected SocketChannel channel;
protected SocketChannelOutputStream socketChannelOutput;
public SocketChannelSender(final String host, final int port, final int maxSendBufferSize, final ProcessorLog logger) {
super(host, port, maxSendBufferSize, logger);
}
@Override
public void open() throws IOException {
if (channel == null) {
channel = SocketChannel.open();
channel.configureBlocking(false);
if (maxSendBufferSize > 0) {
channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize);
final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF);
if (actualSendBufSize < maxSendBufferSize) {
logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize
+ " bytes but could only set to " + actualSendBufSize + "bytes. You may want to "
+ "consider changing the Operating System's maximum receive buffer");
}
}
}
if (!channel.isConnected()) {
final long startTime = System.currentTimeMillis();
final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getByName(host), port);
if (!channel.connect(socketAddress)) {
while (!channel.finishConnect()) {
if (System.currentTimeMillis() > startTime + timeout) {
throw new SocketTimeoutException("Timed out connecting to " + host + ":" + port);
}
try {
Thread.sleep(50L);
} catch (final InterruptedException e) {
}
}
}
socketChannelOutput = new SocketChannelOutputStream(channel);
socketChannelOutput.setTimeout(timeout);
}
}
@Override
protected void write(byte[] data) throws IOException {
socketChannelOutput.write(data);
}
@Override
public boolean isConnected() {
return channel != null && channel.isConnected();
}
@Override
public void close() {
IOUtils.closeQuietly(socketChannelOutput);
IOUtils.closeQuietly(channel);
socketChannelOutput = null;
channel = null;
}
}

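Because SocketChannelSender puts the channel into non-blocking mode, connect() may return false immediately and open() polls finishConnect() until the timeout elapses. The same pattern in isolation (host and port are placeholders):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.SocketChannel;

public class NonBlockingConnectDemo {

    // Connect a non-blocking SocketChannel, polling finishConnect() until
    // 'timeoutMillis' passes -- the same loop SocketChannelSender.open() uses.
    static SocketChannel connect(String host, int port, long timeoutMillis) throws IOException {
        final SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        final long startTime = System.currentTimeMillis();
        if (!channel.connect(new InetSocketAddress(host, port))) {
            while (!channel.finishConnect()) {
                if (System.currentTimeMillis() > startTime + timeoutMillis) {
                    channel.close();
                    throw new SocketTimeoutException("Timed out connecting to " + host + ":" + port);
                }
                try {
                    Thread.sleep(50L);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                }
            }
        }
        return channel;
    }

    public static void main(String[] args) throws IOException {
        // placeholder endpoint; point at a live listener to see a successful connect
        try (SocketChannel ch = connect("localhost", 7689, 10_000L)) {
            System.out.println("connected=" + ch.isConnected());
        }
    }
}
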
View File

@@ -85,10 +85,6 @@ public class SSLSocketChannel implements Closeable {
}
public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException {
this(sslContext.createSSLEngine(), socketChannel, client);
}
public SSLSocketChannel(final SSLEngine sslEngine, final SocketChannel socketChannel, final boolean client) throws IOException {
if (!socketChannel.isConnected()) {
throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
}
@@ -100,7 +96,7 @@ public class SSLSocketChannel implements Closeable {
this.hostname = socket.getInetAddress().getHostName();
this.port = socket.getPort();
this.engine = sslEngine;
this.engine = sslContext.createSSLEngine();
this.engine.setUseClientMode(client);
this.engine.setNeedClientAuth(true);
@@ -109,6 +105,26 @@ public class SSLSocketChannel implements Closeable {
appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
}
public SSLSocketChannel(final SSLEngine sslEngine, final SocketChannel socketChannel) throws IOException {
if (!socketChannel.isConnected()) {
throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
}
this.channel = socketChannel;
this.socketAddress = socketChannel.getRemoteAddress();
final Socket socket = socketChannel.socket();
this.hostname = socket.getInetAddress().getHostName();
this.port = socket.getPort();
// don't set useClientMode or needClientAuth, use the engine as is and let the caller configure it
this.engine = sslEngine;
streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
}
public void setTimeout(final int millis) {
this.timeoutMillis = millis;
}

View File

@@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-splunk-bundle</artifactId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-splunk-nar</artifactId>
<version>0.6.0-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-splunk-processors</artifactId>
<version>0.6.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,203 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@@ -0,0 +1,24 @@
nifi-splunk-nar
Copyright 2015-2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
===========================================
Apache Software License v2
===========================================
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons IO
The following NOTICE information applies:
Apache Commons IO
Copyright 2002-2012 The Apache Software Foundation
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2015 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())

View File

@@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-splunk-bundle</artifactId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-splunk-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.splunk</groupId>
<artifactId>splunk</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,543 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.splunk;
import com.splunk.JobExportArgs;
import com.splunk.SSLSecurityProtocol;
import com.splunk.Service;
import com.splunk.ServiceArgs;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"get", "splunk", "logs"})
@CapabilityDescription("Retrieves data from Splunk Enterprise.")
@WritesAttributes({
@WritesAttribute(attribute="splunk.query", description = "The query that performed to produce the FlowFile."),
@WritesAttribute(attribute="splunk.earliest.time", description = "The value of the earliest time that was used when performing the query."),
@WritesAttribute(attribute="splunk.latest.time", description = "The value of the latest time that was used when performing the query.")
})
@Stateful(scopes = Scope.CLUSTER, description = "If using one of the managed Time Range Strategies, this processor will " +
"store the values of the latest and earliest times from the previous execution so that the next execution of the " +
"can pick up where the last execution left off. The state will be cleared and start over if the query is changed.")
public class GetSplunk extends AbstractProcessor {
public static final String HTTP_SCHEME = "http";
public static final String HTTPS_SCHEME = "https";
public static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
.name("Scheme")
.description("The scheme for connecting to Splunk.")
.allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
.defaultValue(HTTPS_SCHEME)
.required(true)
.build();
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The ip address or hostname of the Splunk server.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("localhost")
.required(true)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor
.Builder().name("Port")
.description("The port of the Splunk server.")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.defaultValue("8089")
.build();
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("Query")
.description("The query to execute. Typically beginning with a <search> command followed by a search clause, " +
"such as <search source=\"tcp:7689\"> to search for messages received on TCP port 7689.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("search * | head 100")
.required(true)
.build();
public static final AllowableValue MANAGED_BEGINNING_VALUE = new AllowableValue("Managed from Beginning", "Managed from Beginning",
"The processor will manage the date ranges of the query starting from the beginning of time.");
public static final AllowableValue MANAGED_CURRENT_VALUE = new AllowableValue("Managed from Current", "Managed from Current",
"The processor will manage the date ranges of the query starting from the current time.");
public static final AllowableValue PROVIDED_VALUE = new AllowableValue("Provided", "Provided",
"The the time range provided through the Earliest Time and Latest Time properties will be used.");
public static final PropertyDescriptor TIME_RANGE_STRATEGY = new PropertyDescriptor.Builder()
.name("Time Range Strategy")
.description("Indicates how to apply time ranges to each execution of the query. Selecting a managed option " +
"allows the processor to apply a time range from the last execution time to the current execution time. " +
"When using <Managed from Beginning>, an earliest time will not be applied on the first execution, and thus all " +
"records searched. When using <Managed from Current> the earliest time of the first execution will be the " +
"initial execution time. When using <Provided>, the time range will come from the Earliest Time and Latest Time " +
"properties, or no time range will be applied if these properties are left blank.")
.allowableValues(MANAGED_BEGINNING_VALUE, MANAGED_CURRENT_VALUE, PROVIDED_VALUE)
.defaultValue(PROVIDED_VALUE.getValue())
.required(true)
.build();
public static final PropertyDescriptor EARLIEST_TIME = new PropertyDescriptor.Builder()
.name("Earliest Time")
.description("The value to use for the earliest time when querying. Only used with a Time Range Strategy of Provided. " +
"See Splunk's documentation on Search Time Modifiers for guidance in populating this field.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
public static final PropertyDescriptor LATEST_TIME = new PropertyDescriptor.Builder()
.name("Latest Time")
.description("The value to use for the latest time when querying. Only used with a Time Range Strategy of Provided. " +
"See Splunk's documentation on Search Time Modifiers for guidance in populating this field.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
public static final PropertyDescriptor APP = new PropertyDescriptor.Builder()
.name("Application")
.description("The Splunk Application to query.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
public static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
.name("Owner")
.description("The owner to pass to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
public static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
.name("Token")
.description("The token to pass to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("Username")
.description("The username to authenticate to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("The password to authenticate to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.sensitive(true)
.build();
public static final AllowableValue ATOM_VALUE = new AllowableValue(JobExportArgs.OutputMode.ATOM.name(), JobExportArgs.OutputMode.ATOM.name());
public static final AllowableValue CSV_VALUE = new AllowableValue(JobExportArgs.OutputMode.CSV.name(), JobExportArgs.OutputMode.CSV.name());
public static final AllowableValue JSON_VALUE = new AllowableValue(JobExportArgs.OutputMode.JSON.name(), JobExportArgs.OutputMode.JSON.name());
public static final AllowableValue JSON_COLS_VALUE = new AllowableValue(JobExportArgs.OutputMode.JSON_COLS.name(), JobExportArgs.OutputMode.JSON_COLS.name());
public static final AllowableValue JSON_ROWS_VALUE = new AllowableValue(JobExportArgs.OutputMode.JSON_ROWS.name(), JobExportArgs.OutputMode.JSON_ROWS.name());
public static final AllowableValue RAW_VALUE = new AllowableValue(JobExportArgs.OutputMode.RAW.name(), JobExportArgs.OutputMode.RAW.name());
public static final AllowableValue XML_VALUE = new AllowableValue(JobExportArgs.OutputMode.XML.name(), JobExportArgs.OutputMode.XML.name());
public static final PropertyDescriptor OUTPUT_MODE = new PropertyDescriptor.Builder()
.name("Output Mode")
.description("The output mode for the results.")
.allowableValues(ATOM_VALUE, CSV_VALUE, JSON_VALUE, JSON_COLS_VALUE, JSON_ROWS_VALUE, RAW_VALUE, XML_VALUE)
.defaultValue(JSON_VALUE.getValue())
.required(true)
.build();
public static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
public static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
public static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
public static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());
public static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
.name("Security Protocol")
.description("The security protocol to use for communicating with Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
.defaultValue(TLS_1_2_VALUE.getValue())
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Results retrieved from Splunk are sent out this relationship.")
.build();
public static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
public static final String EARLIEST_TIME_KEY = "earliestTime";
public static final String LATEST_TIME_KEY = "latestTime";
public static final String QUERY_ATTR = "splunk.query";
public static final String EARLIEST_TIME_ATTR = "splunk.earliest.time";
public static final String LATEST_TIME_ATTR = "splunk.latest.time";
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
private volatile String transitUri;
private volatile boolean resetState = false;
private volatile Service splunkService;
protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SCHEME);
descriptors.add(HOSTNAME);
descriptors.add(PORT);
descriptors.add(QUERY);
descriptors.add(TIME_RANGE_STRATEGY);
descriptors.add(EARLIEST_TIME);
descriptors.add(LATEST_TIME);
descriptors.add(APP);
descriptors.add(OWNER);
descriptors.add(TOKEN);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(SECURITY_PROTOCOL);
descriptors.add(OUTPUT_MODE);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public final Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Collection<ValidationResult> results = new ArrayList<>();
final String scheme = validationContext.getProperty(SCHEME).getValue();
final String secProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
if (HTTPS_SCHEME.equals(scheme) && StringUtils.isBlank(secProtocol)) {
results.add(new ValidationResult.Builder()
.explanation("Security Protocol must be specified when using HTTPS")
.valid(false).subject("Security Protocol").build());
}
final String username = validationContext.getProperty(USERNAME).getValue();
final String password = validationContext.getProperty(PASSWORD).getValue();
if (!StringUtils.isBlank(username) && StringUtils.isBlank(password)) {
results.add(new ValidationResult.Builder()
.explanation("Password must be specified when providing a Username")
.valid(false).subject("Password").build());
}
return results;
}
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
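// a change to any property that affects which records the query matches invalidates the stored time range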
if ( ((oldValue != null && !oldValue.equals(newValue)) || (oldValue == null && newValue != null))
&& (descriptor.equals(QUERY)
|| descriptor.equals(TIME_RANGE_STRATEGY)
|| descriptor.equals(EARLIEST_TIME)
|| descriptor.equals(LATEST_TIME)
|| descriptor.equals(HOSTNAME))
) {
resetState = true;
}
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
final String scheme = context.getProperty(SCHEME).getValue();
final String host = context.getProperty(HOSTNAME).getValue();
final int port = context.getProperty(PORT).asInteger();
transitUri = new StringBuilder().append(scheme).append("://").append(host).append(":").append(port).toString();
// if properties changed since last execution then remove any previous state
if (resetState) {
try {
context.getStateManager().clear(Scope.CLUSTER);
} catch (final IOException ioe) {
getLogger().warn("Failed to clear state", ioe);
}
resetState = false;
}
}
@OnStopped
public void onStopped() {
if (splunkService != null) {
isInitialized.set(false);
splunkService.logout();
splunkService = null;
}
}
@OnRemoved
public void onRemoved(final ProcessContext context) {
try {
context.getStateManager().clear(Scope.CLUSTER);
} catch (IOException e) {
getLogger().error("Unable to clear processor state due to {}", new Object[] {e.getMessage()}, e);
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final long currentTime = System.currentTimeMillis();
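// lazily create the Splunk Service on first use, synchronized so concurrent tasks don't create multiple connections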
synchronized (isInitialized) {
if (!isInitialized.get()) {
splunkService = createSplunkService(context);
isInitialized.set(true);
}
}
final String query = context.getProperty(QUERY).getValue();
final String outputMode = context.getProperty(OUTPUT_MODE).getValue();
final String timeRangeStrategy = context.getProperty(TIME_RANGE_STRATEGY).getValue();
final JobExportArgs exportArgs = new JobExportArgs();
exportArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);
exportArgs.setOutputMode(JobExportArgs.OutputMode.valueOf(outputMode));
String earliestTime = null;
String latestTime = null;
if (PROVIDED_VALUE.getValue().equals(timeRangeStrategy)) {
// for provided we just use the values of the properties
earliestTime = context.getProperty(EARLIEST_TIME).getValue();
latestTime = context.getProperty(LATEST_TIME).getValue();
} else {
try {
// not provided so we need to check the previous state
final TimeRange previousRange = loadState(context.getStateManager());
final SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_TIME_FORMAT);
if (previousRange == null) {
// no previous state so set the earliest time based on the strategy
if (MANAGED_CURRENT_VALUE.getValue().equals(timeRangeStrategy)) {
earliestTime = dateFormat.format(new Date(currentTime));
}
// no previous state so set the latest time to the current time
latestTime = dateFormat.format(new Date(currentTime));
// if it's the first time through, don't actually run; just save the state so the
// initial time is recorded, and the next execution will be the first real execution
if (latestTime.equals(earliestTime)) {
saveState(context.getStateManager(), new TimeRange(earliestTime, latestTime));
return;
}
} else {
// we have previous state so set earliestTime to latestTime of last range
earliestTime = previousRange.getLatestTime();
latestTime = dateFormat.format(new Date(currentTime));
}
} catch (IOException e) {
getLogger().error("Unable to load data from State Manager due to {}", new Object[] {e.getMessage()}, e);
context.yield();
return;
}
}
if (!StringUtils.isBlank(earliestTime)) {
exportArgs.setEarliestTime(earliestTime);
}
if (!StringUtils.isBlank(latestTime)) {
exportArgs.setLatestTime(latestTime);
}
getLogger().debug("Using earliestTime of {} and latestTime of {}", new Object[] {earliestTime, latestTime});
final InputStream exportSearch = splunkService.export(query, exportArgs);
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream rawOut) throws IOException {
try (BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
IOUtils.copyLarge(exportSearch, out);
}
}
});
final Map<String,String> attributes = new HashMap<>(3);
attributes.put(EARLIEST_TIME_ATTR, earliestTime);
attributes.put(LATEST_TIME_ATTR, latestTime);
attributes.put(QUERY_ATTR, query);
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Received {} from Splunk", new Object[] {flowFile});
// save the time range for the next execution to pick up where we left off
// if saving fails then roll back the session so we can try again next execution
// only need to do this for the managed time strategies
if (!PROVIDED_VALUE.getValue().equals(timeRangeStrategy)) {
try {
saveState(context.getStateManager(), new TimeRange(earliestTime, latestTime));
} catch (IOException e) {
getLogger().error("Unable to load data from State Manager due to {}", new Object[]{e.getMessage()}, e);
session.rollback();
context.yield();
}
}
}
protected Service createSplunkService(final ProcessContext context) {
final ServiceArgs serviceArgs = new ServiceArgs();
final String scheme = context.getProperty(SCHEME).getValue();
serviceArgs.setScheme(scheme);
final String host = context.getProperty(HOSTNAME).getValue();
serviceArgs.setHost(host);
final int port = context.getProperty(PORT).asInteger();
serviceArgs.setPort(port);
final String app = context.getProperty(APP).getValue();
if (!StringUtils.isBlank(app)) {
serviceArgs.setApp(app);
}
final String owner = context.getProperty(OWNER).getValue();
if (!StringUtils.isBlank(owner)) {
serviceArgs.setOwner(owner);
}
final String token = context.getProperty(TOKEN).getValue();
if (!StringUtils.isBlank(token)) {
serviceArgs.setToken(token);
}
final String username = context.getProperty(USERNAME).getValue();
if (!StringUtils.isBlank(username)) {
serviceArgs.setUsername(username);
}
final String password = context.getProperty(PASSWORD).getValue();
if (!StringUtils.isBlank(password)) {
serviceArgs.setPassword(password);
}
final String secProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
if (!StringUtils.isBlank(secProtocol) && HTTPS_SCHEME.equals(scheme)) {
serviceArgs.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(secProtocol));
}
return Service.connect(serviceArgs);
}
private void saveState(StateManager stateManager, TimeRange timeRange) throws IOException {
final String earliest = StringUtils.isBlank(timeRange.getEarliestTime()) ? "" : timeRange.getEarliestTime();
final String latest = StringUtils.isBlank(timeRange.getLatestTime()) ? "" : timeRange.getLatestTime();
Map<String,String> state = new HashMap<>(2);
state.put(EARLIEST_TIME_KEY, earliest);
state.put(LATEST_TIME_KEY, latest);
getLogger().debug("Saving state with earliestTime of {} and latestTime of {}", new Object[] {earliest, latest});
stateManager.setState(state, Scope.CLUSTER);
}
private TimeRange loadState(StateManager stateManager) throws IOException {
final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
if (stateMap.getVersion() < 0) {
getLogger().debug("No previous state found");
return null;
}
final String earliest = stateMap.get(EARLIEST_TIME_KEY);
final String latest = stateMap.get(LATEST_TIME_KEY);
getLogger().debug("Loaded state with earliestTime of {} and latestTime of {}", new Object[] {earliest, latest});
if (StringUtils.isBlank(earliest) && StringUtils.isBlank(latest)) {
return null;
} else {
return new TimeRange(earliest, latest);
}
}
static class TimeRange {
final String earliestTime;
final String latestTime;
public TimeRange(String earliestTime, String latestTime) {
this.earliestTime = earliestTime;
this.latestTime = latestTime;
}
public String getEarliestTime() {
return earliestTime;
}
public String getLatestTime() {
return latestTime;
}
}
}
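For reference, the Splunk SDK calls that GetSplunk wraps can be exercised directly. A minimal sketch, assuming an illustrative host and credentials (not part of this patch); every setter and the export call below appear in the processor code above:

ServiceArgs serviceArgs = new ServiceArgs();
serviceArgs.setScheme("https");
serviceArgs.setHost("splunk.example.com");
serviceArgs.setPort(8089);
serviceArgs.setUsername("admin");
serviceArgs.setPassword("changeme");
final Service service = Service.connect(serviceArgs);

final JobExportArgs exportArgs = new JobExportArgs();
exportArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);
exportArgs.setOutputMode(JobExportArgs.OutputMode.JSON);
try (final InputStream results = service.export("search * | head 100", exportArgs)) {
// consume the raw results, e.g. copy them to a file or another stream
}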

View File

@ -0,0 +1,342 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.splunk;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
import org.apache.nifi.util.LongHolder;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"splunk", "logs", "tcp", "udp"})
@TriggerWhenEmpty // because we have a queue of sessions that are ready to be committed
@CapabilityDescription("Sends logs to Splunk Enterprise over TCP, TCP + TLS/SSL, or UDP. If a Message " +
"Delimiter is provided, then this processor will read messages from the incoming FlowFile based on the " +
"delimiter, and send each message to Splunk. If a Message Delimiter is not provided then the content of " +
"the FlowFile will be sent directly to Splunk as if it were a single message.")
public class PutSplunk extends AbstractPutEventProcessor {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
"messages will be sent over a secure connection.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final char NEW_LINE_CHAR = '\n';
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(
PROTOCOL,
MESSAGE_DELIMITER,
SSL_CONTEXT_SERVICE
);
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final Collection<ValidationResult> results = new ArrayList<>();
final String protocol = context.getProperty(PROTOCOL).getValue();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (UDP_VALUE.getValue().equals(protocol) && sslContextService != null) {
results.add(new ValidationResult.Builder()
.explanation("SSL can not be used with UDP")
.valid(false).subject("SSL Context").build());
}
return results;
}
@OnStopped
public void cleanup() {
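// cancel or finish any batches still in progress, then commit the sessions of batches that already completed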
for (final FlowFileMessageBatch batch : activeBatches) {
batch.cancelOrComplete();
}
FlowFileMessageBatch batch;
while ((batch = completeBatches.poll()) != null) {
batch.completeSession();
}
}
@Override
protected String createTransitUri(ProcessContext context) {
final String port = context.getProperty(PORT).getValue();
final String host = context.getProperty(HOSTNAME).getValue();
final String protocol = context.getProperty(PROTOCOL).getValue().toLowerCase();
return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
}
@Override
protected ChannelSender createSender(ProcessContext context) throws IOException {
final int port = context.getProperty(PORT).asInteger();
final String host = context.getProperty(HOSTNAME).getValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final int maxSendBuffer = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
SSLContext sslContext = null;
if (sslContextService != null) {
sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
}
return createSender(protocol, host, port, timeout, maxSendBuffer, sslContext);
}
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
// first complete any batches from previous executions
FlowFileMessageBatch batch;
while ((batch = completeBatches.poll()) != null) {
batch.completeSession();
}
// create a session and try to get a FlowFile, if none available then close any idle senders
final ProcessSession session = sessionFactory.createSession();
final FlowFile flowFile = session.get();
if (flowFile == null) {
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
context.yield();
return;
}
// get a sender from the pool, or create a new one if the pool is empty
// if we can't create a new connection then route flow files to failure and yield
ChannelSender sender = senderPool.poll();
if (sender == null) {
try {
getLogger().debug("No available connections, creating a new one...");
sender = createSender(context);
} catch (IOException e) {
getLogger().error("No available connections, and unable to create a new one, transferring {} to failure",
new Object[]{flowFile}, e);
session.transfer(flowFile, REL_FAILURE);
context.yield();
return;
}
}
try {
String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
if (delimiter != null) {
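// unescape common sequences so a delimiter entered as "\n", "\r", or "\t" becomes the real control character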
delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
}
// if no delimiter then treat the whole FlowFile as a single message
if (delimiter == null) {
processSingleMessage(context, session, flowFile, sender);
} else {
processDelimitedMessages(context, session, flowFile, sender, delimiter);
}
} finally {
// if the connection is still open and no IO errors occurred, try to return it to the pool; if the pool is full, close it
if (sender.isConnected()) {
boolean returned = senderPool.offer(sender);
if (!returned) {
sender.close();
}
} else {
// probably already closed here, but quietly close anyway to be safe
sender.close();
}
}
}
/**
* Send the entire FlowFile as a single message.
*/
private void processSingleMessage(ProcessContext context, ProcessSession session, FlowFile flowFile, ChannelSender sender) {
// copy the contents of the FlowFile to the ByteArrayOutputStream
final ByteArrayOutputStream baos = new ByteArrayOutputStream((int)flowFile.getSize() + 1);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.copy(in, baos);
}
});
// if TCP and the (non-empty) content doesn't already end in a new line then add one
final String protocol = context.getProperty(PROTOCOL).getValue();
if (protocol.equals(TCP_VALUE.getValue()) && baos.size() > 0) {
final byte[] buf = baos.getUnderlyingBuffer();
if (buf[baos.size() - 1] != NEW_LINE_CHAR) {
baos.write(NEW_LINE_CHAR);
}
}
// create a message batch of one message and add to active batches
final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile);
messageBatch.setNumMessages(1);
activeBatches.add(messageBatch);
// attempt to send the data and add the appropriate range
try {
sender.send(baos.toByteArray());
messageBatch.addSuccessfulRange(0L, flowFile.getSize());
} catch (IOException e) {
messageBatch.addFailedRange(0L, flowFile.getSize(), e);
context.yield();
}
}
/**
* Read delimited messages from the FlowFile tracking which messages are sent successfully.
*/
private void processDelimitedMessages(final ProcessContext context, final ProcessSession session, final FlowFile flowFile,
final ChannelSender sender, final String delimiter) {
final String protocol = context.getProperty(PROTOCOL).getValue();
final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
// The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see if it matches
// some pattern. We can use this to search for the delimiter as we read through the stream of bytes in the FlowFile
final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
final LongHolder messagesSent = new LongHolder(0L);
final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile);
activeBatches.add(messageBatch);
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
byte[] data = null; // contents of a single message
boolean streamFinished = false;
int nextByte;
try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
long messageStartOffset = in.getBytesConsumed();
// read until we're out of data.
while (!streamFinished) {
nextByte = in.read();
if (nextByte > -1) {
baos.write(nextByte);
}
if (nextByte == -1) {
// we ran out of data. This message is complete.
data = getMessage(baos, baos.size(), protocol);
streamFinished = true;
} else if (buffer.addAndCompare((byte) nextByte)) {
// we matched our delimiter. This message is complete. We want all of the bytes from the
// underlying BAOS except for the last 'delimiterBytes.length' bytes because we don't want
// the delimiter itself to be sent.
data = getMessage(baos, baos.size() - delimiterBytes.length, protocol);
}
if (data != null) {
final long messageEndOffset = in.getBytesConsumed();
// If the message has no data, ignore it.
if (data.length != 0) {
final long rangeStart = messageStartOffset;
try {
sender.send(data);
messageBatch.addSuccessfulRange(rangeStart, messageEndOffset);
messagesSent.incrementAndGet();
} catch (final IOException e) {
messageBatch.addFailedRange(rangeStart, messageEndOffset, e);
}
}
// reset BAOS so that we can start a new message.
baos.reset();
data = null;
messageStartOffset = in.getBytesConsumed();
}
}
}
}
});
messageBatch.setNumMessages(messagesSent.get());
}
}
/**
* Helper to get the bytes of a message from the ByteArrayOutputStream, factoring in whether we need
* a new line at the end of our message.
*
* @param baos the ByteArrayOutputStream to get data from
* @param length the amount of data to copy from the baos
* @param protocol the protocol (TCP or UDP)
*
* @return the bytes from 0 to length, including a new line if the protocol was TCP
*/
private byte[] getMessage(final ByteArrayOutputStream baos, final int length, final String protocol) {
if (baos.size() == 0) {
return null;
}
final byte[] buf = baos.getUnderlyingBuffer();
// if TCP, the message is non-empty, and it doesn't already end with a new line, then add one
if (protocol.equals(TCP_VALUE.getValue()) && length > 0 && buf[length - 1] != NEW_LINE_CHAR) {
final byte[] message = new byte[length + 1];
System.arraycopy(buf, 0, message, 0, length);
message[message.length - 1] = NEW_LINE_CHAR;
return message;
} else {
return Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, length);
}
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.splunk.GetSplunk
org.apache.nifi.processors.splunk.PutSplunk

View File

@ -0,0 +1,283 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.splunk;
import com.splunk.JobExportArgs;
import com.splunk.Service;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestGetSplunk {
private Service service;
private TestableGetSplunk proc;
private TestRunner runner;
@Before
public void setup() {
service = Mockito.mock(Service.class);
proc = new TestableGetSplunk(service);
runner = TestRunners.newTestRunner(proc);
}
@Test
public void testCustomValidation() {
final String query = "search tcp:7879";
final String providedEarliest = "-1h";
final String providedLatest = "now";
final String outputMode = GetSplunk.ATOM_VALUE.getValue();
runner.setProperty(GetSplunk.QUERY, query);
runner.setProperty(GetSplunk.EARLIEST_TIME, providedEarliest);
runner.setProperty(GetSplunk.LATEST_TIME, providedLatest);
runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
runner.assertValid();
runner.setProperty(GetSplunk.USERNAME, "user1");
runner.assertNotValid();
runner.setProperty(GetSplunk.PASSWORD, "password");
runner.assertValid();
}
@Test
public void testGetWithProvidedTime() {
final String query = "search tcp:7879";
final String providedEarliest = "-1h";
final String providedLatest = "now";
final String outputMode = GetSplunk.ATOM_VALUE.getValue();
runner.setProperty(GetSplunk.QUERY, query);
runner.setProperty(GetSplunk.EARLIEST_TIME, providedEarliest);
runner.setProperty(GetSplunk.LATEST_TIME, providedLatest);
runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
final JobExportArgs expectedArgs = new JobExportArgs();
expectedArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);
expectedArgs.setEarliestTime(providedEarliest);
expectedArgs.setLatestTime(providedLatest);
expectedArgs.setOutputMode(JobExportArgs.OutputMode.valueOf(outputMode));
final String resultContent = "fake results";
final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
when(service.export(eq(query), argThat(new JobExportArgsMatcher(expectedArgs)))).thenReturn(input);
runner.run();
runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetSplunk.REL_SUCCESS);
Assert.assertEquals(1, mockFlowFiles.size());
final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
mockFlowFile.assertContentEquals(resultContent);
mockFlowFile.assertAttributeEquals(GetSplunk.QUERY_ATTR, query);
mockFlowFile.assertAttributeEquals(GetSplunk.EARLIEST_TIME_ATTR, providedEarliest);
mockFlowFile.assertAttributeEquals(GetSplunk.LATEST_TIME_ATTR, providedLatest);
Assert.assertEquals(1, proc.count);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertEquals(1, events.size());
Assert.assertEquals(ProvenanceEventType.RECEIVE, events.get(0).getEventType());
Assert.assertEquals("https://localhost:8089", events.get(0).getTransitUri());
}
@Test
public void testMultipleIterationsWithoutShuttingDown() {
final String query = "search tcp:7879";
final String providedEarliest = "-1h";
final String providedLatest = "now";
final String outputMode = GetSplunk.ATOM_VALUE.getValue();
runner.setProperty(GetSplunk.QUERY, query);
runner.setProperty(GetSplunk.EARLIEST_TIME, providedEarliest);
runner.setProperty(GetSplunk.LATEST_TIME, providedLatest);
runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
final JobExportArgs expectedArgs = new JobExportArgs();
expectedArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);
expectedArgs.setEarliestTime(providedEarliest);
expectedArgs.setLatestTime(providedLatest);
expectedArgs.setOutputMode(JobExportArgs.OutputMode.valueOf(outputMode));
final String resultContent = "fake results";
final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
when(service.export(eq(query), argThat(new JobExportArgsMatcher(expectedArgs)))).thenReturn(input);
final int iterations = 3;
runner.run(iterations, false);
runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, iterations);
Assert.assertEquals(1, proc.count);
}
@Test
public void testGetWithManagedFromBeginning() {
final String query = "search tcp:7879";
final String outputMode = GetSplunk.ATOM_VALUE.getValue();
runner.setProperty(GetSplunk.QUERY, query);
runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_BEGINNING_VALUE.getValue());
final String resultContent = "fake results";
final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input);
// run once and don't shut down
runner.run(1, false);
runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1);
// capture what the args were on last run
final ArgumentCaptor<JobExportArgs> capture1 = ArgumentCaptor.forClass(JobExportArgs.class);
verify(service, times(1)).export(eq(query), capture1.capture());
// first execution with no previous state and "managed from beginning" should have a latest time and no earliest time
final JobExportArgs actualArgs1 = capture1.getValue();
Assert.assertNotNull(actualArgs1);
Assert.assertNull(actualArgs1.get("earliest_time"));
Assert.assertNotNull(actualArgs1.get("latest_time"));
// save the latest time from the first run which should be earliest time of next run
final String expectedLatest = (String) actualArgs1.get("latest_time");
// run again
runner.run(1, false);
runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 2);
final ArgumentCaptor<JobExportArgs> capture2 = ArgumentCaptor.forClass(JobExportArgs.class);
verify(service, times(2)).export(eq(query), capture2.capture());
// second execution the earliest time should be the previous latest_time
final JobExportArgs actualArgs2 = capture2.getValue();
Assert.assertNotNull(actualArgs2);
Assert.assertEquals(expectedLatest, actualArgs2.get("earliest_time"));
Assert.assertNotNull(actualArgs2.get("latest_time"));
}
@Test
public void testGetWithManagedFromCurrent() throws IOException {
final String query = "search tcp:7879";
final String outputMode = GetSplunk.ATOM_VALUE.getValue();
runner.setProperty(GetSplunk.QUERY, query);
runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_CURRENT_VALUE.getValue());
final String resultContent = "fake results";
final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input);
// run once and don't shut down, shouldn't produce any results first time
runner.run(1, false);
runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 0);
// capture what the args were on last run
verify(service, times(0)).export(eq(query), any(JobExportArgs.class));
final StateMap state = runner.getStateManager().getState(Scope.CLUSTER);
Assert.assertNotNull(state);
Assert.assertTrue(state.getVersion() > 0);
// save the latest time from the first run which should be earliest time of next run
final String expectedLatest = state.get(GetSplunk.LATEST_TIME_KEY);
// run again
runner.run(1, false);
runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1);
final ArgumentCaptor<JobExportArgs> capture = ArgumentCaptor.forClass(JobExportArgs.class);
verify(service, times(1)).export(eq(query), capture.capture());
// second execution the earliest time should be the previous latest_time
final JobExportArgs actualArgs = capture.getValue();
Assert.assertNotNull(actualArgs);
Assert.assertEquals(expectedLatest, actualArgs.get("earliest_time"));
Assert.assertNotNull(actualArgs.get("latest_time"));
}
/**
* Testable implementation of GetSplunk to return a Mock Splunk Service.
*/
private static class TestableGetSplunk extends GetSplunk {
int count;
Service mockService;
public TestableGetSplunk(Service mockService) {
this.mockService = mockService;
}
@Override
protected Service createSplunkService(ProcessContext context) {
count++;
return mockService;
}
}
/**
* Custom args matcher for JobExportArgs.
*/
private static class JobExportArgsMatcher extends ArgumentMatcher<JobExportArgs> {
private JobExportArgs expected;
public JobExportArgsMatcher(JobExportArgs expected) {
this.expected = expected;
}
@Override
public boolean matches(Object o) {
if (o == null) {
return false;
}
if (!(o instanceof JobExportArgs)) {
return false;
}
JobExportArgs other = (JobExportArgs) o;
return expected.equals(other);
}
}
}

View File

@ -0,0 +1,370 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.splunk;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TestPutSplunk {
private TestRunner runner;
private TestablePutSplunk proc;
private CapturingChannelSender sender;
@Before
public void init() {
ProcessorLog logger = Mockito.mock(ProcessorLog.class);
sender = new CapturingChannelSender("localhost", 12345, 0, logger);
proc = new TestablePutSplunk(sender);
runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutSplunk.PORT, "12345");
}
@Test
public void testUDPSendWholeFlowFile() {
final String message = "This is one message, should send the whole FlowFile";
runner.enqueue(message);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
Assert.assertEquals(1, sender.getMessages().size());
Assert.assertEquals(message, sender.getMessages().get(0));
}
@Test
public void testTCPSendWholeFlowFile() {
runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
final String message = "This is one message, should send the whole FlowFile";
runner.enqueue(message);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
Assert.assertEquals(1, sender.getMessages().size());
Assert.assertEquals(message + "\n", sender.getMessages().get(0));
}
@Test
public void testTCPSendWholeFlowFileAlreadyHasNewLine() {
runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
final String message = "This is one message, should send the whole FlowFile\n";
runner.enqueue(message);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
Assert.assertEquals(1, sender.getMessages().size());
Assert.assertEquals(message, sender.getMessages().get(0));
}
@Test
public void testUDPSendDelimitedMessages() {
final String delimiter = "DD";
runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
final String message = "This is message 1DDThis is message 2DDThis is message 3";
runner.enqueue(message);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
Assert.assertEquals(3, sender.getMessages().size());
Assert.assertEquals("This is message 1", sender.getMessages().get(0));
Assert.assertEquals("This is message 2", sender.getMessages().get(1));
Assert.assertEquals("This is message 3", sender.getMessages().get(2));
}
@Test
public void testTCPSendDelimitedMessages() {
final String delimiter = "DD";
runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
// no delimiter at end
final String message = "This is message 1DDThis is message 2DDThis is message 3";
runner.enqueue(message);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
Assert.assertEquals(3, sender.getMessages().size());
Assert.assertEquals("This is message 1\n", sender.getMessages().get(0));
Assert.assertEquals("This is message 2\n", sender.getMessages().get(1));
Assert.assertEquals("This is message 3\n", sender.getMessages().get(2));
}
@Test
public void testTCPSendDelimitedMessagesWithEL() {
final String delimiter = "DD";
runner.setProperty(PutSplunk.MESSAGE_DELIMITER, "${flow.file.delim}");
runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
// no delimiter at end
final String message = "This is message 1DDThis is message 2DDThis is message 3";
final Map<String,String> attrs = new HashMap<>();
attrs.put("flow.file.delim", delimiter);
runner.enqueue(message, attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
Assert.assertEquals(3, sender.getMessages().size());
Assert.assertEquals("This is message 1\n", sender.getMessages().get(0));
Assert.assertEquals("This is message 2\n", sender.getMessages().get(1));
Assert.assertEquals("This is message 3\n", sender.getMessages().get(2));
}
@Test
public void testTCPSendDelimitedMessagesEndsWithDelimiter() {
final String delimiter = "DD";
runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
// delimiter at end
final String message = "This is message 1DDThis is message 2DDThis is message 3DD";
runner.enqueue(message);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
Assert.assertEquals(3, sender.getMessages().size());
Assert.assertEquals("This is message 1\n", sender.getMessages().get(0));
Assert.assertEquals("This is message 2\n", sender.getMessages().get(1));
Assert.assertEquals("This is message 3\n", sender.getMessages().get(2));
}
@Test
public void testTCPSendDelimitedMessagesWithNewLineDelimiter() {
final String delimiter = "\\n";
runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
final String message = "This is message 1\nThis is message 2\nThis is message 3";
runner.enqueue(message);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
Assert.assertEquals(3, sender.getMessages().size());
Assert.assertEquals("This is message 1\n", sender.getMessages().get(0));
Assert.assertEquals("This is message 2\n", sender.getMessages().get(1));
Assert.assertEquals("This is message 3\n", sender.getMessages().get(2));
}
@Test
public void testTCPSendDelimitedMessagesWithErrors() {
sender.setErrorStart(3);
sender.setErrorEnd(4);
final String delimiter = "DD";
runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
// no delimiter at end
final String success = "This is message 1DDThis is message 2DD";
final String failure = "This is message 3DDThis is message 4";
final String message = success + failure;
runner.enqueue(message);
runner.run(1);
runner.assertTransferCount(PutSplunk.REL_SUCCESS, 1);
runner.assertTransferCount(PutSplunk.REL_FAILURE, 1);
// first two messages should have gone out successfully
final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
successFlowFile.assertContentEquals(success);
// second two messages should have gone to failure
final MockFlowFile failureFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_FAILURE).get(0);
failureFlowFile.assertContentEquals(failure);
// should only have the first two messages
Assert.assertEquals(2, sender.getMessages().size());
Assert.assertEquals("This is message 1\n", sender.getMessages().get(0));
Assert.assertEquals("This is message 2\n", sender.getMessages().get(1));
}
@Test
public void testTCPSendDelimitedMessagesWithErrorsInMiddle() {
sender.setErrorStart(3);
sender.setErrorEnd(4);
final String delimiter = "DD";
runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
// no delimiter at end
final String success = "This is message 1DDThis is message 2DD";
final String failure = "This is message 3DDThis is message 4DD";
final String success2 = "This is message 5DDThis is message 6DDThis is message 7DD";
final String message = success + failure + success2;
runner.enqueue(message);
runner.run(1);
runner.assertTransferCount(PutSplunk.REL_SUCCESS, 2);
runner.assertTransferCount(PutSplunk.REL_FAILURE, 1);
// first two messages should have gone out successfully
final MockFlowFile successFlowFile1 = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
successFlowFile1.assertContentEquals(success);
// last three messages should have gone out successfully
final MockFlowFile successFlowFile2 = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(1);
successFlowFile2.assertContentEquals(success2);
// second two messages should have gone to failure
final MockFlowFile failureFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_FAILURE).get(0);
failureFlowFile.assertContentEquals(failure);
// the sender should only have captured the five successfully sent messages
Assert.assertEquals(5, sender.getMessages().size());
Assert.assertEquals("This is message 1\n", sender.getMessages().get(0));
Assert.assertEquals("This is message 2\n", sender.getMessages().get(1));
Assert.assertEquals("This is message 5\n", sender.getMessages().get(2));
Assert.assertEquals("This is message 6\n", sender.getMessages().get(3));
Assert.assertEquals("This is message 7\n", sender.getMessages().get(4));
}
@Test
public void testCompletingPreviousBatchOnNextExecution() {
final String message = "This is one message, should send the whole FlowFile";
runner.enqueue(message);
runner.run(2, false); // don't shut down, to prove that the next onTrigger completes the previous batch
runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
Assert.assertEquals(1, sender.getMessages().size());
Assert.assertEquals(message, sender.getMessages().get(0));
}
/**
* Extend PutSplunk to use a CapturingChannelSender.
*/
private static class TestablePutSplunk extends PutSplunk {
private ChannelSender sender;
public TestablePutSplunk(ChannelSender channelSender) {
this.sender = channelSender;
}
@Override
protected ChannelSender createSender(String protocol, String host, int port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws IOException {
return sender;
}
}
/**
* A ChannelSender that captures each message that was sent.
*/
private static class CapturingChannelSender extends ChannelSender {
private List<String> messages = new ArrayList<>();
private int count = 0;
private int errorStart = -1;
private int errorEnd = -1;
public CapturingChannelSender(String host, int port, int maxSendBufferSize, ProcessorLog logger) {
super(host, port, maxSendBufferSize, logger);
}
@Override
public void open() throws IOException {
}
@Override
protected void write(byte[] data) throws IOException {
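// simulate a failed send for messages numbered errorStart through errorEnd (1-based); all other messages are captured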
count++;
if (errorStart > 0 && count >= errorStart && errorEnd > 0 && count <= errorEnd) {
throw new IOException("this is an error");
}
messages.add(new String(data, StandardCharsets.UTF_8));
}
@Override
public boolean isConnected() {
return false;
}
@Override
public void close() {
}
public List<String> getMessages() {
return messages;
}
public void setErrorStart(int errorStart) {
this.errorStart = errorStart;
}
public void setErrorEnd(int errorEnd) {
this.errorEnd = errorEnd;
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.splunk.util;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
public class LogGenerator {
static final String LOG_MESSAGE = "This is log message # %s";
private final int numLogs;
private final String delimiter;
public LogGenerator(int numLogs, String delimiter) {
this.numLogs = numLogs;
this.delimiter = delimiter;
}
public void generate(final File file) throws IOException {
try (OutputStream rawOut = new FileOutputStream(file);
BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
for (int i = 0; i < numLogs; i++) {
if (i > 0) {
out.write(delimiter.getBytes(StandardCharsets.UTF_8));
}
final String message = String.format(LOG_MESSAGE, i);
out.write(message.getBytes(StandardCharsets.UTF_8));
}
System.out.println("Done generating " + numLogs + " messages");
out.flush();
}
}
public static void main(String[] args) throws IOException {
if (args == null || args.length != 3) {
System.err.println("USAGE: LogGenerator <num_logs> <delimiter> <file>");
System.exit(1);
}
final int numLogs = Integer.parseInt(args[0]);
final String delim = args[1];
final File file = new File(args[2]);
if (file.exists()) {
file.delete();
}
final LogGenerator generator = new LogGenerator(numLogs, delim);
generator.generate(file);
}
}
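A usage sketch (classpath and values are illustrative, not part of the patch). The delimiter argument is written between messages verbatim, so to pass a real newline from a shell, Bash's $'\n' quoting can be used:

java -cp <test-classes> org.apache.nifi.processors.splunk.util.LogGenerator 1000 $'\n' /tmp/splunk-test.log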

View File

@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-splunk-bundle</artifactId>
<version>0.6.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-splunk-processors</module>
<module>nifi-splunk-nar</module>
</modules>
<repositories>
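<!-- the Splunk SDK is published to Splunk's Artifactory rather than Maven Central, so an explicit repository is declared -->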
<repository>
<id>splunk</id>
<name>Splunk Artifactory</name>
<url>http://splunk.artifactoryonline.com/splunk/ext-releases-local/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.splunk</groupId>
<artifactId>splunk</artifactId>
<version>1.5.0.0</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -49,6 +49,14 @@ public abstract class AbstractSyslogProcessor extends AbstractProcessor {
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Timeout")
.description("The timeout for connecting to and communicating with the syslog server. Does not apply to UDP")
.required(false)
.defaultValue("10 seconds")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
}

View File

@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "tcp", "tls", "ssl"})
@CapabilityDescription("Listens for incoming TCP connections and reads data from each connection using a line separator " +
"as the message demarcator. The default behavior is for each message to produce a single FlowFile, however this can " +
"be controlled by increasing the Batch Size to a larger value for higher throughput. The Receive Buffer Size must be " +
"set as large as the largest messages expected to be received, meaning if every 100kb there is a line separator, then " +
"the Receive Buffer Size must be greater than 100kb.")
@WritesAttributes({
@WritesAttribute(attribute="tcp.sender", description="The sending host of the messages."),
@WritesAttribute(attribute="tcp.port", description="The sending port the messages were received.")
})
public class ListenTCP extends AbstractListenEventProcessor<ListenTCP.TCPEvent> {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
"messages will be received over a secure connection.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("Client Auth")
.description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
.required(false)
.allowableValues(SSLContextService.ClientAuth.values())
.defaultValue(SSLContextService.ClientAuth.REQUIRED.name())
.build();
// it is only the array reference that is volatile - not the contents.
private volatile byte[] messageDemarcatorBytes;
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(
MAX_CONNECTIONS,
MAX_BATCH_SIZE,
MESSAGE_DELIMITER,
SSL_CONTEXT_SERVICE,
CLIENT_AUTH
);
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
results.add(new ValidationResult.Builder()
.explanation("Client Auth must be provided when using TLS/SSL")
.valid(false).subject("Client Auth").build());
}
return results;
}
@Override
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
super.onScheduled(context);
final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
}
@Override
protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<TCPEvent> events)
throws IOException {
final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and the buffer size
final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(maxConnections);
for (int i = 0; i < maxConnections; i++) {
bufferPool.offer(ByteBuffer.allocate(bufferSize));
}
final EventFactory<TCPEvent> eventFactory = new TCPEventFactory();
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
SslContextFactory.ClientAuth clientAuth = null;
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuthValue));
clientAuth = SslContextFactory.ClientAuth.valueOf(clientAuthValue);
}
final ChannelHandlerFactory<TCPEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, charSet);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize, messageDemarcatorBytes);
// if the size is 0 then there was nothing to process so return
// we don't need to yield here because we have a long poll inside of getBatches
if (batches.size() == 0) {
return;
}
for (Map.Entry<String,FlowFileEventBatch> entry : batches.entrySet()) {
FlowFile flowFile = entry.getValue().getFlowFile();
final List<TCPEvent> events = entry.getValue().getEvents();
if (flowFile.getSize() == 0L || events.size() == 0) {
session.remove(flowFile);
getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});
continue;
}
// the sender will be the same for all events based on the batch key
final String sender = events.get(0).getSender();
final Map<String,String> attributes = new HashMap<>(3);
attributes.put("tcp.sender", sender);
attributes.put("tcp.port", String.valueOf(port));
flowFile = session.putAllAttributes(flowFile, attributes);
getLogger().debug("Transferring {} to success", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
// create a provenance receive event
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append("tcp").append("://").append(senderHost).append(":")
.append(port).toString();
session.getProvenanceReporter().receive(flowFile, transitUri);
}
}
/**
* Event implementation for TCP.
*/
static class TCPEvent<C extends SelectableChannel> extends StandardEvent<C> {
public TCPEvent(String sender, byte[] data, ChannelResponder<C> responder) {
super(sender, data, responder);
}
}
/**
* Factory implementation for TCPEvents.
*/
static final class TCPEventFactory implements EventFactory<TCPEvent> {
@Override
public TCPEvent create(byte[] data, Map<String, String> metadata, ChannelResponder responder) {
String sender = null;
if (metadata != null && metadata.containsKey(EventFactory.SENDER_KEY)) {
sender = metadata.get(EventFactory.SENDER_KEY);
}
return new TCPEvent(sender, data, responder);
}
}
}
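A minimal client sketch for exercising ListenTCP (illustrative only; the host, port, and messages here are assumptions, mirroring what TestListenTCP does further down in this commit):

import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class ListenTCPClientSketch {
    public static void main(String[] args) throws Exception {
        // assumes a ListenTCP instance is listening on localhost:9999
        try (Socket socket = new Socket("localhost", 9999)) {
            final OutputStream out = socket.getOutputStream();
            // each newline-demarcated message becomes one FlowFile,
            // or part of a larger FlowFile when Batch Size > 1
            out.write("This is message 1\n".getBytes(StandardCharsets.UTF_8));
            out.write("This is message 2\n".getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }
}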

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -35,19 +34,17 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
import org.apache.nifi.processors.standard.syslog.SyslogParser;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
@ -81,12 +78,13 @@ public class PutSyslog extends AbstractSyslogProcessor {
.defaultValue("localhost")
.required(true)
.build();
public static final PropertyDescriptor SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Send Buffer Size")
.description("The size of each buffer used to send a Syslog message. Adjust this value appropriately based on the expected size of the " +
"Syslog messages being produced. Messages larger than this buffer size will still be sent, but will not make use of the buffer pool.")
public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Size of Socket Send Buffer")
.description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " +
"to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
"the data can be read, and incoming data will be dropped.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("2048 B")
.defaultValue("1 MB")
.required(true)
.build();
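// Illustrative sketch (not part of this change): SO_SNDBUF is only a hint, so a sender
// would typically apply it to a freshly opened channel and check what the OS granted:
//
//   final SocketChannel channel = SocketChannel.open();
//   channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize);
//   final int actual = channel.getOption(StandardSocketOptions.SO_SNDBUF);
//   if (actual < maxSendBufferSize) {
//       // the OS capped the buffer; proceed, possibly logging the granted size
//   }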
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
@ -164,8 +162,6 @@ public class PutSyslog extends AbstractSyslogProcessor {
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
private volatile BlockingQueue<ByteBuffer> bufferPool;
private volatile BlockingQueue<ChannelSender> senderPool;
@Override
@ -174,9 +170,10 @@ public class PutSyslog extends AbstractSyslogProcessor {
descriptors.add(HOSTNAME);
descriptors.add(PROTOCOL);
descriptors.add(PORT);
descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(IDLE_EXPIRATION);
descriptors.add(SEND_BUFFER_SIZE);
descriptors.add(TIMEOUT);
descriptors.add(BATCH_SIZE);
descriptors.add(CHARSET);
descriptors.add(MSG_PRIORITY);
@ -221,40 +218,40 @@ public class PutSyslog extends AbstractSyslogProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
final int bufferSize = context.getProperty(SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
this.bufferPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
for (int i=0; i < context.getMaxConcurrentTasks(); i++) {
this.bufferPool.offer(ByteBuffer.allocate(bufferSize));
}
// initialize the queue of senders, one per task; senders will be created on the fly in onTrigger
this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
}
protected ChannelSender createSender(final ProcessContext context, final BlockingQueue<ByteBuffer> bufferPool) throws IOException {
protected ChannelSender createSender(final ProcessContext context) throws IOException {
final int port = context.getProperty(PORT).asInteger();
final String host = context.getProperty(HOSTNAME).getValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final String charSet = context.getProperty(CHARSET).getValue();
final int maxSendBuffer = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
return createSender(sslContextService, protocol, host, port, Charset.forName(charSet), bufferPool);
return createSender(sslContextService, protocol, host, port, maxSendBuffer, timeout);
}
// visible for testing to override and provide a mock sender if desired
protected ChannelSender createSender(final SSLContextService sslContextService, final String protocol, final String host, final int port,
final Charset charset, final BlockingQueue<ByteBuffer> bufferPool)
protected ChannelSender createSender(final SSLContextService sslContextService, final String protocol, final String host,
final int port, final int maxSendBufferSize, final int timeout)
throws IOException {
ChannelSender sender;
if (protocol.equals(UDP_VALUE.getValue())) {
return new DatagramChannelSender(host, port, bufferPool, charset);
sender = new DatagramChannelSender(host, port, maxSendBufferSize, getLogger());
} else {
// if an SSLContextService is provided then we make a secure sender
if (sslContextService != null) {
final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
return new SSLSocketChannelSender(sslContext, host, port, bufferPool, charset);
sender = new SSLSocketChannelSender(host, port, maxSendBufferSize, sslContext, getLogger());
} else {
return new SocketChannelSender(host, port, bufferPool, charset);
sender = new SocketChannelSender(host, port, maxSendBufferSize, getLogger());
}
}
sender.setTimeout(timeout);
sender.open();
return sender;
}
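// Illustrative usage of the refactored sender API (a sketch, not part of this change;
// assumes a configured ProcessContext named context and the senderPool from onScheduled):
//
//   ChannelSender sender = senderPool.poll();
//   if (sender == null || !sender.isConnected()) {
//       sender = createSender(context);  // opens the channel and applies the timeout
//   }
//   try {
//       sender.send("<34>1 2016-02-05T22:14:15.003Z localhost some message\n", StandardCharsets.UTF_8);
//   } finally {
//       senderPool.offer(sender);  // returned for reuse; idle senders are closed by the idle-expiration check
//   }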
@OnStopped
@ -275,7 +272,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
// if a connection hasn't been used within the threshold then it gets closed
ChannelSender sender;
while ((sender = senderPool.poll()) != null) {
if (currentTime > (sender.lastUsed + idleThreshold)) {
if (currentTime > (sender.getLastUsed() + idleThreshold)) {
getLogger().debug("Closing idle connection...");
sender.close();
} else {
@ -309,7 +306,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
if (sender == null) {
try {
getLogger().debug("No available connections, creating a new one...");
sender = createSender(context, bufferPool);
sender = createSender(context);
} catch (IOException e) {
for (final FlowFile flowFile : flowFiles) {
getLogger().error("No available connections, and unable to create a new one, transferring {} to failure",
@ -325,6 +322,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
final String host = context.getProperty(HOSTNAME).getValue();
final String transitUri = new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
final ObjectHolder<IOException> exceptionHolder = new ObjectHolder<>(null);
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
try {
for (FlowFile flowFile : flowFiles) {
@ -352,7 +350,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
messageBuilder.append('\n');
}
sender.send(messageBuilder.toString());
sender.send(messageBuilder.toString(), charSet);
timer.stop();
final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
@ -396,157 +394,4 @@ public class PutSyslog extends AbstractSyslogProcessor {
return false;
}
/**
* Base class for sending messages over a channel.
*/
protected static abstract class ChannelSender {
final int port;
final String host;
final BlockingQueue<ByteBuffer> bufferPool;
final Charset charset;
volatile long lastUsed;
ChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
this.port = port;
this.host = host;
this.bufferPool = bufferPool;
this.charset = charset;
}
public void send(final String message) throws IOException {
final byte[] bytes = message.getBytes(charset);
boolean shouldReturn = true;
ByteBuffer buffer = bufferPool.poll();
if (buffer == null) {
buffer = ByteBuffer.allocate(bytes.length);
shouldReturn = false;
} else if (buffer.limit() < bytes.length) {
// we need a large buffer so return the one we got and create a new bigger one
bufferPool.offer(buffer);
buffer = ByteBuffer.allocate(bytes.length);
shouldReturn = false;
}
try {
buffer.clear();
buffer.put(bytes);
buffer.flip();
write(buffer);
lastUsed = System.currentTimeMillis();
} finally {
if (shouldReturn) {
bufferPool.offer(buffer);
}
}
}
// write the given buffer to the underlying channel
abstract void write(ByteBuffer buffer) throws IOException;
// returns true if the underlying channel is connected
abstract boolean isConnected();
// close the underlying channel
abstract void close();
}
/**
* Sends messages over a DatagramChannel.
*/
private static class DatagramChannelSender extends ChannelSender {
final DatagramChannel channel;
DatagramChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
super(host, port, bufferPool, charset);
this.channel = DatagramChannel.open();
this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
}
@Override
public void write(ByteBuffer buffer) throws IOException {
while (buffer.hasRemaining()) {
channel.write(buffer);
}
}
@Override
boolean isConnected() {
return channel != null && channel.isConnected();
}
@Override
public void close() {
IOUtils.closeQuietly(channel);
}
}
/**
* Sends messages over a SocketChannel.
*/
private static class SocketChannelSender extends ChannelSender {
final SocketChannel channel;
SocketChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
super(host, port, bufferPool, charset);
this.channel = SocketChannel.open();
this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
}
@Override
public void write(ByteBuffer buffer) throws IOException {
while (buffer.hasRemaining()) {
channel.write(buffer);
}
}
@Override
boolean isConnected() {
return channel != null && channel.isConnected();
}
@Override
public void close() {
IOUtils.closeQuietly(channel);
}
}
/**
* Sends messages over an SSLSocketChannel.
*/
private static class SSLSocketChannelSender extends ChannelSender {
final SSLSocketChannel channel;
SSLSocketChannelSender(final SSLContext sslContext, final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
super(host, port, bufferPool, charset);
this.channel = new SSLSocketChannel(sslContext, host, port, true);
this.channel.connect();
}
@Override
public void send(final String message) throws IOException {
final byte[] bytes = message.getBytes(charset);
channel.write(bytes);
lastUsed = System.currentTimeMillis();
}
@Override
public void write(ByteBuffer buffer) throws IOException {
// nothing to do here since we are overriding send() above
}
@Override
boolean isConnected() {
return channel != null && !channel.isClosed();
}
@Override
public void close() {
IOUtils.closeQuietly(channel);
}
}
}

View File

@ -48,6 +48,7 @@ org.apache.nifi.processors.standard.ListFile
org.apache.nifi.processors.standard.ListenHTTP
org.apache.nifi.processors.standard.ListenRELP
org.apache.nifi.processors.standard.ListenSyslog
org.apache.nifi.processors.standard.ListenTCP
org.apache.nifi.processors.standard.ListenUDP
org.apache.nifi.processors.standard.ListSFTP
org.apache.nifi.processors.standard.LogAttribute

View File

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
/**
* Tests PutSyslog sending messages to ListenSyslog, simulating either a syslog sender
* forwarding to ListenSyslog, or PutSyslog sending to a syslog server.
*/
public class TestListenAndPutSyslog {
static final Logger LOGGER = LoggerFactory.getLogger(TestListenAndPutSyslog.class);
private ListenSyslog listenSyslog;
private TestRunner listenSyslogRunner;
private PutSyslog putSyslog;
private TestRunner putSyslogRunner;
@Before
public void setup() {
this.listenSyslog = new ListenSyslog();
this.listenSyslogRunner = TestRunners.newTestRunner(listenSyslog);
this.putSyslog = new PutSyslog();
this.putSyslogRunner = TestRunners.newTestRunner(putSyslog);
}
@After
public void teardown() {
try {
putSyslog.onStopped();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
try {
listenSyslog.onUnscheduled();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
@Test
public void testUDP() throws IOException, InterruptedException {
run(ListenSyslog.UDP_VALUE.getValue(), 5, 5);
}
@Test
public void testTCP() throws IOException, InterruptedException {
run(ListenSyslog.TCP_VALUE.getValue(), 5, 5);
}
@Test
public void testTLS() throws InitializationException, IOException, InterruptedException {
configureSSLContextService(listenSyslogRunner);
listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
configureSSLContextService(putSyslogRunner);
putSyslogRunner.setProperty(PutSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
run(ListenSyslog.TCP_VALUE.getValue(), 7, 7);
}
@Test
public void testTLSListenerNoTLSPut() throws InitializationException, IOException, InterruptedException {
configureSSLContextService(listenSyslogRunner);
listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
// send 7 but expect 0 because sender didn't use TLS
run(ListenSyslog.TCP_VALUE.getValue(), 7, 0);
}
private SSLContextService configureSSLContextService(TestRunner runner) throws InitializationException {
final SSLContextService sslContextService = new StandardSSLContextService();
runner.addControllerService("ssl-context", sslContextService);
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
runner.enableControllerService(sslContextService);
return sslContextService;
}
/**
* Sends numMessages from PutSyslog to ListenSyslog.
*/
private void run(String protocol, int numMessages, int expectedMessages) throws IOException, InterruptedException {
// set the same protocol on both processors
putSyslogRunner.setProperty(PutSyslog.PROTOCOL, protocol);
listenSyslogRunner.setProperty(ListenSyslog.PROTOCOL, protocol);
// set a listening port of 0 to get a random available port
listenSyslogRunner.setProperty(ListenSyslog.PORT, "0");
// call onScheduled to start ListenSyslog listening
final ProcessSessionFactory processSessionFactory = listenSyslogRunner.getProcessSessionFactory();
final ProcessContext context = listenSyslogRunner.getProcessContext();
listenSyslog.onScheduled(context);
// get the real port it is listening on and set that in PutSyslog
final int listeningPort = listenSyslog.getPort();
putSyslogRunner.setProperty(PutSyslog.PORT, String.valueOf(listeningPort));
// configure the message properties on PutSyslog
final String pri = "34";
final String version = "1";
final String stamp = "2016-02-05T22:14:15.003Z";
final String host = "localhost";
final String body = "some message";
final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
putSyslogRunner.setProperty(PutSyslog.MSG_PRIORITY, pri);
putSyslogRunner.setProperty(PutSyslog.MSG_VERSION, version);
putSyslogRunner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
putSyslogRunner.setProperty(PutSyslog.MSG_HOSTNAME, host);
putSyslogRunner.setProperty(PutSyslog.MSG_BODY, body);
// send the messages
for (int i=0; i < numMessages; i++) {
putSyslogRunner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
}
putSyslogRunner.run(numMessages, false);
// trigger ListenSyslog until we've seen all the messages
int numTransferred = 0;
long timeout = System.currentTimeMillis() + 30000;
while (numTransferred < expectedMessages && System.currentTimeMillis() < timeout) {
Thread.sleep(10);
listenSyslog.onTrigger(context, processSessionFactory);
numTransferred = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
}
Assert.assertEquals("Did not process all the messages", expectedMessages, numTransferred);
if (expectedMessages > 0) {
// check that one of flow files has the expected content
MockFlowFile mockFlowFile = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(expectedMessage);
}
}
}

View File

@ -194,11 +194,24 @@ public class TestListenRELP {
// send the frames to the port the processor is listening on
sendFrames(frames, socket);
// call onTrigger until we have processed all the frames, or a certain amount of time passes
long responseTimeout = 10000;
long startTime = System.currentTimeMillis();
while (proc.responses.size() < expectedTransferred
&& (System.currentTimeMillis() - startTime < responseTimeout)) {
long responseTimeout = 30000;
// this first loop waits until the internal queue of the processor has the expected
// number of messages ready before proceeding; we want to guarantee they are all there
// before onTrigger gets a chance to run
long startTimeQueueSizeCheck = System.currentTimeMillis();
while (proc.getQueueSize() < expectedResponses
&& (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) {
Thread.sleep(100);
}
// want to fail here if the queue size isn't what we expect
Assert.assertEquals(expectedResponses, proc.getQueueSize());
// call onTrigger until we get a response for all the frames, or a certain amount of time passes
long startTimeProcessing = System.currentTimeMillis();
while (proc.responses.size() < expectedResponses
&& (System.currentTimeMillis() - startTimeProcessing < responseTimeout)) {
proc.onTrigger(context, processSessionFactory);
Thread.sleep(100);
}
@ -221,7 +234,6 @@ public class TestListenRELP {
for (final RELPFrame frame : frames) {
byte[] encodedFrame = encoder.encode(frame);
socket.getOutputStream().write(encodedFrame);
Thread.sleep(1);
}
socket.getOutputStream().flush();
}

View File

@ -62,7 +62,8 @@ public class TestListenSyslog {
static final String HOST = "localhost.home";
static final String BODY = "some message";
static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";
static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY ;
static final String VALID_MESSAGE_TCP = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";
static final String INVALID_MESSAGE = "this is not valid\n";
@Test
@ -135,7 +136,7 @@ public class TestListenSyslog {
Assert.assertTrue(port > 0);
// write some TCP messages to the port in the background
final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE));
final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP));
sender.setDaemon(true);
sender.start();
@ -185,7 +186,7 @@ public class TestListenSyslog {
Assert.assertTrue(port > 0);
// send 3 messages as 1
final String multipleMessages = VALID_MESSAGE + "\n" + VALID_MESSAGE + "\n" + VALID_MESSAGE;
final String multipleMessages = VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n";
final Thread sender = new Thread(new SingleConnectionSocketSender(port, 1, 10, multipleMessages));
sender.setDaemon(true);
sender.start();
@ -237,7 +238,7 @@ public class TestListenSyslog {
Assert.assertTrue(port > 0);
// write some TCP messages to the port in the background
final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE));
final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP));
sender.setDaemon(true);
sender.start();
@ -292,7 +293,7 @@ public class TestListenSyslog {
Assert.assertTrue(port > 0);
// write some UDP messages to the port in the background
final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE.replaceAll("\\n", "")));
final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE));
sender.setDaemon(true);
sender.start();
sender.join();
@ -432,7 +433,7 @@ public class TestListenSyslog {
private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) {
flowFile.assertContentEquals(VALID_MESSAGE);
flowFile.assertContentEquals(VALID_MESSAGE.replace("\n", ""));
Assert.assertEquals(PRI, flowFile.getAttribute(SyslogAttributes.PRIORITY.key()));
Assert.assertEquals(SEV, flowFile.getAttribute(SyslogAttributes.SEVERITY.key()));
Assert.assertEquals(FAC, flowFile.getAttribute(SyslogAttributes.FACILITY.key()));

View File

@ -0,0 +1,275 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.List;
public class TestListenTCP {
private ListenTCP proc;
private TestRunner runner;
@Before
public void setup() {
proc = new ListenTCP();
runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenTCP.PORT, "0");
}
@Test
public void testCustomValidate() throws InitializationException {
runner.setProperty(ListenTCP.PORT, "1");
runner.assertValid();
configureProcessorSslContextService();
runner.setProperty(ListenTCP.CLIENT_AUTH, "");
runner.assertNotValid();
runner.setProperty(ListenTCP.CLIENT_AUTH, SslContextFactory.ClientAuth.REQUIRED.name());
runner.assertValid();
}
@Test
public void testListenTCP() throws IOException, InterruptedException {
final List<String> messages = new ArrayList<>();
messages.add("This is message 1\n");
messages.add("This is message 2\n");
messages.add("This is message 3\n");
messages.add("This is message 4\n");
messages.add("This is message 5\n");
runTCP(messages, messages.size(), null);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
for (int i=0; i < mockFlowFiles.size(); i++) {
mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1));
}
}
@Test
public void testListenTCPBatching() throws IOException, InterruptedException {
runner.setProperty(ListenTCP.MAX_BATCH_SIZE, "3");
final List<String> messages = new ArrayList<>();
messages.add("This is message 1\n");
messages.add("This is message 2\n");
messages.add("This is message 3\n");
messages.add("This is message 4\n");
messages.add("This is message 5\n");
runTCP(messages, 2, null);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
MockFlowFile mockFlowFile1 = mockFlowFiles.get(0);
mockFlowFile1.assertContentEquals("This is message 1\nThis is message 2\nThis is message 3");
MockFlowFile mockFlowFile2 = mockFlowFiles.get(1);
mockFlowFile2.assertContentEquals("This is message 4\nThis is message 5");
}
@Test
public void testTLSClientAuthRequiredAndClientCertProvided() throws InitializationException, IOException, InterruptedException,
UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
runner.setProperty(ListenTCP.CLIENT_AUTH, SSLContextService.ClientAuth.REQUIRED.name());
configureProcessorSslContextService();
final List<String> messages = new ArrayList<>();
messages.add("This is message 1\n");
messages.add("This is message 2\n");
messages.add("This is message 3\n");
messages.add("This is message 4\n");
messages.add("This is message 5\n");
// Make an SSLContext with a key and trust store to send the test messages
final SSLContext clientSslContext = SslContextFactory.createSslContext(
"src/test/resources/localhost-ks.jks",
"localtest".toCharArray(),
"jks",
"src/test/resources/localhost-ts.jks",
"localtest".toCharArray(),
"jks",
org.apache.nifi.security.util.SslContextFactory.ClientAuth.valueOf("NONE"),
"TLS");
runTCP(messages, messages.size(), clientSslContext);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
for (int i=0; i < mockFlowFiles.size(); i++) {
mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1));
}
}
@Test
public void testTLSClientAuthRequiredAndClientCertNotProvided() throws InitializationException, IOException, InterruptedException,
UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
runner.setProperty(ListenTCP.CLIENT_AUTH, SSLContextService.ClientAuth.REQUIRED.name());
configureProcessorSslContextService();
final List<String> messages = new ArrayList<>();
messages.add("This is message 1\n");
messages.add("This is message 2\n");
messages.add("This is message 3\n");
messages.add("This is message 4\n");
messages.add("This is message 5\n");
// Make an SSLContext that only has the trust store; this should not work since the processor has client auth REQUIRED
final SSLContext clientSslContext = SslContextFactory.createTrustSslContext(
"src/test/resources/localhost-ts.jks",
"localtest".toCharArray(),
"jks",
"TLS");
try {
runTCP(messages, messages.size(), clientSslContext);
Assert.fail("Should have thrown exception");
} catch (Exception e) {
}
}
@Test
public void testTLSClientAuthNoneAndClientCertNotProvided() throws InitializationException, IOException, InterruptedException,
UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
runner.setProperty(ListenTCP.CLIENT_AUTH, SSLContextService.ClientAuth.NONE.name());
configureProcessorSslContextService();
final List<String> messages = new ArrayList<>();
messages.add("This is message 1\n");
messages.add("This is message 2\n");
messages.add("This is message 3\n");
messages.add("This is message 4\n");
messages.add("This is message 5\n");
// Make an SSLContext that only has the trust store, which should work since the processor has client auth NONE
final SSLContext clientSslContext = SslContextFactory.createTrustSslContext(
"src/test/resources/localhost-ts.jks",
"localtest".toCharArray(),
"jks",
"TLS");
runTCP(messages, messages.size(), clientSslContext);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
for (int i=0; i < mockFlowFiles.size(); i++) {
mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1));
}
}
protected void runTCP(final List<String> messages, final int expectedTransferred, final SSLContext sslContext)
throws IOException, InterruptedException {
Socket socket = null;
try {
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
// create a client connection to the port the dispatcher is listening on
final int realPort = proc.getDispatcherPort();
// create either a regular socket or ssl socket based on context being passed in
if (sslContext != null) {
socket = sslContext.getSocketFactory().createSocket("localhost", realPort);
} else {
socket = new Socket("localhost", realPort);
}
Thread.sleep(100);
// send the frames to the port the processor is listening on
for (final String message : messages) {
socket.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
Thread.sleep(1);
}
socket.getOutputStream().flush();
long responseTimeout = 10000;
// this first loop waits until the internal queue of the processor has the expected
// number of messages ready before proceeding; we want to guarantee they are all there
// before onTrigger gets a chance to run
long startTimeQueueSizeCheck = System.currentTimeMillis();
while (proc.getQueueSize() < messages.size()
&& (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) {
Thread.sleep(100);
}
// want to fail here if the queue size isn't what we expect
Assert.assertEquals(messages.size(), proc.getQueueSize());
// call onTrigger until we have processed all the frames, or a certain amount of time passes
int numTransferred = 0;
long startTime = System.currentTimeMillis();
while (numTransferred < expectedTransferred && (System.currentTimeMillis() - startTime < responseTimeout)) {
proc.onTrigger(context, processSessionFactory);
numTransferred = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS).size();
Thread.sleep(100);
}
// should have transferred the expected events
runner.assertTransferCount(ListenTCP.REL_SUCCESS, expectedTransferred);
} finally {
// unschedule to close connections
proc.onUnscheduled();
IOUtils.closeQuietly(socket);
}
}
private SSLContextService configureProcessorSslContextService() throws InitializationException {
final SSLContextService sslContextService = new StandardSSLContextService();
runner.addControllerService("ssl-context", sslContextService);
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
runner.enableControllerService(sslContextService);
runner.setProperty(ListenTCP.SSL_CONTEXT_SERVICE, "ssl-context");
return sslContextService;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.ssl.SSLContextService;
@ -27,14 +28,11 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestPutSyslog {
@ -327,8 +325,9 @@ public class TestPutSyslog {
}
@Override
protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, int port,
Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host,
int port, int maxSendBuffer, int timeout)
throws IOException {
return mockSender;
}
}
@ -346,8 +345,9 @@ public class TestPutSyslog {
}
@Override
protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, int port,
Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host,
int port, int maxSendBuffer, int timeout)
throws IOException {
if (numSendersCreated >= numSendersAllowed) {
throw new IOException("too many senders");
}
@ -357,61 +357,70 @@ public class TestPutSyslog {
}
// Mock sender that saves any messages passed to send()
static class MockCollectingSender extends PutSyslog.ChannelSender {
static class MockCollectingSender extends ChannelSender {
List<String> messages = new ArrayList<>();
public MockCollectingSender() throws IOException {
super("myhost", 0, new LinkedBlockingQueue<ByteBuffer>(1), Charset.forName("UTF-8"));
this.bufferPool.offer(ByteBuffer.allocate(1024));
super("myhost", 0, 0, null);
}
@Override
public void send(String message) throws IOException {
public void open() throws IOException {
}
@Override
public void send(String message, Charset charset) throws IOException {
messages.add(message);
super.send(message);
super.send(message, charset);
}
@Override
void write(ByteBuffer buffer) throws IOException {
protected void write(byte[] buffer) throws IOException {
}
@Override
boolean isConnected() {
public boolean isConnected() {
return true;
}
@Override
void close() {
public void close() {
}
}
// Mock sender that throws IOException on calls to write() or send()
static class MockErrorSender extends PutSyslog.ChannelSender {
static class MockErrorSender extends ChannelSender {
public MockErrorSender() throws IOException {
super(null, 0, null, null);
super(null, 0, 0, null);
}
@Override
public void send(String message) throws IOException {
public void open() throws IOException {
}
@Override
public void send(String message, Charset charset) throws IOException {
throw new IOException("error");
}
@Override
void write(ByteBuffer buffer) throws IOException {
protected void write(byte[] data) throws IOException {
throw new IOException("error");
}
@Override
boolean isConnected() {
public boolean isConnected() {
return false;
}
@Override
void close() {
public void close() {
}
}

View File

@ -54,6 +54,7 @@
<module>nifi-scripting-bundle</module>
<module>nifi-elasticsearch-bundle</module>
<module>nifi-amqp-bundle</module>
<module>nifi-splunk-bundle</module>
</modules>
<dependencyManagement>
<dependencies>

View File

@ -1064,6 +1064,12 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-nar</artifactId>
<version>0.6.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-splunk-nar</artifactId>
<version>0.6.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>