NIFI-9632 - Removed nifi-lumberjack-bundle

- Removed several unused socket classes from nifi-processor-utils

This closes #5722

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Nathan Gough 2022-01-27 12:54:41 -05:00 committed by exceptionfactory
parent 3ccc9d29b6
commit 8f2a9f94fe
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
37 changed files with 1 additions and 3276 deletions

View File

@ -70,7 +70,7 @@ import java.util.concurrent.LinkedBlockingQueue;
@Tags({"listen", "beats", "tcp", "logs"})
@CapabilityDescription("Listens for messages sent by libbeat compatible clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash', writing its JSON formatted payload " +
"to the content of a FlowFile." +
"This processor replaces the now deprecated ListenLumberjack")
"This processor replaces the now deprecated/removed ListenLumberjack")
@WritesAttributes({
@WritesAttribute(attribute = "beats.sender", description = "The sending host of the messages."),
@WritesAttribute(attribute = "beats.port", description = "The sending port the messages were received over."),

View File

@ -1,298 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.dispatcher;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.security.util.ClientAuth;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Accepts Socket connections on the given port and creates a handler for each connection to
* be executed by a thread pool.
*/
public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements AsyncChannelDispatcher {
private final EventFactory<E> eventFactory;
private final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory;
private final ByteBufferSource bufferSource;
private final BlockingQueue<E> events;
private final ComponentLog logger;
private final int maxConnections;
private final int maxThreadPoolSize;
private final SSLContext sslContext;
private final ClientAuth clientAuth;
private final Charset charset;
private ThreadPoolExecutor executor;
private volatile boolean stopped = false;
private Selector selector;
private final BlockingQueue<SelectionKey> keyQueue;
private final AtomicInteger currentConnections = new AtomicInteger(0);
public SocketChannelDispatcher(final EventFactory<E> eventFactory,
final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory,
final ByteBufferSource bufferSource,
final BlockingQueue<E> events,
final ComponentLog logger,
final int maxConnections,
final SSLContext sslContext,
final Charset charset) {
this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, sslContext, ClientAuth.REQUIRED, charset);
}
public SocketChannelDispatcher(final EventFactory<E> eventFactory,
final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory,
final ByteBufferSource bufferSource,
final BlockingQueue<E> events,
final ComponentLog logger,
final int maxConnections,
final SSLContext sslContext,
final ClientAuth clientAuth,
final Charset charset) {
this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, maxConnections, sslContext, clientAuth, charset);
}
public SocketChannelDispatcher(final EventFactory<E> eventFactory,
final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory,
final ByteBufferSource bufferSource,
final BlockingQueue<E> events,
final ComponentLog logger,
final int maxConnections,
final int maxThreadPoolSize,
final SSLContext sslContext,
final ClientAuth clientAuth,
final Charset charset) {
this.eventFactory = eventFactory;
this.handlerFactory = handlerFactory;
this.bufferSource = bufferSource;
this.events = events;
this.logger = logger;
this.maxConnections = maxConnections;
this.maxThreadPoolSize = maxThreadPoolSize;
this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
this.sslContext = sslContext;
this.clientAuth = clientAuth;
this.charset = charset;
}
@Override
public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException {
final InetSocketAddress inetSocketAddress = new InetSocketAddress(nicAddress, port);
stopped = false;
executor = new ThreadPoolExecutor(
maxThreadPoolSize,
maxThreadPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new BasicThreadFactory.Builder().namingPattern(inetSocketAddress.toString() + "-worker-%d").build());
executor.allowCoreThreadTimeOut(true);
final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
if (maxBufferSize > 0) {
serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
if (actualReceiveBufSize < maxBufferSize) {
logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
+ actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's "
+ "maximum receive buffer");
}
}
serverSocketChannel.socket().bind(inetSocketAddress);
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
@Override
public void run() {
while (!stopped) {
try {
int selected = selector.select();
// if stopped the selector could already be closed which would result in a ClosedSelectorException
if (selected > 0 && !stopped){
Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
// if stopped we don't want to modify the keys because close() may still be in progress
while (selectorKeys.hasNext() && !stopped) {
SelectionKey key = selectorKeys.next();
selectorKeys.remove();
if (!key.isValid()){
continue;
}
if (key.isAcceptable()) {
// Handle new connections coming in
final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
final SocketChannel socketChannel = channel.accept();
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
// Check for available connections
if (currentConnections.incrementAndGet() > maxConnections){
currentConnections.decrementAndGet();
logger.warn("Rejecting connection from {} because max connections has been met",
new Object[]{ socketChannel.getRemoteAddress().toString() });
IOUtils.closeQuietly(socketChannel);
continue;
}
logger.debug("Accepted incoming connection from {}",
new Object[]{socketChannel.getRemoteAddress().toString()});
// Set socket to non-blocking, and register with selector
socketChannel.configureBlocking(false);
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
// Prepare the byte buffer for the reads, clear it out
ByteBuffer buffer = bufferSource.acquire();
// If we have an SSLContext then create an SSLEngine for the channel
SSLSocketChannel sslSocketChannel = null;
if (sslContext != null) {
final SSLEngine sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(false);
switch (clientAuth) {
case REQUIRED:
sslEngine.setNeedClientAuth(true);
break;
case WANT:
sslEngine.setWantClientAuth(true);
break;
case NONE:
sslEngine.setNeedClientAuth(false);
sslEngine.setWantClientAuth(false);
break;
}
sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel);
}
// Attach the buffer and SSLSocketChannel to the key
SocketChannelAttachment attachment = new SocketChannelAttachment(buffer, sslSocketChannel);
readKey.attach(attachment);
} else if (key.isReadable()) {
// Clear out the operations the select is interested in until done reading
key.interestOps(0);
// Create a handler based on the protocol and whether an SSLEngine was provided or not
final Runnable handler;
if (sslContext != null) {
handler = handlerFactory.createSSLHandler(key, this, charset, eventFactory, events, logger);
} else {
handler = handlerFactory.createHandler(key, this, charset, eventFactory, events, logger);
}
// run the handler
executor.execute(handler);
}
}
}
// Add back all idle sockets to the select
SelectionKey key;
while((key = keyQueue.poll()) != null){
key.interestOps(SelectionKey.OP_READ);
}
} catch (IOException e) {
logger.error("Error accepting connection from SocketChannel", e);
}
}
}
@Override
public int getPort() {
// Return the port for the key listening for accepts
for(SelectionKey key : selector.keys()){
if (key.isValid()) {
final Channel channel = key.channel();
if (channel instanceof ServerSocketChannel) {
return ((ServerSocketChannel)channel).socket().getLocalPort();
}
}
}
return 0;
}
@Override
public void close() {
stopped = true;
if (selector != null) {
selector.wakeup();
}
if (executor != null) {
executor.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
if (selector != null) {
synchronized (selector.keys()) {
for (SelectionKey key : selector.keys()) {
IOUtils.closeQuietly(key.channel());
}
}
}
IOUtils.closeQuietly(selector);
}
@Override
public void completeConnection(SelectionKey key) {
// connection is done. Releasing buffer
final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment();
bufferSource.release(attachment.getByteBuffer());
currentConnections.decrementAndGet();
}
@Override
public void addBackForSelection(SelectionKey key) {
keyQueue.offer(key);
selector.wakeup();
}
}

View File

@ -1,46 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.handler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import java.nio.channels.SelectionKey;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
/**
* Factory that can produce ChannelHandlers for the given type of Event and ChannelDispatcher.
*/
public interface ChannelHandlerFactory<E extends Event, D extends ChannelDispatcher> {
ChannelHandler<E, D> createHandler(final SelectionKey key,
final D dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger);
ChannelHandler<E, D> createSSLHandler(final SelectionKey key,
final D dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger);
}

View File

@ -1,153 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.handler.socket;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
/**
* Wraps a SocketChannel with an SSLSocketChannel for receiving messages over TLS.
*/
public class SSLSocketChannelHandler<E extends Event<SocketChannel>> extends SocketChannelHandler<E> {
private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
public SSLSocketChannelHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
}
@Override
public void run() {
boolean eof = false;
SSLSocketChannel sslSocketChannel = null;
try {
int bytesRead;
final SocketChannel socketChannel = (SocketChannel) key.channel();
final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment();
// get the SSLSocketChannel from the attachment
sslSocketChannel = attachment.getSslSocketChannel();
// SSLSocketChannel deals with byte[] so ByteBuffer isn't used here, but we'll use the size to create a new byte[]
final ByteBuffer socketBuffer = attachment.getByteBuffer();
byte[] socketBufferArray = new byte[socketBuffer.limit()];
// read until no more data
try {
while ((bytesRead = sslSocketChannel.read(socketBufferArray)) > 0) {
processBuffer(sslSocketChannel, socketChannel, bytesRead, socketBufferArray);
logger.debug("bytes read from sslSocketChannel {}", new Object[]{bytesRead});
}
} catch (SocketTimeoutException ste) {
// SSLSocketChannel will throw this exception when 0 bytes are read and the timeout threshold
// is exceeded, we don't want to close the connection in this case
bytesRead = 0;
}
// Check for closed socket
if( bytesRead < 0 ){
eof = true;
logger.debug("Reached EOF, closing connection");
} else {
logger.debug("No more data available, returning for selection");
}
} catch (ClosedByInterruptException | InterruptedException e) {
logger.debug("read loop interrupted, closing connection");
// Treat same as closed socket
eof = true;
} catch (ClosedChannelException e) {
// ClosedChannelException doesn't have a message so handle it separately from IOException
logger.error("Error reading from channel due to channel being closed", e);
// Treat same as closed socket
eof = true;
} catch (IOException e) {
logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e);
// Treat same as closed socket
eof = true;
} finally {
if(eof == true) {
IOUtils.closeQuietly(sslSocketChannel);
dispatcher.completeConnection(key);
} else {
dispatcher.addBackForSelection(key);
}
}
}
/**
* Process the contents of the buffer. Give sub-classes a chance to override this behavior.
*
* @param sslSocketChannel the channel the data was read from
* @param socketChannel the socket channel being wrapped by sslSocketChannel
* @param bytesRead the number of bytes read
* @param buffer the buffer to process
* @throws InterruptedException thrown if interrupted while queuing events
*/
protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel,
final int bytesRead, final byte[] buffer) throws InterruptedException, IOException {
final InetAddress sender = socketChannel.socket().getInetAddress();
// go through the buffer looking for the end of each message
for (int i = 0; i < bytesRead; i++) {
final byte currByte = buffer[i];
// check if at end of a message
if (currByte == getDelimiter()) {
if (currBytes.size() > 0) {
final SSLSocketChannelResponder response = new SSLSocketChannelResponder(socketChannel, sslSocketChannel);
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
final E event = eventFactory.create(currBytes.toByteArray(), metadata, response);
events.offer(event);
currBytes.reset();
}
} else {
currBytes.write(currByte);
}
}
}
@Override
public byte getDelimiter() {
return TCP_DELIMITER;
}
}

View File

@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.handler.socket;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
/**
* Base class for socket channel handlers.
*/
public abstract class SocketChannelHandler<E extends Event<SocketChannel>> extends ChannelHandler<E, AsyncChannelDispatcher> {
static final byte TCP_DELIMITER = '\n';
public SocketChannelHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
}
/**
* @return the byte used as the delimiter between messages for the given handler
*/
public abstract byte getDelimiter();
}

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.handler.socket;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
/**
* Default factory for creating socket channel handlers.
*/
public class SocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> {
@Override
public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
return new StandardSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
}
@Override
public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
return new SSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
}
}

View File

@ -1,158 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.handler.socket;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
/**
* Reads from the given SocketChannel into the provided buffer. If the given delimiter is found, the data
* read up to that point is queued for processing.
*/
public class StandardSocketChannelHandler<E extends Event<SocketChannel>> extends SocketChannelHandler<E> {
private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
public StandardSocketChannelHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
}
@Override
public void run() {
boolean eof = false;
SocketChannel socketChannel = null;
try {
int bytesRead;
socketChannel = (SocketChannel) key.channel();
final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment();
final ByteBuffer socketBuffer = attachment.getByteBuffer();
// read until the buffer is full
while ((bytesRead = socketChannel.read(socketBuffer)) > 0) {
// prepare byte buffer for reading
socketBuffer.flip();
// mark the current position as start, in case of partial message read
socketBuffer.mark();
// process the contents that have been read into the buffer
processBuffer(socketChannel, socketBuffer);
// Preserve bytes in buffer for next call to run
// NOTE: This code could benefit from the two ByteBuffer read calls to avoid
// this compact for higher throughput
socketBuffer.reset();
socketBuffer.compact();
logger.debug("bytes read {}", new Object[]{bytesRead});
}
// Check for closed socket
if( bytesRead < 0 ){
eof = true;
logger.debug("Reached EOF, closing connection");
} else {
logger.debug("No more data available, returning for selection");
}
} catch (ClosedByInterruptException | InterruptedException e) {
logger.debug("read loop interrupted, closing connection");
// Treat same as closed socket
eof = true;
} catch (ClosedChannelException e) {
// ClosedChannelException doesn't have a message so handle it separately from IOException
logger.error("Error reading from channel due to channel being closed", e);
// Treat same as closed socket
eof = true;
} catch (IOException e) {
logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e);
// Treat same as closed socket
eof = true;
} finally {
if(eof == true) {
IOUtils.closeQuietly(socketChannel);
dispatcher.completeConnection(key);
} else {
dispatcher.addBackForSelection(key);
}
}
}
/**
* Process the contents that have been read into the buffer. Allow sub-classes to override this behavior.
*
* @param socketChannel the channel the data was read from
* @param socketBuffer the buffer the data was read into
* @throws InterruptedException if interrupted when queuing events
*/
protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer) throws InterruptedException, IOException {
// get total bytes in buffer
final int total = socketBuffer.remaining();
final InetAddress sender = socketChannel.socket().getInetAddress();
// go through the buffer looking for the end of each message
currBytes.reset();
for (int i = 0; i < total; i++) {
// NOTE: For higher throughput, the looking for \n and copying into the byte stream could be improved
// Pull data out of buffer and cram into byte array
byte currByte = socketBuffer.get();
// check if at end of a message
if (currByte == getDelimiter()) {
if (currBytes.size() > 0) {
final SocketChannelResponder response = new SocketChannelResponder(socketChannel);
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
final E event = eventFactory.create(currBytes.toByteArray(), metadata, response);
events.offer(event);
currBytes.reset();
// Mark this as the start of the next message
socketBuffer.mark();
}
} else {
currBytes.write(currByte);
}
}
}
@Override
public byte getDelimiter() {
return TCP_DELIMITER;
}
}

View File

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.response.socket;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import java.io.IOException;
import java.nio.channels.SocketChannel;
/**
* A ChannelResponder for SSLSocketChannels.
*/
public class SSLSocketChannelResponder extends SocketChannelResponder {
private SSLSocketChannel sslSocketChannel;
public SSLSocketChannelResponder(final SocketChannel socketChannel, final SSLSocketChannel sslSocketChannel) {
super(socketChannel);
this.sslSocketChannel = sslSocketChannel;
}
@Override
public void respond() throws IOException {
for (final ChannelResponse response : responses) {
sslSocketChannel.write(response.toByteArray());
}
}
}

View File

@ -1,69 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.response.socket;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* A ChannelResponder for SocketChannels. The SocketChannel should first be registered with a selector,
* upon being selected for writing the respond() method should be executed.
*/
public class SocketChannelResponder implements ChannelResponder<SocketChannel> {
protected final List<ChannelResponse> responses;
protected final SocketChannel socketChannel;
public SocketChannelResponder(final SocketChannel socketChannel) {
this.responses = new ArrayList<>();
this.socketChannel = socketChannel;
}
@Override
public SocketChannel getChannel() {
return socketChannel;
}
@Override
public List<ChannelResponse> getResponses() {
return Collections.unmodifiableList(responses);
}
@Override
public void addResponse(ChannelResponse response) {
this.responses.add(response);
}
@Override
public void respond() throws IOException {
for (final ChannelResponse response : responses) {
final ByteBuffer responseBuffer = ByteBuffer.wrap(response.toByteArray());
while (responseBuffer.hasRemaining()) {
socketChannel.write(responseBuffer);
}
}
}
}

View File

@ -1,43 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lumberjack-bundle</artifactId>
<version>1.16.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-lumberjack-nar</artifactId>
<version>1.16.0-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lumberjack-processors</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>1.16.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -1,233 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.
The binary distribution of this product bundles 'Bouncy Castle JDK 1.5'
under an MIT style license.
Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -1,34 +0,0 @@
nifi-lumberjack-nar
Copyright 2014-2022 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Jackson JSON processor
The following NOTICE information applies:
# Jackson JSON processor
Jackson is a high-performance, Free/Open Source JSON processing library.
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
been in development since 2007.
It is currently developed by a community of developers, as well as supported
commercially by FasterXML.com.
## Licensing
Jackson core and extension components may licensed under different licenses.
To find the details that apply to this artifact see the accompanying LICENSE file.
For more information, including possible other licensing options, contact
FasterXML.com (http://fasterxml.com).
## Credits
A list of contributors may be found from CREDITS file, which is included
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.

View File

@ -1,91 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lumberjack-bundle</artifactId>
<version>1.16.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-lumberjack-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-socket-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-packager</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.4</version>
</dependency>
</dependencies>
<profiles>
<profile>
<!-- This profile, activating when compiling on Java versions above 1.8, provides configuration changes to
allow NiFi to be compiled on those JDKs. -->
<id>jigsaw</id>
<activation>
<jdk>(1.8,)</jdk>
</activation>
<dependencies>
<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jaxb</groupId>
<artifactId>jaxb-runtime</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>

View File

@ -1,237 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack;
import com.google.gson.Gson;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.lumberjack.event.LumberjackEvent;
import org.apache.nifi.processors.lumberjack.event.LumberjackEventFactory;
import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder;
import org.apache.nifi.processors.lumberjack.handler.LumberjackSocketChannelHandlerFactory;
import org.apache.nifi.processors.lumberjack.response.LumberjackChannelResponse;
import org.apache.nifi.processors.lumberjack.response.LumberjackResponse;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@Deprecated
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "lumberjack", "tcp", "logs"})
@CapabilityDescription("This processor is deprecated and may be removed in the near future. Listens for Lumberjack messages being sent to a given port over TCP. Each message will be " +
"acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain data " +
"portion of one or more Lumberjack frames. In the case where the Lumberjack frames contain syslog messages, the " +
"output of this processor can be sent to a ParseSyslog processor for further processing. ")
@WritesAttributes({
@WritesAttribute(attribute = "lumberjack.sender", description = "The sending host of the messages."),
@WritesAttribute(attribute = "lumberjack.port", description = "The sending port the messages were received over."),
@WritesAttribute(attribute = "lumberjack.sequencenumber", description = "The sequence number of the message. Only included if <Batch Size> is 1."),
@WritesAttribute(attribute = "lumberjack.*", description = "The keys and respective values as sent by the lumberjack producer. Only included if <Batch Size> is 1."),
@WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is text/plain")
})
@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
/**
* @deprecated As of release 1.2.0, replaced by {@link org.apache.nifi.processors.beats.ListenBeats}
* */
public class ListenLumberjack extends AbstractListenEventBatchingProcessor<LumberjackEvent> {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
"messages will be received over a secure connection. Note that as Lumberjack client requires" +
"two-way SSL authentication, the controller MUST have a truststore and a keystore to work" +
"properly.")
.required(true)
.identifiesControllerService(RestrictedSSLContextService.class)
.build();
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(
MAX_CONNECTIONS,
SSL_CONTEXT_SERVICE
);
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null && sslContextService.isTrustStoreConfigured() == false) {
results.add(new ValidationResult.Builder()
.explanation("The context service must have a truststore configured for the lumberjack forwarder client to work correctly")
.valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build());
}
return results;
}
private volatile LumberjackEncoder lumberjackEncoder;
@Override
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
super.onScheduled(context);
// wanted to ensure charset was already populated here
lumberjackEncoder = new LumberjackEncoder();
}
@Override
protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<LumberjackEvent> events) throws IOException {
final EventFactory<LumberjackEvent> eventFactory = new LumberjackEventFactory();
final ChannelHandlerFactory<LumberjackEvent, AsyncChannelDispatcher> handlerFactory = new LumberjackSocketChannelHandlerFactory<>();
final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and the buffer size
final ByteBufferSource byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
sslContext = sslContextService.createContext();
}
// if we decide to support SSL then get the context and pass it in here
return new SocketChannelDispatcher<>(eventFactory, handlerFactory, byteBufferSource, events,
getLogger(), maxConnections, sslContext, charSet);
}
@Override
protected String getBatchKey(LumberjackEvent event) {
return event.getSender();
}
protected void respond(final LumberjackEvent event, final LumberjackResponse lumberjackResponse) {
final ChannelResponse response = new LumberjackChannelResponse(lumberjackEncoder, lumberjackResponse);
final ChannelResponder responder = event.getResponder();
responder.addResponse(response);
try {
responder.respond();
} catch (IOException e) {
getLogger().error("Error sending response for transaction {} due to {}",
new Object[]{event.getSeqNumber(), e.getMessage()}, e);
}
}
protected void postProcess(final ProcessContext context, final ProcessSession session, final List<LumberjackEvent> events) {
// first commit the session so we guarantee we have all the events successfully
// written to FlowFiles and transferred to the success relationship
session.commitAsync(() -> {
// respond to each event to acknowledge successful receipt
for (final LumberjackEvent event : events) {
respond(event, LumberjackResponse.ok(event.getSeqNumber()));
}
});
}
@Override
protected String getTransitUri(FlowFileEventBatch batch) {
final String sender = batch.getEvents().get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append("lumberjack").append("://").append(senderHost).append(":")
.append(port).toString();
return transitUri;
}
@Override
protected Map<String, String> getAttributes(FlowFileEventBatch batch) {
final List<LumberjackEvent> events = batch.getEvents();
// the sender and command will be the same for all events based on the batch key
final String sender = events.get(0).getSender();
final int numAttributes = events.size() == 1 ? 5 : 4;
final Map<String, String> attributes = new HashMap<>(numAttributes);
attributes.put(LumberjackAttributes.SENDER.key(), sender);
attributes.put(LumberjackAttributes.PORT.key(), String.valueOf(port));
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
// if there was only one event then we can pass on the transaction
// NOTE: we could pass on all the transaction ids joined together
if (events.size() == 1) {
attributes.put(LumberjackAttributes.SEQNUMBER.key(), String.valueOf(events.get(0).getSeqNumber()));
// Convert the serialized fields from JSON
String serialFields = String.valueOf(events.get(0).getFields());
Gson jsonObject = new Gson();
Map<String, String> fields = jsonObject.fromJson(serialFields, Map.class);
for (Map.Entry<String, String> entry : fields.entrySet()) {
attributes.put(LumberjackAttributes.FIELDS.key().concat(".").concat(entry.getKey()), entry.getValue());
}
}
return attributes;
}
public enum LumberjackAttributes implements FlowFileAttributeKey {
SENDER("lumberjack.sender"),
PORT("lumberjack.port"),
SEQNUMBER("lumberjack.sequencenumber"),
FIELDS("lumberjack.fields");
private final String key;
LumberjackAttributes(String key) {
this.key = key;
}
@Override
public String key() {
return key;
}
}
}

View File

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.event;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import java.nio.channels.SocketChannel;
/**
* A Lumberjack event which adds the transaction number and command to the StandardEvent.
*/
@Deprecated
public class LumberjackEvent extends StandardEvent<SocketChannel> {
private final long seqNumber;
private final String fields;
public LumberjackEvent(final String sender, final byte[] data, final ChannelResponder<SocketChannel> responder, final long seqNumber, String fields) {
super(sender, data, responder);
this.seqNumber = seqNumber;
this.fields = fields;
}
public long getSeqNumber() {
return seqNumber;
}
public String getFields() {
return fields;
}
}

View File

@ -1,39 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import java.util.Map;
/**
* An EventFactory implementation to create LumberjackEvents.
*/
@Deprecated
public class LumberjackEventFactory implements EventFactory<LumberjackEvent> {
@Override
public LumberjackEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
final String sender = metadata.get(EventFactory.SENDER_KEY);
final long seqNumber = Long.valueOf(metadata.get(LumberjackMetadata.SEQNUMBER_KEY));
final String fields = metadata.get(LumberjackMetadata.FIELDS_KEY);
return new LumberjackEvent(sender, data, responder, seqNumber, fields);
}
}

View File

@ -1,27 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.event;
/**
* Metadata keys for Lumberjack.
*/
@Deprecated
public interface LumberjackMetadata {
String SEQNUMBER_KEY = "lumberjack.sequencenumber";
String FIELDS_KEY = "lumberjack.fields";
}

View File

@ -1,299 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.List;
import java.util.zip.InflaterInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Decodes a Lumberjack frame by maintaining a state based on each byte that has been processed. This class
* should not be shared by multiple threads.
*/
@Deprecated
public class LumberjackDecoder {
static final Logger logger = LoggerFactory.getLogger(LumberjackDecoder.class);
private LumberjackFrame.Builder frameBuilder;
private LumberjackState currState = LumberjackState.VERSION;
private byte decodedFrameType;
private byte[] decompressedData;
private final Charset charset;
private final ByteArrayOutputStream currBytes;
private long windowSize;
public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
/**
* @param charset the charset to decode bytes from the Lumberjack frame
*/
public LumberjackDecoder(final Charset charset) {
this(charset, new ByteArrayOutputStream(4096));
}
/**
* @param charset the charset to decode bytes from the Lumberjack frame
* @param buffer a buffer to use while processing the bytes
*/
public LumberjackDecoder(final Charset charset, final ByteArrayOutputStream buffer) {
this.charset = charset;
this.currBytes = buffer;
this.frameBuilder = new LumberjackFrame.Builder();
this.decodedFrameType = 0x00;
}
/**
* Resets this decoder back to its initial state.
*/
public void reset() {
frameBuilder = new LumberjackFrame.Builder();
currState = LumberjackState.VERSION;
decodedFrameType = 0x00;
currBytes.reset();
}
/**
* Process the next byte from the channel, updating the builder and state accordingly.
*
* @param currByte the next byte to process
* @preturn true if a frame is ready to be retrieved, false otherwise
*/
public boolean process(final byte currByte) throws LumberjackFrameException {
try {
switch (currState) {
case VERSION:
processVERSION(currByte);
break;
case FRAMETYPE:
processFRAMETYPE(currByte);
break;
case PAYLOAD:
processPAYLOAD(currByte);
if (frameBuilder.frameType == FRAME_WINDOWSIZE && currState == LumberjackState.COMPLETE) {
return true;
} else if (frameBuilder.frameType == FRAME_COMPRESSED && currState == LumberjackState.COMPLETE) {
return true;
} else {
break;
}
case COMPLETE:
return true;
default:
break;
}
return false;
} catch (Exception e) {
throw new LumberjackFrameException("Error decoding Lumberjack frame: " + e.getMessage(), e);
}
}
/**
* Returns the decoded frame and resets the decoder for the next frame.
* This method should be called after checking isComplete().
*
* @return the LumberjackFrame that was decoded
*/
public List<LumberjackFrame> getFrames() throws LumberjackFrameException {
List<LumberjackFrame> frames = new LinkedList<>();
if (currState != LumberjackState.COMPLETE) {
throw new LumberjackFrameException("Must be at the trailer of a frame");
}
try {
if (currState == LumberjackState.COMPLETE && frameBuilder.frameType == FRAME_COMPRESSED) {
logger.debug("Frame is compressed, will iterate to decode", new Object[]{});
// LumberjackDecoder decompressedDecoder = new LumberjackDecoder();
// Zero currBytes, currState and frameBuilder prior to iteration over
// decompressed bytes
currBytes.reset();
frameBuilder.reset();
currState = LumberjackState.VERSION;
// Run over decompressed data.
frames = processDECOMPRESSED(decompressedData);
} else {
final LumberjackFrame frame = frameBuilder.build();
currBytes.reset();
frameBuilder.reset();
currState = LumberjackState.VERSION;
frames.add(frame);
}
return frames;
} catch (Exception e) {
throw new LumberjackFrameException("Error decoding Lumberjack frame: " + e.getMessage(), e);
}
}
private List<LumberjackFrame> processDECOMPRESSED(byte[] decompressedData) {
List<LumberjackFrame> frames = new LinkedList<>();
LumberjackFrame.Builder internalFrameBuilder = new LumberjackFrame.Builder();
ByteBuffer currentData = ByteBuffer.wrap(decompressedData);
// Lumberjack has a weird approach to frames, where compressed frames embed D(ata) or J(SON) frames.
// inside a compressed input.
// Or as stated in the documentation:
//
// "As an example, you could have 3 data frames compressed into a single
// 'compressed' frame type: 1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v}"
//
// Therefore, instead of calling process method again, just iterate over each of
// the frames and split them so they can be processed by LumberjackFrameHandler
while (currentData.hasRemaining()) {
int payloadLength = 0;
internalFrameBuilder.version = currentData.get();
internalFrameBuilder.frameType = currentData.get();
internalFrameBuilder.seqNumber = currentData.getInt() & 0x00000000ffffffffL;
currentData.mark();
// Set the payloadLength to negative to avoid doing math
// around valueLength and valueLength
payloadLength = payloadLength - currentData.position();
long pairCount = currentData.getInt() & 0x00000000ffffffffL;
for (int i = 0; i < pairCount; i++) {
long keyLength = currentData.getInt() & 0x00000000ffffffffL;
currentData.position(currentData.position() + (int) keyLength);
long valueLength = currentData.getInt() & 0x00000000ffffffffL;
currentData.position(currentData.position() + (int) valueLength);
}
// Infer the length of the payload from position...
payloadLength = payloadLength + currentData.position();
// Reset to mark (i.e. skip frame headers) prior to getting the data
currentData.reset();
// get the data, shift mark and compact so next iteration can
// read rest of buffer.
byte[] bytes = new byte[payloadLength];
currentData.get(bytes, 0, payloadLength);
currentData.mark();
// Add payload to frame
internalFrameBuilder.payload(bytes);
// data frame is created
LumberjackFrame frame = internalFrameBuilder.build();
frames.add(frame);
internalFrameBuilder.reset();
}
return frames;
}
private void processVERSION(final byte b) {
byte version = b;
frameBuilder.version(version);
logger.debug("Version number is {}", new Object[]{version});
currBytes.write(b);
currState = LumberjackState.FRAMETYPE;
}
private void processFRAMETYPE(final byte b) {
decodedFrameType = b;
frameBuilder.frameType(decodedFrameType);
logger.debug("Frame type is {}", new Object[]{decodedFrameType});
currBytes.write(b);
currState = LumberjackState.PAYLOAD;
}
private void processPAYLOAD(final byte b) {
currBytes.write(b);
switch (decodedFrameType) {
case FRAME_WINDOWSIZE: //'W'
if (currBytes.size() < 6) {
logger.trace("Lumberjack currBytes contents are {}", currBytes.toString());
break;
} else if (currBytes.size() == 6) {
frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL;
logger.debug("Data size is {}", new Object[]{frameBuilder.dataSize});
// Sets payload to empty as frame contains no data
frameBuilder.payload(new byte[]{});
currBytes.reset();
currState = LumberjackState.COMPLETE;
windowSize = frameBuilder.dataSize;
break;
} else {
break;
}
case FRAME_COMPRESSED: //'C'
if (currBytes.size() < 6) {
logger.trace("Lumberjack currBytes contents are {}", currBytes.toString());
break;
} else if (currBytes.size() >= 6) {
frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL;
if (currBytes.size() - 6 == frameBuilder.dataSize) {
try {
byte[] buf = java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, currBytes.size());
InputStream in = new InflaterInputStream(new ByteArrayInputStream(buf));
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int len;
while ((len = in.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
in.close();
out.close();
decompressedData = out.toByteArray();
// buf is no longer needed
buf = null;
logger.debug("Finished decompressing data");
// Decompression is complete, we should be able to proceed with resetting currBytes and curSrtate and iterating them
// as type 'D' frames
frameBuilder.dataSize(decompressedData.length);
currState = LumberjackState.COMPLETE;
} catch (IOException e) {
throw new LumberjackFrameException("Error decompressing frame: " + e.getMessage(), e);
}
}
break;
// If currentByte.size is not lower than six and also not equal or great than 6...
} else {
break;
}
}
}
private void processCOMPLETE() {
currBytes.reset();
frameBuilder.reset();
currState = LumberjackState.VERSION;
}
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* Encodes a LumberjackFrame into raw bytes using the given charset.
*/
@Deprecated
public class LumberjackEncoder {
public byte[] encode(final LumberjackFrame frame) {
final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
// Writes the version
buffer.write(frame.getVersion());
// Writes the frameType
buffer.write(frame.getFrameType());
// Writes the sequence number
try {
buffer.write(frame.getPayload());
} catch (IOException e) {
throw new LumberjackFrameException("Error decoding Lumberjack frame: " + e.getMessage(), e);
}
return buffer.toByteArray();
}
}

View File

@ -1,116 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
/**
* A Lumberjack frame received from a channel.
*/
@Deprecated
public class LumberjackFrame {
public static final byte DELIMITER = 10;
private final byte version;
private final byte frameType;
private final byte[] payload;
private final long dataSize;
private final long seqNumber;
private LumberjackFrame(final Builder builder) {
this.version = builder.version;
this.frameType = builder.frameType;
this.payload = builder.payload;
this.dataSize = builder.dataSize;
this.seqNumber = builder.seqNumber;
if (version < 2 || payload.length < 0 ) {
throw new LumberjackFrameException("Invalid Frame");
}
}
public long getSeqNumber() {
return seqNumber;
}
public byte getVersion() {
return version;
}
public byte getFrameType() {
return frameType;
}
public byte [] getPayload() {
return payload;
}
/**
* Builder for a LumberjackFrame.
*/
public static class Builder {
byte version;
byte frameType;
byte [] payload;
long dataSize;
long seqNumber;
public Builder() {
reset();
}
public void reset() {
version = -1;
seqNumber = -1;
frameType = -1;
payload = null;
}
public Builder version(final byte version) {
this.version = version;
return this;
}
public Builder seqNumber(final long seqNumber) {
this.seqNumber = seqNumber;
return this;
}
public Builder frameType(final byte frameType) {
this.frameType = frameType;
return this;
}
public Builder dataSize(final long dataSize) {
this.dataSize = dataSize;
return this;
}
public Builder payload(final byte [] payload) {
this.payload = payload;
return this;
}
public LumberjackFrame build() {
return new LumberjackFrame(this);
}
}
}

View File

@ -1,33 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
/**
* Represents an error encountered when decoding Lumberjack frames.
*/
@Deprecated
public class LumberjackFrameException extends RuntimeException {
public LumberjackFrameException(String message) {
super(message);
}
public LumberjackFrameException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -1,29 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
/**
* The parts of a Lumberjack frame.
*/
@Deprecated
public enum LumberjackState {
VERSION,
FRAMETYPE,
PAYLOAD,
COMPLETE
}

View File

@ -1,115 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.handler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processor.util.listen.event.EventQueue;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.lumberjack.event.LumberjackMetadata;
import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
import com.google.gson.Gson;
/**
* Encapsulates the logic to handle a LumberjackFrame once it has been read from the channel.
*/
@Deprecated
public class LumberjackFrameHandler<E extends Event<SocketChannel>> {
private final Charset charset;
private final EventFactory<E> eventFactory;
private final EventQueue<E> events;
private final SelectionKey key;
private final AsyncChannelDispatcher dispatcher;
private final LumberjackEncoder encoder;
private final ComponentLog logger;
public LumberjackFrameHandler(final SelectionKey selectionKey,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final AsyncChannelDispatcher dispatcher,
final ComponentLog logger) {
this.key = selectionKey;
this.charset = charset;
this.eventFactory = eventFactory;
this.dispatcher = dispatcher;
this.logger = logger;
this.events = new EventQueue<>(events, logger);
this.encoder = new LumberjackEncoder();
}
public void handle(final LumberjackFrame frame, final ChannelResponder<SocketChannel> responder, final String sender)
throws IOException, InterruptedException {
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
metadata.put(LumberjackMetadata.SEQNUMBER_KEY, String.valueOf(frame.getSeqNumber()));
String line = "";
/* If frameType is a data Frame, Handle the Lumberjack data payload, iterating over it and extracting
keys and values into metadata.
All keys are inserted into metadata with the exception of line that gets added into the body of the event
*/
if (frame.getFrameType() == 0x44) {
ByteBuffer currentData = ByteBuffer.wrap(frame.getPayload());
long pairCount = currentData.getInt() & 0x00000000ffffffffL;
Map<String,String> fields = new HashMap<>();
for (int i = 0; i < pairCount; i++) {
long keyLength = currentData.getInt() & 0x00000000ffffffffL;
byte[] bytes = new byte[(int) keyLength];
currentData.get(bytes);
String key = new String(bytes);
long valueLength = currentData.getInt() & 0x00000000ffffffffL;
bytes = new byte[(int) valueLength];
currentData.get(bytes);
String value = new String(bytes);
if (key.equals("line")) {
line = value;
} else {
fields.put(key, value);
}
}
// Serialize the fields into a String to push it via metdate
Gson serialFields = new Gson();
metadata.put("lumberjack.fields", serialFields.toJson(fields).toString());
// queue the raw event blocking until space is available, reset the buffer
final E event = eventFactory.create(line.getBytes(), metadata, responder);
events.offer(event);
} else if (frame.getFrameType() == 0x4A ) {
logger.error("Data type was JSON. JSON payload aren't yet supported, pending the documentation of Lumberjack protocol v2");
}
}
}

View File

@ -1,95 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.handler;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.socket.SSLSocketChannelHandler;
import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder;
import org.apache.nifi.processors.lumberjack.frame.LumberjackDecoder;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrameException;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
/**
* A Lumberjack implementation of SSLSocketChannelHandler.
*/
@Deprecated
public class LumberjackSSLSocketChannelHandler<E extends Event<SocketChannel>> extends SSLSocketChannelHandler<E> {
private LumberjackDecoder decoder;
private LumberjackFrameHandler<E> frameHandler;
public LumberjackSSLSocketChannelHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
this.decoder = new LumberjackDecoder(charset);
this.frameHandler = new LumberjackFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Override
protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel,
final int bytesRead, final byte[] buffer) throws InterruptedException, IOException {
final InetAddress sender = socketChannel.socket().getInetAddress();
try {
// go through the buffer parsing the Lumberjack command
for (int i = 0; i < bytesRead; i++) {
byte currByte = buffer[i];
// if we found the end of a frame, handle the frame and mark the buffer
if (decoder.process(currByte)) {
final List<LumberjackFrame> frames = decoder.getFrames();
// A list of events has been generated
for (LumberjackFrame frame : frames) {
logger.debug("Received Lumberjack frame with transaction {} and command {}",
new Object[]{frame.getSeqNumber(), frame.getSeqNumber()});
// Ignore the WINDOWS type frames as they contain no payload.
if (frame.getFrameType() != 0x57 ) {
final SSLSocketChannelResponder responder = new SSLSocketChannelResponder(socketChannel, sslSocketChannel);
frameHandler.handle(frame, responder, sender.toString());
}
}
}
}
logger.debug("Done processing buffer");
} catch (final LumberjackFrameException rfe) {
logger.error("Error reading Lumberjack frames due to {}", new Object[] {rfe.getMessage()} , rfe);
// if an invalid frame or bad data was sent then the decoder will be left in a
// corrupted state, so lets close the connection and cause the client to re-establish
dispatcher.completeConnection(key);
}
}
}

View File

@ -1,104 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.handler;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.socket.StandardSocketChannelHandler;
import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
import org.apache.nifi.processors.lumberjack.frame.LumberjackDecoder;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrameException;
/**
* Extends the StandardSocketChannelHandler to decode bytes into Lumberjack frames.
*/
@Deprecated
public class LumberjackSocketChannelHandler<E extends Event<SocketChannel>> extends StandardSocketChannelHandler<E> {
private LumberjackDecoder decoder;
private LumberjackFrameHandler<E> frameHandler;
public LumberjackSocketChannelHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
this.decoder = new LumberjackDecoder(charset);
this.frameHandler = new LumberjackFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Override
protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer)
throws InterruptedException, IOException {
// get total bytes in buffer
final int total = socketBuffer.remaining();
final InetAddress sender = socketChannel.socket().getInetAddress();
try {
// go through the buffer parsing the Lumberjack command
for (int i = 0; i < total; i++) {
byte currByte = socketBuffer.get();
// if we found the end of a frame, handle the frame and mark the buffer
if (decoder.process(currByte)) {
final List<LumberjackFrame> frames = decoder.getFrames();
for (LumberjackFrame frame : frames) {
//TODO: Clean this
logger.debug("Received Lumberjack frame with transaction {} and command {}",
new Object[]{frame.getSeqNumber(), frame.getSeqNumber()});
// Ignore the WINDOWS type frames as they contain no payload.
if (frame.getFrameType() != 0x57) {
final SocketChannelResponder responder = new SocketChannelResponder(socketChannel);
frameHandler.handle(frame, responder, sender.toString());
}
}
socketBuffer.mark();
}
}
logger.debug("Done processing buffer");
} catch (final LumberjackFrameException rfe) {
logger.error("Error reading Lumberjack frames due to {}", new Object[] {rfe.getMessage()}, rfe);
// if an invalid frame or bad data was sent then the decoder will be left in a
// corrupted state, so lets close the connection and cause the client to re-establish
dispatcher.completeConnection(key);
}
}
// not used for anything in Lumberjack since the decoder encapsulates the delimiter
@Override
public byte getDelimiter() {
return LumberjackFrame.DELIMITER;
}
}

View File

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.handler;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
/**
* Default factory for creating Lumberjack socket channel handlers.
*/
@Deprecated
public class LumberjackSocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> {
@Override
public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
return new LumberjackSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
}
@Override
public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
return new LumberjackSSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
}
}

View File

@ -1,42 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.response;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder;
/**
* Creates a LumberjackFrame for the provided response and returns the encoded frame.
*/
public class LumberjackChannelResponse implements ChannelResponse {
private final LumberjackEncoder encoder;
private final LumberjackResponse response;
public LumberjackChannelResponse(final LumberjackEncoder encoder, final LumberjackResponse response) {
this.encoder = encoder;
this.response = response;
}
@Override
public byte[] toByteArray() {
final LumberjackFrame frame = response.toFrame();
return encoder.encode(frame);
}
}

View File

@ -1,62 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.response;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
import java.nio.ByteBuffer;
/**
'ack' frame type
SENT FROM READER ONLY
frame type value: ASCII 'A' aka byte value 0x41
Payload:
32bit unsigned sequence number.
*/
public class LumberjackResponse {
private final long seqNumber;
final private byte version = 0x31, frameType = 0x41;
public LumberjackResponse(final long seqNumber) {
this.seqNumber = seqNumber;
}
/**
* Creates a LumberjackFrame where the data portion will contain this response.
*
*
* @return a LumberjackFrame for for this response
*/
public LumberjackFrame toFrame() {
return new LumberjackFrame.Builder()
.version(version)
.frameType(frameType)
.payload(ByteBuffer.allocate(4).putInt((int) seqNumber).array())
.build();
}
public static LumberjackResponse ok(final long seqNumber) {
return new LumberjackResponse(seqNumber);
}
}

View File

@ -1,15 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.lumberjack.ListenLumberjack

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.event;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
import org.junit.Assert;
import org.junit.Test;
@SuppressWarnings("deprecation")
public class TestLumberjackEventFactory {
@Test
public void testCreateLumberJackEvent() {
final String sender = "testsender1";
final byte[] data = "this is a test line".getBytes();
final long seqNumber = 1;
final String fields = "{\"file\":\"test\"}";
final Map<String,String> metadata = new HashMap<>();
metadata.put(EventFactory.SENDER_KEY, sender);
metadata.put(LumberjackMetadata.SEQNUMBER_KEY, String.valueOf(seqNumber));
metadata.put(LumberjackMetadata.FIELDS_KEY, String.valueOf(fields));
final ChannelResponder responder = new SocketChannelResponder(null);
final EventFactory<LumberjackEvent> factory = new LumberjackEventFactory();
final LumberjackEvent event = factory.create(data, metadata, responder);
Assert.assertEquals(sender, event.getSender());
Assert.assertEquals(seqNumber, event.getSeqNumber());
Assert.assertEquals(fields, event.getFields());
Assert.assertEquals(data, event.getData());
}
}

View File

@ -1,99 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import javax.xml.bind.DatatypeConverter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("deprecation")
public class TestLumberjackDecoder {
// Because no encoder for type 43 was coded, added Static hex
// representation of compressed data
//
private static final String singleFrameData = "3143000000aa785e4c8e4daac3300c8413c8cbfeddc017681da7b48540775df51245103936f54fb" +
"04c4a6e5f6917d03020e91bc93c9ba669597faccefa80ec0fed72440dd1174833e819370c798d98aa0e79a10ae44e36972f94198b26886bc" +
"0774422589024c865aaecff07f24c6e1b0c37fb6c2da18cdb4176834f72747c4152e6aa46330db7e9725707567db0240c93aace93e212464" +
"95857f755e89e76e2d77e000000ffff010000ffff05b43bb8";
private static final String multiFrameData = "3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728031957a97f82" +
"232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f021f71461b26873e711bee9480f48b0af10fe28" +
"89113b8c9e28f4322b82395413a50cafd79957c253d0b992faf4129c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f468079" +
"6b421964fc9b032ac4dcb54d2575a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff" +
"010000ffff35e0eff0";
private static final String payload = "";
private LumberjackDecoder decoder;
@Before
public void setup() {
this.decoder = new LumberjackDecoder(StandardCharsets.UTF_8);
}
@Test
public void testDecodeSingleFrame() {
final byte[] input = DatatypeConverter.parseHexBinary(singleFrameData);
List<LumberjackFrame> frames = null;
LumberjackFrame frame = null;
for (byte b : input) {
if (decoder.process(b)) {
frames = decoder.getFrames();
break;
}
}
frame = frames.get(frames.size() - 1);
Assert.assertNotNull(frame);
Assert.assertEquals(0x31, frame.getVersion());
Assert.assertEquals(0x44, frame.getFrameType());
Assert.assertEquals(1, frame.getSeqNumber());
// Look for a predefined number of bytes for matching of the inner payload
Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("000000050000000466696c65000000"), Arrays.copyOfRange(frame.getPayload(), 0, 15));
}
@Test
public void testDecodeMultipleFrame() {
final byte[] input = DatatypeConverter.parseHexBinary(multiFrameData);
List<LumberjackFrame> frames = null;
LumberjackFrame frame = null;
for (byte b : input) {
if (decoder.process(b)) {
frames = decoder.getFrames();
break;
}
}
frame = frames.get(1);
Assert.assertNotNull(frame);
Assert.assertEquals(0x31, frame.getVersion());
Assert.assertEquals(0x44, frame.getFrameType());
// Load the second frame therefore seqNumber = 2
Assert.assertEquals(2, frame.getSeqNumber());
// Look for a predefined number of bytes for matching of the inner payload
Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("000000050000000466696c65000000"), Arrays.copyOfRange(frame.getPayload(), 0, 15));
}
}

View File

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
import java.nio.ByteBuffer;
import javax.xml.bind.DatatypeConverter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("deprecation")
public class TestLumberjackEncoder {
private LumberjackEncoder encoder;
@Before
public void setup() {
this.encoder = new LumberjackEncoder();
}
@Test
public void testEncode() {
LumberjackFrame frame = new LumberjackFrame.Builder()
.version((byte) 0x31)
.frameType((byte) 0x41)
.payload(ByteBuffer.allocate(8).putLong(123).array())
.build();
byte[] encoded = encoder.encode(frame);
Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("3141000000000000007B"), encoded);
}
}

View File

@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
import org.junit.Test;
@SuppressWarnings("deprecation")
public class TestLumberjackFrame {
@Test(expected = LumberjackFrameException.class)
public void testInvalidVersion() {
new LumberjackFrame.Builder().seqNumber(1234).dataSize(3).build();
}
@Test(expected = LumberjackFrameException.class)
public void testInvalidFrameType() {
new LumberjackFrame.Builder().frameType((byte) 0x70).dataSize(5).build();
}
@Test(expected = LumberjackFrameException.class)
public void testBlankFrameType() {
new LumberjackFrame.Builder().frameType(((byte) 0x00)).dataSize(5).build();
}
}

View File

@ -1,205 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.handler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.lumberjack.event.LumberjackMetadata;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import javax.net.ssl.SSLContext;
import javax.xml.bind.DatatypeConverter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@SuppressWarnings("deprecation")
public class ITLumberjackSocketChannelHandler {
private EventFactory<TestEvent> eventFactory;
private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory;
private ByteBufferSource byteBufferSource;
private BlockingQueue<TestEvent> events;
private ComponentLog logger = Mockito.mock(ComponentLog.class);
private int maxConnections;
private SSLContext sslContext;
private Charset charset;
private ChannelDispatcher dispatcher;
@Before
public void setup() {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>();
byteBufferSource = new ByteBufferPool(1, 4096);
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
maxConnections = 1;
sslContext = null;
charset = StandardCharsets.UTF_8;
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBufferSource, events, logger,
maxConnections, sslContext, charset);
}
@Test
public void testBasicHandling() throws IOException, InterruptedException {
final String multiFrameData = "3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728" +
"031957a97f82232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f02" +
"1f71461b26873e711bee9480f48b0af10fe2889113b8c9e28f4322b82395413a50cafd79957c253d0b992faf41" +
"29c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f4680796b421964fc9b032ac4dcb54d2575" +
"a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff01000" +
"0ffff35e0eff0";
final List<String> messages = new ArrayList<>();
messages.add(multiFrameData);
run(messages);
// Check for the 4 frames (from the hex string above) are back...
Assert.assertEquals(4, events.size());
boolean found1 = false;
boolean found2 = false;
boolean found3 = false;
boolean found4 = false;
TestEvent event;
while((event = events.poll()) != null) {
Map<String,String> metadata = event.metadata;
Assert.assertTrue(metadata.containsKey(LumberjackMetadata.SEQNUMBER_KEY));
final String seqNum = metadata.get(LumberjackMetadata.SEQNUMBER_KEY);
if (seqNum.equals("1")) {
found1 = true;
} else if (seqNum.equals("2")) {
found2 = true;
} else if (seqNum.equals("3")) {
found3 = true;
} else if (seqNum.equals("4")) {
found4 = true;
}
}
Assert.assertTrue(found1);
Assert.assertTrue(found2);
Assert.assertTrue(found3);
Assert.assertTrue(found4);
}
protected void run(List<String> messages) throws IOException, InterruptedException {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
// starts the dispatcher listening on port 0 so it selects a random port
dispatcher.open(null, 0, 4096);
// starts a thread to run the dispatcher which will accept/read connections
Thread dispatcherThread = new Thread(dispatcher);
dispatcherThread.start();
// create a client connection to the port the dispatcher is listening on
final int realPort = dispatcher.getPort();
try (SocketChannel channel = SocketChannel.open()) {
channel.connect(new InetSocketAddress("localhost", realPort));
Thread.sleep(100);
// send the provided messages
for (int i=0; i < messages.size(); i++) {
buffer.clear();
buffer.put(DatatypeConverter.parseHexBinary(messages.get(i)));
buffer.flip();
while (buffer.hasRemaining()) {
channel.write(buffer);
}
Thread.sleep(1);
}
}
// wait up to 10 seconds to verify the responses
long timeout = 10000;
long startTime = System.currentTimeMillis();
while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) {
Thread.sleep(100);
}
// should have gotten an event for each message sent
Assert.assertEquals(4, events.size());
} finally {
// stop the dispatcher thread and ensure we shut down handler threads
dispatcher.close();
}
}
// Test event to produce from the data
private static class TestEvent implements Event<SocketChannel> {
private byte[] data;
private Map<String, String> metadata;
public TestEvent(byte[] data, Map<String, String> metadata) {
this.data = data;
this.metadata = metadata;
}
@Override
public String getSender() {
return metadata.get(EventFactory.SENDER_KEY);
}
@Override
public byte[] getData() {
return data;
}
@Override
public ChannelResponder<SocketChannel> getResponder() {
return null;
}
}
// Factory to create test events and send responses for testing
private static class TestEventHolderFactory implements EventFactory<TestEvent> {
@Override
public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
return new TestEvent(data, metadata);
}
}
}

View File

@ -1,156 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.handler;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.lumberjack.event.LumberjackEvent;
import org.apache.nifi.processors.lumberjack.event.LumberjackEventFactory;
import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@SuppressWarnings("deprecation")
public class TestLumberjackFrameHandler {
private Charset charset;
private EventFactory<LumberjackEvent> eventFactory;
private BlockingQueue<LumberjackEvent> events;
private SelectionKey key;
private AsyncChannelDispatcher dispatcher;
private LumberjackEncoder encoder;
private ComponentLog logger;
private LumberjackFrameHandler<LumberjackEvent> frameHandler;
@Before
public void setup() {
this.charset = StandardCharsets.UTF_8;
this.eventFactory = new LumberjackEventFactory();
this.events = new LinkedBlockingQueue<>();
this.key = Mockito.mock(SelectionKey.class);
this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class);
this.logger = Mockito.mock(ComponentLog.class);
this.frameHandler = new LumberjackFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Test
public void testWindow() throws IOException, InterruptedException {
final LumberjackFrame openFrame = new LumberjackFrame.Builder()
.version((byte) 0x31)
.frameType((byte) 0x57)
.seqNumber(-1)
.payload(Integer.toString(1).getBytes())
.build();
final String sender = "sender1";
final CapturingChannelResponder responder = new CapturingChannelResponder();
// call the handler and verify respond() was called once with once response
frameHandler.handle(openFrame, responder, sender);
// No response expected
Assert.assertEquals(0, responder.responded);
}
@Test
public void testData() throws IOException, InterruptedException {
final byte payload[] = new byte[]{
0x00, 0x00, 0x00, 0x02, // Number of pairs
0x00, 0x00, 0x00, 0x04, // Length of first pair key ('line')
0x6C, 0x69, 0x6E, 0x65, // 'line'
0x00, 0x00, 0x00, 0x0C, // Length of 'test-content'
0x74, 0x65, 0x73, 0x74, //
0x2d, 0x63, 0x6f, 0x6e, // 'test-content'
0x74, 0x65, 0x6e, 0x74, //
0x00, 0x00, 0x00, 0x05, // Length of 2nd pair key (field)
0x66, 0x69, 0x65, 0x6c, //
0x64, // 'field'
0x00, 0x00, 0x00, 0x05, // Length of 'value'
0x76, 0x61, 0x6c, 0x75, // 'value'
0x65
};
final LumberjackFrame dataFrame = new LumberjackFrame.Builder()
.version((byte) 0x31)
.frameType((byte) 0x44)
.seqNumber(1)
// Payload eq { enil: hello }
.payload(payload)
.build();
final String sender = "sender1";
final CapturingChannelResponder responder = new CapturingChannelResponder();
// call the handler and verify respond() was called once with once response
frameHandler.handle(dataFrame, responder, sender);
// No response expected
Assert.assertEquals(0, responder.responded);
// But events should contain one event
Assert.assertEquals(1, events.size());
final LumberjackEvent event = events.poll();
Assert.assertEquals("{\"field\":\"value\"}", new String((event.getFields())));
Assert.assertEquals("test-content", new String(event.getData(), charset));
}
private static class CapturingChannelResponder implements ChannelResponder<SocketChannel> {
int responded;
List<ChannelResponse> responses = new ArrayList<>();
@Override
public SocketChannel getChannel() {
return Mockito.mock(SocketChannel.class);
}
@Override
public List<ChannelResponse> getResponses() {
return responses;
}
@Override
public void addResponse(ChannelResponse response) {
responses.add(response);
}
@Override
public void respond() throws IOException {
responded++;
}
}
}

View File

@ -1,34 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.16.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-lumberjack-bundle</artifactId>
<version>1.16.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-lumberjack-processors</module>
<module>nifi-lumberjack-nar</module>
</modules>
</project>

View File

@ -60,7 +60,6 @@
<module>nifi-amqp-bundle</module>
<module>nifi-splunk-bundle</module>
<module>nifi-jms-bundle</module>
<module>nifi-lumberjack-bundle</module>
<module>nifi-beats-bundle</module>
<module>nifi-cassandra-bundle</module>
<module>nifi-spring-bundle</module>