From 8f2a9f94fece6d700542d70b9970f714ed4753ef Mon Sep 17 00:00:00 2001 From: Nathan Gough Date: Thu, 27 Jan 2022 12:54:41 -0500 Subject: [PATCH] NIFI-9632 - Removed nifi-lumberjack-bundle - Removed several unused socket classes from nifi-processor-utils This closes #5722 Signed-off-by: David Handermann --- .../nifi/processors/beats/ListenBeats.java | 2 +- .../dispatcher/SocketChannelDispatcher.java | 298 ----------------- .../listen/handler/ChannelHandlerFactory.java | 46 --- .../socket/SSLSocketChannelHandler.java | 153 --------- .../handler/socket/SocketChannelHandler.java | 51 --- .../socket/SocketChannelHandlerFactory.java | 55 ---- .../socket/StandardSocketChannelHandler.java | 158 --------- .../socket/SSLSocketChannelResponder.java | 44 --- .../socket/SocketChannelResponder.java | 69 ---- .../nifi-lumberjack-nar/pom.xml | 43 --- .../src/main/resources/META-INF/LICENSE | 233 -------------- .../src/main/resources/META-INF/NOTICE | 34 -- .../nifi-lumberjack-processors/pom.xml | 91 ------ .../lumberjack/ListenLumberjack.java | 237 -------------- .../lumberjack/event/LumberjackEvent.java | 47 --- .../event/LumberjackEventFactory.java | 39 --- .../lumberjack/event/LumberjackMetadata.java | 27 -- .../lumberjack/frame/LumberjackDecoder.java | 299 ------------------ .../lumberjack/frame/LumberjackEncoder.java | 48 --- .../lumberjack/frame/LumberjackFrame.java | 116 ------- .../frame/LumberjackFrameException.java | 33 -- .../lumberjack/frame/LumberjackState.java | 29 -- .../handler/LumberjackFrameHandler.java | 115 ------- .../LumberjackSSLSocketChannelHandler.java | 95 ------ .../LumberjackSocketChannelHandler.java | 104 ------ ...LumberjackSocketChannelHandlerFactory.java | 57 ---- .../response/LumberjackChannelResponse.java | 42 --- .../response/LumberjackResponse.java | 62 ---- .../org.apache.nifi.processor.Processor | 15 - .../event/TestLumberjackEventFactory.java | 55 ---- .../frame/TestLumberjackDecoder.java | 99 ------ .../frame/TestLumberjackEncoder.java | 47 --- .../lumberjack/frame/TestLumberjackFrame.java | 38 --- .../ITLumberjackSocketChannelHandler.java | 205 ------------ .../handler/TestLumberjackFrameHandler.java | 156 --------- .../nifi-lumberjack-bundle/pom.xml | 34 -- nifi-nar-bundles/pom.xml | 1 - 37 files changed, 1 insertion(+), 3276 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java delete mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java delete mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java delete mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java delete mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java delete mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java delete mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java delete mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-nar/pom.xml delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-nar/src/main/resources/META-INF/LICENSE delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-nar/src/main/resources/META-INF/NOTICE delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/pom.xml delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackChannelResponse.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackResponse.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/event/TestLumberjackEventFactory.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackDecoder.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackEncoder.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackFrame.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackFrameHandler.java delete mode 100644 nifi-nar-bundles/nifi-lumberjack-bundle/pom.xml diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java index 889f11a2c4..86cc2df339 100644 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java @@ -70,7 +70,7 @@ import java.util.concurrent.LinkedBlockingQueue; @Tags({"listen", "beats", "tcp", "logs"}) @CapabilityDescription("Listens for messages sent by libbeat compatible clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash', writing its JSON formatted payload " + "to the content of a FlowFile." + - "This processor replaces the now deprecated ListenLumberjack") + "This processor replaces the now deprecated/removed ListenLumberjack") @WritesAttributes({ @WritesAttribute(attribute = "beats.sender", description = "The sending host of the messages."), @WritesAttribute(attribute = "beats.port", description = "The sending port the messages were received over."), diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java deleted file mode 100644 index 41e1a70e20..0000000000 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.dispatcher; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.concurrent.BasicThreadFactory; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; -import org.apache.nifi.security.util.ClientAuth; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.StandardSocketOptions; -import java.nio.ByteBuffer; -import java.nio.channels.Channel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.Iterator; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Accepts Socket connections on the given port and creates a handler for each connection to - * be executed by a thread pool. - */ -public class SocketChannelDispatcher> implements AsyncChannelDispatcher { - - private final EventFactory eventFactory; - private final ChannelHandlerFactory handlerFactory; - private final ByteBufferSource bufferSource; - private final BlockingQueue events; - private final ComponentLog logger; - private final int maxConnections; - private final int maxThreadPoolSize; - private final SSLContext sslContext; - private final ClientAuth clientAuth; - private final Charset charset; - - private ThreadPoolExecutor executor; - private volatile boolean stopped = false; - private Selector selector; - private final BlockingQueue keyQueue; - private final AtomicInteger currentConnections = new AtomicInteger(0); - - public SocketChannelDispatcher(final EventFactory eventFactory, - final ChannelHandlerFactory handlerFactory, - final ByteBufferSource bufferSource, - final BlockingQueue events, - final ComponentLog logger, - final int maxConnections, - final SSLContext sslContext, - final Charset charset) { - this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, sslContext, ClientAuth.REQUIRED, charset); - } - - public SocketChannelDispatcher(final EventFactory eventFactory, - final ChannelHandlerFactory handlerFactory, - final ByteBufferSource bufferSource, - final BlockingQueue events, - final ComponentLog logger, - final int maxConnections, - final SSLContext sslContext, - final ClientAuth clientAuth, - final Charset charset) { - this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, maxConnections, sslContext, clientAuth, charset); - } - - public SocketChannelDispatcher(final EventFactory eventFactory, - final ChannelHandlerFactory handlerFactory, - final ByteBufferSource bufferSource, - final BlockingQueue events, - final ComponentLog logger, - final int maxConnections, - final int maxThreadPoolSize, - final SSLContext sslContext, - final ClientAuth clientAuth, - final Charset charset) { - this.eventFactory = eventFactory; - this.handlerFactory = handlerFactory; - this.bufferSource = bufferSource; - this.events = events; - this.logger = logger; - this.maxConnections = maxConnections; - this.maxThreadPoolSize = maxThreadPoolSize; - this.keyQueue = new LinkedBlockingQueue<>(maxConnections); - this.sslContext = sslContext; - this.clientAuth = clientAuth; - this.charset = charset; - } - - @Override - public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException { - final InetSocketAddress inetSocketAddress = new InetSocketAddress(nicAddress, port); - - stopped = false; - executor = new ThreadPoolExecutor( - maxThreadPoolSize, - maxThreadPoolSize, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new BasicThreadFactory.Builder().namingPattern(inetSocketAddress.toString() + "-worker-%d").build()); - executor.allowCoreThreadTimeOut(true); - - final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); - serverSocketChannel.configureBlocking(false); - if (maxBufferSize > 0) { - serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); - final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF); - if (actualReceiveBufSize < maxBufferSize) { - logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to " - + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's " - + "maximum receive buffer"); - } - } - - serverSocketChannel.socket().bind(inetSocketAddress); - - selector = Selector.open(); - serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); - } - - @Override - public void run() { - while (!stopped) { - try { - int selected = selector.select(); - // if stopped the selector could already be closed which would result in a ClosedSelectorException - if (selected > 0 && !stopped){ - Iterator selectorKeys = selector.selectedKeys().iterator(); - // if stopped we don't want to modify the keys because close() may still be in progress - while (selectorKeys.hasNext() && !stopped) { - SelectionKey key = selectorKeys.next(); - selectorKeys.remove(); - if (!key.isValid()){ - continue; - } - if (key.isAcceptable()) { - // Handle new connections coming in - final ServerSocketChannel channel = (ServerSocketChannel) key.channel(); - final SocketChannel socketChannel = channel.accept(); - socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - // Check for available connections - if (currentConnections.incrementAndGet() > maxConnections){ - currentConnections.decrementAndGet(); - logger.warn("Rejecting connection from {} because max connections has been met", - new Object[]{ socketChannel.getRemoteAddress().toString() }); - IOUtils.closeQuietly(socketChannel); - continue; - } - logger.debug("Accepted incoming connection from {}", - new Object[]{socketChannel.getRemoteAddress().toString()}); - // Set socket to non-blocking, and register with selector - socketChannel.configureBlocking(false); - SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); - - // Prepare the byte buffer for the reads, clear it out - ByteBuffer buffer = bufferSource.acquire(); - - // If we have an SSLContext then create an SSLEngine for the channel - SSLSocketChannel sslSocketChannel = null; - if (sslContext != null) { - final SSLEngine sslEngine = sslContext.createSSLEngine(); - sslEngine.setUseClientMode(false); - - switch (clientAuth) { - case REQUIRED: - sslEngine.setNeedClientAuth(true); - break; - case WANT: - sslEngine.setWantClientAuth(true); - break; - case NONE: - sslEngine.setNeedClientAuth(false); - sslEngine.setWantClientAuth(false); - break; - } - - sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel); - } - - // Attach the buffer and SSLSocketChannel to the key - SocketChannelAttachment attachment = new SocketChannelAttachment(buffer, sslSocketChannel); - readKey.attach(attachment); - } else if (key.isReadable()) { - // Clear out the operations the select is interested in until done reading - key.interestOps(0); - // Create a handler based on the protocol and whether an SSLEngine was provided or not - final Runnable handler; - if (sslContext != null) { - handler = handlerFactory.createSSLHandler(key, this, charset, eventFactory, events, logger); - } else { - handler = handlerFactory.createHandler(key, this, charset, eventFactory, events, logger); - } - - // run the handler - executor.execute(handler); - } - } - } - // Add back all idle sockets to the select - SelectionKey key; - while((key = keyQueue.poll()) != null){ - key.interestOps(SelectionKey.OP_READ); - } - } catch (IOException e) { - logger.error("Error accepting connection from SocketChannel", e); - } - } - } - - @Override - public int getPort() { - // Return the port for the key listening for accepts - for(SelectionKey key : selector.keys()){ - if (key.isValid()) { - final Channel channel = key.channel(); - if (channel instanceof ServerSocketChannel) { - return ((ServerSocketChannel)channel).socket().getLocalPort(); - } - } - } - return 0; - } - - @Override - public void close() { - stopped = true; - if (selector != null) { - selector.wakeup(); - } - - if (executor != null) { - executor.shutdown(); - try { - // Wait a while for existing tasks to terminate - if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - executor.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - - if (selector != null) { - synchronized (selector.keys()) { - for (SelectionKey key : selector.keys()) { - IOUtils.closeQuietly(key.channel()); - } - } - } - IOUtils.closeQuietly(selector); - } - - @Override - public void completeConnection(SelectionKey key) { - // connection is done. Releasing buffer - final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); - bufferSource.release(attachment.getByteBuffer()); - currentConnections.decrementAndGet(); - } - - @Override - public void addBackForSelection(SelectionKey key) { - keyQueue.offer(key); - selector.wakeup(); - } - -} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java deleted file mode 100644 index 9ca6bdd84f..0000000000 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.handler; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; - -import java.nio.channels.SelectionKey; -import java.nio.charset.Charset; -import java.util.concurrent.BlockingQueue; - -/** - * Factory that can produce ChannelHandlers for the given type of Event and ChannelDispatcher. - */ -public interface ChannelHandlerFactory { - - ChannelHandler createHandler(final SelectionKey key, - final D dispatcher, - final Charset charset, - final EventFactory eventFactory, - final BlockingQueue events, - final ComponentLog logger); - - ChannelHandler createSSLHandler(final SelectionKey key, - final D dispatcher, - final Charset charset, - final EventFactory eventFactory, - final BlockingQueue events, - final ComponentLog logger); -} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java deleted file mode 100644 index ef747e12bd..0000000000 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.handler.socket; - -import org.apache.commons.io.IOUtils; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; -import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.InetAddress; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.Map; -import java.util.concurrent.BlockingQueue; - -/** - * Wraps a SocketChannel with an SSLSocketChannel for receiving messages over TLS. - */ -public class SSLSocketChannelHandler> extends SocketChannelHandler { - - private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); - - public SSLSocketChannelHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory eventFactory, - final BlockingQueue events, - final ComponentLog logger) { - super(key, dispatcher, charset, eventFactory, events, logger); - } - - @Override - public void run() { - boolean eof = false; - SSLSocketChannel sslSocketChannel = null; - try { - int bytesRead; - final SocketChannel socketChannel = (SocketChannel) key.channel(); - final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); - - // get the SSLSocketChannel from the attachment - sslSocketChannel = attachment.getSslSocketChannel(); - - // SSLSocketChannel deals with byte[] so ByteBuffer isn't used here, but we'll use the size to create a new byte[] - final ByteBuffer socketBuffer = attachment.getByteBuffer(); - byte[] socketBufferArray = new byte[socketBuffer.limit()]; - - // read until no more data - try { - while ((bytesRead = sslSocketChannel.read(socketBufferArray)) > 0) { - processBuffer(sslSocketChannel, socketChannel, bytesRead, socketBufferArray); - logger.debug("bytes read from sslSocketChannel {}", new Object[]{bytesRead}); - } - } catch (SocketTimeoutException ste) { - // SSLSocketChannel will throw this exception when 0 bytes are read and the timeout threshold - // is exceeded, we don't want to close the connection in this case - bytesRead = 0; - } - - // Check for closed socket - if( bytesRead < 0 ){ - eof = true; - logger.debug("Reached EOF, closing connection"); - } else { - logger.debug("No more data available, returning for selection"); - } - } catch (ClosedByInterruptException | InterruptedException e) { - logger.debug("read loop interrupted, closing connection"); - // Treat same as closed socket - eof = true; - } catch (ClosedChannelException e) { - // ClosedChannelException doesn't have a message so handle it separately from IOException - logger.error("Error reading from channel due to channel being closed", e); - // Treat same as closed socket - eof = true; - } catch (IOException e) { - logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e); - // Treat same as closed socket - eof = true; - } finally { - if(eof == true) { - IOUtils.closeQuietly(sslSocketChannel); - dispatcher.completeConnection(key); - } else { - dispatcher.addBackForSelection(key); - } - } - } - - /** - * Process the contents of the buffer. Give sub-classes a chance to override this behavior. - * - * @param sslSocketChannel the channel the data was read from - * @param socketChannel the socket channel being wrapped by sslSocketChannel - * @param bytesRead the number of bytes read - * @param buffer the buffer to process - * @throws InterruptedException thrown if interrupted while queuing events - */ - protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel, - final int bytesRead, final byte[] buffer) throws InterruptedException, IOException { - final InetAddress sender = socketChannel.socket().getInetAddress(); - - // go through the buffer looking for the end of each message - for (int i = 0; i < bytesRead; i++) { - final byte currByte = buffer[i]; - - // check if at end of a message - if (currByte == getDelimiter()) { - if (currBytes.size() > 0) { - final SSLSocketChannelResponder response = new SSLSocketChannelResponder(socketChannel, sslSocketChannel); - final Map metadata = EventFactoryUtil.createMapWithSender(sender.toString()); - final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); - events.offer(event); - currBytes.reset(); - } - } else { - currBytes.write(currByte); - } - } - } - - @Override - public byte getDelimiter() { - return TCP_DELIMITER; - } - -} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java deleted file mode 100644 index 07b5dcce6b..0000000000 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.handler.socket; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.handler.ChannelHandler; - -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.concurrent.BlockingQueue; - -/** - * Base class for socket channel handlers. - */ -public abstract class SocketChannelHandler> extends ChannelHandler { - - static final byte TCP_DELIMITER = '\n'; - - public SocketChannelHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory eventFactory, - final BlockingQueue events, - final ComponentLog logger) { - super(key, dispatcher, charset, eventFactory, events, logger); - } - - /** - * @return the byte used as the delimiter between messages for the given handler - */ - public abstract byte getDelimiter(); - -} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java deleted file mode 100644 index 9003f905fe..0000000000 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.handler.socket; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.handler.ChannelHandler; -import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; - -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.concurrent.BlockingQueue; - -/** - * Default factory for creating socket channel handlers. - */ -public class SocketChannelHandlerFactory> implements ChannelHandlerFactory { - - @Override - public ChannelHandler createHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory eventFactory, - final BlockingQueue events, - final ComponentLog logger) { - return new StandardSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger); - } - - @Override - public ChannelHandler createSSLHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory eventFactory, - final BlockingQueue events, - final ComponentLog logger) { - return new SSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger); - } -} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java deleted file mode 100644 index 250168c40d..0000000000 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.handler.socket; - -import org.apache.commons.io.IOUtils; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; -import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.Map; -import java.util.concurrent.BlockingQueue; - -/** - * Reads from the given SocketChannel into the provided buffer. If the given delimiter is found, the data - * read up to that point is queued for processing. - */ -public class StandardSocketChannelHandler> extends SocketChannelHandler { - - private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); - - public StandardSocketChannelHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory eventFactory, - final BlockingQueue events, - final ComponentLog logger) { - super(key, dispatcher, charset, eventFactory, events, logger); - } - - @Override - public void run() { - boolean eof = false; - SocketChannel socketChannel = null; - - try { - int bytesRead; - socketChannel = (SocketChannel) key.channel(); - - final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); - final ByteBuffer socketBuffer = attachment.getByteBuffer(); - - // read until the buffer is full - while ((bytesRead = socketChannel.read(socketBuffer)) > 0) { - // prepare byte buffer for reading - socketBuffer.flip(); - // mark the current position as start, in case of partial message read - socketBuffer.mark(); - // process the contents that have been read into the buffer - processBuffer(socketChannel, socketBuffer); - - // Preserve bytes in buffer for next call to run - // NOTE: This code could benefit from the two ByteBuffer read calls to avoid - // this compact for higher throughput - socketBuffer.reset(); - socketBuffer.compact(); - logger.debug("bytes read {}", new Object[]{bytesRead}); - } - - // Check for closed socket - if( bytesRead < 0 ){ - eof = true; - logger.debug("Reached EOF, closing connection"); - } else { - logger.debug("No more data available, returning for selection"); - } - } catch (ClosedByInterruptException | InterruptedException e) { - logger.debug("read loop interrupted, closing connection"); - // Treat same as closed socket - eof = true; - } catch (ClosedChannelException e) { - // ClosedChannelException doesn't have a message so handle it separately from IOException - logger.error("Error reading from channel due to channel being closed", e); - // Treat same as closed socket - eof = true; - } catch (IOException e) { - logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e); - // Treat same as closed socket - eof = true; - } finally { - if(eof == true) { - IOUtils.closeQuietly(socketChannel); - dispatcher.completeConnection(key); - } else { - dispatcher.addBackForSelection(key); - } - } - } - - /** - * Process the contents that have been read into the buffer. Allow sub-classes to override this behavior. - * - * @param socketChannel the channel the data was read from - * @param socketBuffer the buffer the data was read into - * @throws InterruptedException if interrupted when queuing events - */ - protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer) throws InterruptedException, IOException { - // get total bytes in buffer - final int total = socketBuffer.remaining(); - final InetAddress sender = socketChannel.socket().getInetAddress(); - - // go through the buffer looking for the end of each message - currBytes.reset(); - for (int i = 0; i < total; i++) { - // NOTE: For higher throughput, the looking for \n and copying into the byte stream could be improved - // Pull data out of buffer and cram into byte array - byte currByte = socketBuffer.get(); - - // check if at end of a message - if (currByte == getDelimiter()) { - if (currBytes.size() > 0) { - final SocketChannelResponder response = new SocketChannelResponder(socketChannel); - final Map metadata = EventFactoryUtil.createMapWithSender(sender.toString()); - final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); - events.offer(event); - currBytes.reset(); - - // Mark this as the start of the next message - socketBuffer.mark(); - } - } else { - currBytes.write(currByte); - } - } - } - - @Override - public byte getDelimiter() { - return TCP_DELIMITER; - } - -} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java deleted file mode 100644 index 20102ba2bc..0000000000 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.response.socket; - -import org.apache.nifi.processor.util.listen.response.ChannelResponse; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; - -import java.io.IOException; -import java.nio.channels.SocketChannel; - -/** - * A ChannelResponder for SSLSocketChannels. - */ -public class SSLSocketChannelResponder extends SocketChannelResponder { - - private SSLSocketChannel sslSocketChannel; - - public SSLSocketChannelResponder(final SocketChannel socketChannel, final SSLSocketChannel sslSocketChannel) { - super(socketChannel); - this.sslSocketChannel = sslSocketChannel; - } - - @Override - public void respond() throws IOException { - for (final ChannelResponse response : responses) { - sslSocketChannel.write(response.toByteArray()); - } - } - -} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java deleted file mode 100644 index 5c20bf0be0..0000000000 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.response.socket; - -import org.apache.nifi.processor.util.listen.response.ChannelResponder; -import org.apache.nifi.processor.util.listen.response.ChannelResponse; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * A ChannelResponder for SocketChannels. The SocketChannel should first be registered with a selector, - * upon being selected for writing the respond() method should be executed. - */ -public class SocketChannelResponder implements ChannelResponder { - - protected final List responses; - protected final SocketChannel socketChannel; - - public SocketChannelResponder(final SocketChannel socketChannel) { - this.responses = new ArrayList<>(); - this.socketChannel = socketChannel; - } - - @Override - public SocketChannel getChannel() { - return socketChannel; - } - - @Override - public List getResponses() { - return Collections.unmodifiableList(responses); - } - - @Override - public void addResponse(ChannelResponse response) { - this.responses.add(response); - } - - @Override - public void respond() throws IOException { - for (final ChannelResponse response : responses) { - final ByteBuffer responseBuffer = ByteBuffer.wrap(response.toByteArray()); - - while (responseBuffer.hasRemaining()) { - socketChannel.write(responseBuffer); - } - } - } - -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-nar/pom.xml b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-nar/pom.xml deleted file mode 100644 index 6b48ccf8ab..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-nar/pom.xml +++ /dev/null @@ -1,43 +0,0 @@ - - - - 4.0.0 - - - org.apache.nifi - nifi-lumberjack-bundle - 1.16.0-SNAPSHOT - - - nifi-lumberjack-nar - 1.16.0-SNAPSHOT - nar - - - - org.apache.nifi - nifi-lumberjack-processors - 1.16.0-SNAPSHOT - - - org.apache.nifi - nifi-standard-services-api-nar - 1.16.0-SNAPSHOT - nar - - - - diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-nar/src/main/resources/META-INF/LICENSE deleted file mode 100644 index e3d56a9e06..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-nar/src/main/resources/META-INF/LICENSE +++ /dev/null @@ -1,233 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - -APACHE NIFI SUBCOMPONENTS: - -The Apache NiFi project contains subcomponents with separate copyright -notices and license terms. Your use of the source code for the these -subcomponents is subject to the terms and conditions of the following -licenses. - - The binary distribution of this product bundles 'Bouncy Castle JDK 1.5' - under an MIT style license. - - Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org) - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in - all copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - THE SOFTWARE. - diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-nar/src/main/resources/META-INF/NOTICE deleted file mode 100644 index 8fadaae80d..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-nar/src/main/resources/META-INF/NOTICE +++ /dev/null @@ -1,34 +0,0 @@ -nifi-lumberjack-nar -Copyright 2014-2022 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -****************** -Apache Software License v2 -****************** - -The following binary components are provided under the Apache Software License v2 - - (ASLv2) Jackson JSON processor - The following NOTICE information applies: - # Jackson JSON processor - - Jackson is a high-performance, Free/Open Source JSON processing library. - It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has - been in development since 2007. - It is currently developed by a community of developers, as well as supported - commercially by FasterXML.com. - - ## Licensing - - Jackson core and extension components may licensed under different licenses. - To find the details that apply to this artifact see the accompanying LICENSE file. - For more information, including possible other licensing options, contact - FasterXML.com (http://fasterxml.com). - - ## Credits - - A list of contributors may be found from CREDITS file, which is included - in some artifacts (usually source distributions); but is always available - from the source code management (SCM) system project uses. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/pom.xml b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/pom.xml deleted file mode 100644 index 58fc864d24..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/pom.xml +++ /dev/null @@ -1,91 +0,0 @@ - - - - 4.0.0 - - - org.apache.nifi - nifi-lumberjack-bundle - 1.16.0-SNAPSHOT - - - nifi-lumberjack-processors - jar - - - - org.apache.nifi - nifi-api - - - org.apache.nifi - nifi-processor-utils - 1.16.0-SNAPSHOT - - - org.apache.nifi - nifi-mock - 1.16.0-SNAPSHOT - test - - - org.apache.nifi - nifi-socket-utils - 1.16.0-SNAPSHOT - - - org.apache.nifi - nifi-utils - 1.16.0-SNAPSHOT - - - org.apache.nifi - nifi-flowfile-packager - 1.16.0-SNAPSHOT - - - org.apache.nifi - nifi-ssl-context-service-api - provided - - - com.google.code.gson - gson - 2.2.4 - - - - - - - jigsaw - - (1.8,) - - - - jakarta.xml.bind - jakarta.xml.bind-api - - - org.glassfish.jaxb - jaxb-runtime - - - - - diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java deleted file mode 100644 index 3b5b923566..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack; - -import com.google.gson.Gson; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool; -import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource; -import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; -import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; -import org.apache.nifi.processor.util.listen.response.ChannelResponder; -import org.apache.nifi.processor.util.listen.response.ChannelResponse; -import org.apache.nifi.processors.lumberjack.event.LumberjackEvent; -import org.apache.nifi.processors.lumberjack.event.LumberjackEventFactory; -import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder; -import org.apache.nifi.processors.lumberjack.handler.LumberjackSocketChannelHandlerFactory; -import org.apache.nifi.processors.lumberjack.response.LumberjackChannelResponse; -import org.apache.nifi.processors.lumberjack.response.LumberjackResponse; -import org.apache.nifi.ssl.RestrictedSSLContextService; -import org.apache.nifi.ssl.SSLContextService; - -import javax.net.ssl.SSLContext; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; - -@Deprecated -@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) -@Tags({"listen", "lumberjack", "tcp", "logs"}) -@CapabilityDescription("This processor is deprecated and may be removed in the near future. Listens for Lumberjack messages being sent to a given port over TCP. Each message will be " + - "acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain data " + - "portion of one or more Lumberjack frames. In the case where the Lumberjack frames contain syslog messages, the " + - "output of this processor can be sent to a ParseSyslog processor for further processing. ") -@WritesAttributes({ - @WritesAttribute(attribute = "lumberjack.sender", description = "The sending host of the messages."), - @WritesAttribute(attribute = "lumberjack.port", description = "The sending port the messages were received over."), - @WritesAttribute(attribute = "lumberjack.sequencenumber", description = "The sequence number of the message. Only included if is 1."), - @WritesAttribute(attribute = "lumberjack.*", description = "The keys and respective values as sent by the lumberjack producer. Only included if is 1."), - @WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is text/plain") -}) -@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"}) -/** - * @deprecated As of release 1.2.0, replaced by {@link org.apache.nifi.processors.beats.ListenBeats} - * */ -public class ListenLumberjack extends AbstractListenEventBatchingProcessor { - - public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("SSL Context Service") - .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " + - "messages will be received over a secure connection. Note that as Lumberjack client requires" + - "two-way SSL authentication, the controller MUST have a truststore and a keystore to work" + - "properly.") - .required(true) - .identifiesControllerService(RestrictedSSLContextService.class) - .build(); - - @Override - protected List getAdditionalProperties() { - return Arrays.asList( - MAX_CONNECTIONS, - SSL_CONTEXT_SERVICE - ); - } - - @Override - protected Collection customValidate(final ValidationContext validationContext) { - final List results = new ArrayList<>(); - - final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - - if (sslContextService != null && sslContextService.isTrustStoreConfigured() == false) { - results.add(new ValidationResult.Builder() - .explanation("The context service must have a truststore configured for the lumberjack forwarder client to work correctly") - .valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build()); - } - - return results; - } - - private volatile LumberjackEncoder lumberjackEncoder; - - - @Override - @OnScheduled - public void onScheduled(ProcessContext context) throws IOException { - super.onScheduled(context); - // wanted to ensure charset was already populated here - lumberjackEncoder = new LumberjackEncoder(); - } - - @Override - protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue events) throws IOException { - final EventFactory eventFactory = new LumberjackEventFactory(); - final ChannelHandlerFactory handlerFactory = new LumberjackSocketChannelHandlerFactory<>(); - - final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger(); - final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); - final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue()); - - // initialize the buffer pool based on max number of connections and the buffer size - final ByteBufferSource byteBufferSource = new ByteBufferPool(maxConnections, bufferSize); - - // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher - SSLContext sslContext = null; - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - if (sslContextService != null) { - sslContext = sslContextService.createContext(); - } - - // if we decide to support SSL then get the context and pass it in here - return new SocketChannelDispatcher<>(eventFactory, handlerFactory, byteBufferSource, events, - getLogger(), maxConnections, sslContext, charSet); - } - - - @Override - protected String getBatchKey(LumberjackEvent event) { - return event.getSender(); - } - - protected void respond(final LumberjackEvent event, final LumberjackResponse lumberjackResponse) { - final ChannelResponse response = new LumberjackChannelResponse(lumberjackEncoder, lumberjackResponse); - - final ChannelResponder responder = event.getResponder(); - responder.addResponse(response); - try { - responder.respond(); - } catch (IOException e) { - getLogger().error("Error sending response for transaction {} due to {}", - new Object[]{event.getSeqNumber(), e.getMessage()}, e); - } - } - - protected void postProcess(final ProcessContext context, final ProcessSession session, final List events) { - // first commit the session so we guarantee we have all the events successfully - // written to FlowFiles and transferred to the success relationship - session.commitAsync(() -> { - // respond to each event to acknowledge successful receipt - for (final LumberjackEvent event : events) { - respond(event, LumberjackResponse.ok(event.getSeqNumber())); - } - }); - } - - @Override - protected String getTransitUri(FlowFileEventBatch batch) { - final String sender = batch.getEvents().get(0).getSender(); - final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; - final String transitUri = new StringBuilder().append("lumberjack").append("://").append(senderHost).append(":") - .append(port).toString(); - return transitUri; - } - - @Override - protected Map getAttributes(FlowFileEventBatch batch) { - final List events = batch.getEvents(); - // the sender and command will be the same for all events based on the batch key - final String sender = events.get(0).getSender(); - final int numAttributes = events.size() == 1 ? 5 : 4; - final Map attributes = new HashMap<>(numAttributes); - attributes.put(LumberjackAttributes.SENDER.key(), sender); - attributes.put(LumberjackAttributes.PORT.key(), String.valueOf(port)); - attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - // if there was only one event then we can pass on the transaction - // NOTE: we could pass on all the transaction ids joined together - if (events.size() == 1) { - attributes.put(LumberjackAttributes.SEQNUMBER.key(), String.valueOf(events.get(0).getSeqNumber())); - - // Convert the serialized fields from JSON - String serialFields = String.valueOf(events.get(0).getFields()); - Gson jsonObject = new Gson(); - - Map fields = jsonObject.fromJson(serialFields, Map.class); - - for (Map.Entry entry : fields.entrySet()) { - attributes.put(LumberjackAttributes.FIELDS.key().concat(".").concat(entry.getKey()), entry.getValue()); - } - } - return attributes; - } - - public enum LumberjackAttributes implements FlowFileAttributeKey { - SENDER("lumberjack.sender"), - PORT("lumberjack.port"), - SEQNUMBER("lumberjack.sequencenumber"), - FIELDS("lumberjack.fields"); - - private final String key; - - LumberjackAttributes(String key) { - this.key = key; - } - - @Override - public String key() { - return key; - } - } -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java deleted file mode 100644 index c3ddff680d..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.event; - -import org.apache.nifi.processor.util.listen.event.StandardEvent; -import org.apache.nifi.processor.util.listen.response.ChannelResponder; - -import java.nio.channels.SocketChannel; - -/** - * A Lumberjack event which adds the transaction number and command to the StandardEvent. - */ -@Deprecated -public class LumberjackEvent extends StandardEvent { - - private final long seqNumber; - private final String fields; - - public LumberjackEvent(final String sender, final byte[] data, final ChannelResponder responder, final long seqNumber, String fields) { - super(sender, data, responder); - this.seqNumber = seqNumber; - this.fields = fields; - } - - public long getSeqNumber() { - return seqNumber; - } - - public String getFields() { - return fields; - } - -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java deleted file mode 100644 index 3a08c2a0aa..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.event; - -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.response.ChannelResponder; - -import java.util.Map; - -/** - * An EventFactory implementation to create LumberjackEvents. - */ -@Deprecated -public class LumberjackEventFactory implements EventFactory { - - @Override - public LumberjackEvent create(final byte[] data, final Map metadata, final ChannelResponder responder) { - final String sender = metadata.get(EventFactory.SENDER_KEY); - final long seqNumber = Long.valueOf(metadata.get(LumberjackMetadata.SEQNUMBER_KEY)); - final String fields = metadata.get(LumberjackMetadata.FIELDS_KEY); - - return new LumberjackEvent(sender, data, responder, seqNumber, fields); - } - -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java deleted file mode 100644 index a2fa4526db..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.event; - -/** - * Metadata keys for Lumberjack. - */ -@Deprecated -public interface LumberjackMetadata { - - String SEQNUMBER_KEY = "lumberjack.sequencenumber"; - String FIELDS_KEY = "lumberjack.fields"; -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java deleted file mode 100644 index 7fb007b6fd..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.frame; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.LinkedList; -import java.util.List; -import java.util.zip.InflaterInputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Decodes a Lumberjack frame by maintaining a state based on each byte that has been processed. This class - * should not be shared by multiple threads. - */ -@Deprecated -public class LumberjackDecoder { - - static final Logger logger = LoggerFactory.getLogger(LumberjackDecoder.class); - - private LumberjackFrame.Builder frameBuilder; - private LumberjackState currState = LumberjackState.VERSION; - private byte decodedFrameType; - - private byte[] decompressedData; - - private final Charset charset; - private final ByteArrayOutputStream currBytes; - - private long windowSize; - - public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a; - - /** - * @param charset the charset to decode bytes from the Lumberjack frame - */ - public LumberjackDecoder(final Charset charset) { - this(charset, new ByteArrayOutputStream(4096)); - } - - /** - * @param charset the charset to decode bytes from the Lumberjack frame - * @param buffer a buffer to use while processing the bytes - */ - public LumberjackDecoder(final Charset charset, final ByteArrayOutputStream buffer) { - this.charset = charset; - this.currBytes = buffer; - this.frameBuilder = new LumberjackFrame.Builder(); - this.decodedFrameType = 0x00; - } - - /** - * Resets this decoder back to its initial state. - */ - public void reset() { - frameBuilder = new LumberjackFrame.Builder(); - currState = LumberjackState.VERSION; - decodedFrameType = 0x00; - currBytes.reset(); - } - - /** - * Process the next byte from the channel, updating the builder and state accordingly. - * - * @param currByte the next byte to process - * @preturn true if a frame is ready to be retrieved, false otherwise - */ - public boolean process(final byte currByte) throws LumberjackFrameException { - try { - switch (currState) { - case VERSION: - processVERSION(currByte); - break; - case FRAMETYPE: - processFRAMETYPE(currByte); - break; - case PAYLOAD: - processPAYLOAD(currByte); - if (frameBuilder.frameType == FRAME_WINDOWSIZE && currState == LumberjackState.COMPLETE) { - return true; - } else if (frameBuilder.frameType == FRAME_COMPRESSED && currState == LumberjackState.COMPLETE) { - return true; - } else { - break; - } - case COMPLETE: - return true; - default: - break; - } - return false; - } catch (Exception e) { - throw new LumberjackFrameException("Error decoding Lumberjack frame: " + e.getMessage(), e); - } - } - - - /** - * Returns the decoded frame and resets the decoder for the next frame. - * This method should be called after checking isComplete(). - * - * @return the LumberjackFrame that was decoded - */ - public List getFrames() throws LumberjackFrameException { - List frames = new LinkedList<>(); - - if (currState != LumberjackState.COMPLETE) { - throw new LumberjackFrameException("Must be at the trailer of a frame"); - } - try { - if (currState == LumberjackState.COMPLETE && frameBuilder.frameType == FRAME_COMPRESSED) { - logger.debug("Frame is compressed, will iterate to decode", new Object[]{}); - // LumberjackDecoder decompressedDecoder = new LumberjackDecoder(); - - // Zero currBytes, currState and frameBuilder prior to iteration over - // decompressed bytes - currBytes.reset(); - frameBuilder.reset(); - currState = LumberjackState.VERSION; - - // Run over decompressed data. - frames = processDECOMPRESSED(decompressedData); - - } else { - final LumberjackFrame frame = frameBuilder.build(); - currBytes.reset(); - frameBuilder.reset(); - currState = LumberjackState.VERSION; - frames.add(frame); - } - return frames; - - } catch (Exception e) { - throw new LumberjackFrameException("Error decoding Lumberjack frame: " + e.getMessage(), e); - } - } - - private List processDECOMPRESSED(byte[] decompressedData) { - List frames = new LinkedList<>(); - LumberjackFrame.Builder internalFrameBuilder = new LumberjackFrame.Builder(); - ByteBuffer currentData = ByteBuffer.wrap(decompressedData); - - // Lumberjack has a weird approach to frames, where compressed frames embed D(ata) or J(SON) frames. - // inside a compressed input. - // Or as stated in the documentation: - // - // "As an example, you could have 3 data frames compressed into a single - // 'compressed' frame type: 1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v}" - // - // Therefore, instead of calling process method again, just iterate over each of - // the frames and split them so they can be processed by LumberjackFrameHandler - - while (currentData.hasRemaining()) { - - int payloadLength = 0; - - internalFrameBuilder.version = currentData.get(); - internalFrameBuilder.frameType = currentData.get(); - internalFrameBuilder.seqNumber = currentData.getInt() & 0x00000000ffffffffL; - currentData.mark(); - - // Set the payloadLength to negative to avoid doing math - // around valueLength and valueLength - payloadLength = payloadLength - currentData.position(); - - long pairCount = currentData.getInt() & 0x00000000ffffffffL; - for (int i = 0; i < pairCount; i++) { - long keyLength = currentData.getInt() & 0x00000000ffffffffL; - currentData.position(currentData.position() + (int) keyLength); - long valueLength = currentData.getInt() & 0x00000000ffffffffL; - currentData.position(currentData.position() + (int) valueLength); - } - // Infer the length of the payload from position... - payloadLength = payloadLength + currentData.position(); - - // Reset to mark (i.e. skip frame headers) prior to getting the data - currentData.reset(); - - // get the data, shift mark and compact so next iteration can - // read rest of buffer. - byte[] bytes = new byte[payloadLength]; - currentData.get(bytes, 0, payloadLength); - currentData.mark(); - - // Add payload to frame - internalFrameBuilder.payload(bytes); - - // data frame is created - LumberjackFrame frame = internalFrameBuilder.build(); - frames.add(frame); - internalFrameBuilder.reset(); - } - - return frames; - } - - - private void processVERSION(final byte b) { - byte version = b; - frameBuilder.version(version); - logger.debug("Version number is {}", new Object[]{version}); - currBytes.write(b); - currState = LumberjackState.FRAMETYPE; - } - - private void processFRAMETYPE(final byte b) { - decodedFrameType = b; - frameBuilder.frameType(decodedFrameType); - logger.debug("Frame type is {}", new Object[]{decodedFrameType}); - currBytes.write(b); - currState = LumberjackState.PAYLOAD; - } - - private void processPAYLOAD(final byte b) { - currBytes.write(b); - switch (decodedFrameType) { - case FRAME_WINDOWSIZE: //'W' - if (currBytes.size() < 6) { - logger.trace("Lumberjack currBytes contents are {}", currBytes.toString()); - break; - } else if (currBytes.size() == 6) { - frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL; - logger.debug("Data size is {}", new Object[]{frameBuilder.dataSize}); - // Sets payload to empty as frame contains no data - frameBuilder.payload(new byte[]{}); - currBytes.reset(); - currState = LumberjackState.COMPLETE; - windowSize = frameBuilder.dataSize; - break; - } else { - break; - } - case FRAME_COMPRESSED: //'C' - if (currBytes.size() < 6) { - logger.trace("Lumberjack currBytes contents are {}", currBytes.toString()); - break; - } else if (currBytes.size() >= 6) { - frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL; - if (currBytes.size() - 6 == frameBuilder.dataSize) { - try { - byte[] buf = java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, currBytes.size()); - InputStream in = new InflaterInputStream(new ByteArrayInputStream(buf)); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte[] buffer = new byte[1024]; - int len; - while ((len = in.read(buffer)) > 0) { - out.write(buffer, 0, len); - } - in.close(); - out.close(); - decompressedData = out.toByteArray(); - // buf is no longer needed - buf = null; - logger.debug("Finished decompressing data"); - // Decompression is complete, we should be able to proceed with resetting currBytes and curSrtate and iterating them - // as type 'D' frames - frameBuilder.dataSize(decompressedData.length); - currState = LumberjackState.COMPLETE; - - } catch (IOException e) { - throw new LumberjackFrameException("Error decompressing frame: " + e.getMessage(), e); - } - - } - break; - - // If currentByte.size is not lower than six and also not equal or great than 6... - } else { - break; - } - } - } - - private void processCOMPLETE() { - currBytes.reset(); - frameBuilder.reset(); - currState = LumberjackState.VERSION; - } - -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java deleted file mode 100644 index cb8a6831c7..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.frame; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -/** - * Encodes a LumberjackFrame into raw bytes using the given charset. - */ -@Deprecated -public class LumberjackEncoder { - - - public byte[] encode(final LumberjackFrame frame) { - final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - - // Writes the version - buffer.write(frame.getVersion()); - - // Writes the frameType - buffer.write(frame.getFrameType()); - - // Writes the sequence number - try { - buffer.write(frame.getPayload()); - } catch (IOException e) { - throw new LumberjackFrameException("Error decoding Lumberjack frame: " + e.getMessage(), e); - } - - return buffer.toByteArray(); - } - -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java deleted file mode 100644 index 809c679c77..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.frame; - - -/** - * A Lumberjack frame received from a channel. - */ -@Deprecated -public class LumberjackFrame { - - public static final byte DELIMITER = 10; - - private final byte version; - private final byte frameType; - private final byte[] payload; - private final long dataSize; - private final long seqNumber; - - private LumberjackFrame(final Builder builder) { - this.version = builder.version; - this.frameType = builder.frameType; - this.payload = builder.payload; - this.dataSize = builder.dataSize; - this.seqNumber = builder.seqNumber; - - if (version < 2 || payload.length < 0 ) { - throw new LumberjackFrameException("Invalid Frame"); - } - } - - public long getSeqNumber() { - return seqNumber; - } - - public byte getVersion() { - return version; - } - - public byte getFrameType() { - return frameType; - } - - public byte [] getPayload() { - return payload; - } - - /** - * Builder for a LumberjackFrame. - */ - public static class Builder { - - byte version; - byte frameType; - byte [] payload; - long dataSize; - long seqNumber; - - public Builder() { - reset(); - } - - public void reset() { - version = -1; - seqNumber = -1; - frameType = -1; - payload = null; - } - - public Builder version(final byte version) { - this.version = version; - return this; - } - - public Builder seqNumber(final long seqNumber) { - this.seqNumber = seqNumber; - return this; - } - - public Builder frameType(final byte frameType) { - this.frameType = frameType; - return this; - } - - public Builder dataSize(final long dataSize) { - this.dataSize = dataSize; - return this; - } - - public Builder payload(final byte [] payload) { - this.payload = payload; - return this; - } - - - public LumberjackFrame build() { - return new LumberjackFrame(this); - } - - } - -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java deleted file mode 100644 index a388a5bf99..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.frame; - -/** - * Represents an error encountered when decoding Lumberjack frames. - */ -@Deprecated -public class LumberjackFrameException extends RuntimeException { - - public LumberjackFrameException(String message) { - super(message); - } - - public LumberjackFrameException(String message, Throwable cause) { - super(message, cause); - } - -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java deleted file mode 100644 index 3e3e70ca3f..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.frame; - -/** - * The parts of a Lumberjack frame. - */ -@Deprecated -public enum LumberjackState { - - VERSION, - FRAMETYPE, - PAYLOAD, - COMPLETE -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java deleted file mode 100644 index a70c4f3088..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.handler; - - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.BlockingQueue; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; -import org.apache.nifi.processor.util.listen.event.EventQueue; -import org.apache.nifi.processor.util.listen.response.ChannelResponder; -import org.apache.nifi.processors.lumberjack.event.LumberjackMetadata; -import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder; -import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame; - -import com.google.gson.Gson; - -/** - * Encapsulates the logic to handle a LumberjackFrame once it has been read from the channel. - */ -@Deprecated -public class LumberjackFrameHandler> { - - private final Charset charset; - private final EventFactory eventFactory; - private final EventQueue events; - private final SelectionKey key; - private final AsyncChannelDispatcher dispatcher; - private final LumberjackEncoder encoder; - private final ComponentLog logger; - - public LumberjackFrameHandler(final SelectionKey selectionKey, - final Charset charset, - final EventFactory eventFactory, - final BlockingQueue events, - final AsyncChannelDispatcher dispatcher, - final ComponentLog logger) { - this.key = selectionKey; - this.charset = charset; - this.eventFactory = eventFactory; - this.dispatcher = dispatcher; - this.logger = logger; - this.events = new EventQueue<>(events, logger); - this.encoder = new LumberjackEncoder(); - } - - public void handle(final LumberjackFrame frame, final ChannelResponder responder, final String sender) - throws IOException, InterruptedException { - - final Map metadata = EventFactoryUtil.createMapWithSender(sender.toString()); - metadata.put(LumberjackMetadata.SEQNUMBER_KEY, String.valueOf(frame.getSeqNumber())); - String line = ""; - - /* If frameType is a data Frame, Handle the Lumberjack data payload, iterating over it and extracting - keys and values into metadata. - - All keys are inserted into metadata with the exception of line that gets added into the body of the event - */ - if (frame.getFrameType() == 0x44) { - ByteBuffer currentData = ByteBuffer.wrap(frame.getPayload()); - long pairCount = currentData.getInt() & 0x00000000ffffffffL; - Map fields = new HashMap<>(); - for (int i = 0; i < pairCount; i++) { - long keyLength = currentData.getInt() & 0x00000000ffffffffL; - byte[] bytes = new byte[(int) keyLength]; - currentData.get(bytes); - String key = new String(bytes); - long valueLength = currentData.getInt() & 0x00000000ffffffffL; - bytes = new byte[(int) valueLength]; - currentData.get(bytes); - String value = new String(bytes); - - if (key.equals("line")) { - line = value; - } else { - fields.put(key, value); - } - } - // Serialize the fields into a String to push it via metdate - Gson serialFields = new Gson(); - - metadata.put("lumberjack.fields", serialFields.toJson(fields).toString()); - - // queue the raw event blocking until space is available, reset the buffer - final E event = eventFactory.create(line.getBytes(), metadata, responder); - events.offer(event); - } else if (frame.getFrameType() == 0x4A ) { - logger.error("Data type was JSON. JSON payload aren't yet supported, pending the documentation of Lumberjack protocol v2"); - } - } - } diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java deleted file mode 100644 index cdfe8d9efb..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.handler; - -import java.io.IOException; -import java.net.InetAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.List; -import java.util.concurrent.BlockingQueue; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.handler.socket.SSLSocketChannelHandler; -import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder; -import org.apache.nifi.processors.lumberjack.frame.LumberjackDecoder; -import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame; -import org.apache.nifi.processors.lumberjack.frame.LumberjackFrameException; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; - -/** - * A Lumberjack implementation of SSLSocketChannelHandler. - */ -@Deprecated -public class LumberjackSSLSocketChannelHandler> extends SSLSocketChannelHandler { - - private LumberjackDecoder decoder; - private LumberjackFrameHandler frameHandler; - - public LumberjackSSLSocketChannelHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory eventFactory, - final BlockingQueue events, - final ComponentLog logger) { - super(key, dispatcher, charset, eventFactory, events, logger); - this.decoder = new LumberjackDecoder(charset); - this.frameHandler = new LumberjackFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger); - } - - @Override - protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel, - final int bytesRead, final byte[] buffer) throws InterruptedException, IOException { - - final InetAddress sender = socketChannel.socket().getInetAddress(); - try { - - // go through the buffer parsing the Lumberjack command - for (int i = 0; i < bytesRead; i++) { - byte currByte = buffer[i]; - - // if we found the end of a frame, handle the frame and mark the buffer - if (decoder.process(currByte)) { - final List frames = decoder.getFrames(); - // A list of events has been generated - for (LumberjackFrame frame : frames) { - logger.debug("Received Lumberjack frame with transaction {} and command {}", - new Object[]{frame.getSeqNumber(), frame.getSeqNumber()}); - // Ignore the WINDOWS type frames as they contain no payload. - if (frame.getFrameType() != 0x57 ) { - final SSLSocketChannelResponder responder = new SSLSocketChannelResponder(socketChannel, sslSocketChannel); - frameHandler.handle(frame, responder, sender.toString()); - } - } - } - } - - logger.debug("Done processing buffer"); - - } catch (final LumberjackFrameException rfe) { - logger.error("Error reading Lumberjack frames due to {}", new Object[] {rfe.getMessage()} , rfe); - // if an invalid frame or bad data was sent then the decoder will be left in a - // corrupted state, so lets close the connection and cause the client to re-establish - dispatcher.completeConnection(key); - } - } - -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java deleted file mode 100644 index 73fd97f59f..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.handler; - -import java.io.IOException; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.List; -import java.util.concurrent.BlockingQueue; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.handler.socket.StandardSocketChannelHandler; -import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder; -import org.apache.nifi.processors.lumberjack.frame.LumberjackDecoder; -import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame; -import org.apache.nifi.processors.lumberjack.frame.LumberjackFrameException; - -/** - * Extends the StandardSocketChannelHandler to decode bytes into Lumberjack frames. - */ -@Deprecated -public class LumberjackSocketChannelHandler> extends StandardSocketChannelHandler { - - private LumberjackDecoder decoder; - private LumberjackFrameHandler frameHandler; - - public LumberjackSocketChannelHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory eventFactory, - final BlockingQueue events, - final ComponentLog logger) { - super(key, dispatcher, charset, eventFactory, events, logger); - this.decoder = new LumberjackDecoder(charset); - this.frameHandler = new LumberjackFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger); - } - - @Override - protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer) - throws InterruptedException, IOException { - - // get total bytes in buffer - final int total = socketBuffer.remaining(); - final InetAddress sender = socketChannel.socket().getInetAddress(); - - try { - // go through the buffer parsing the Lumberjack command - for (int i = 0; i < total; i++) { - byte currByte = socketBuffer.get(); - - // if we found the end of a frame, handle the frame and mark the buffer - if (decoder.process(currByte)) { - final List frames = decoder.getFrames(); - - for (LumberjackFrame frame : frames) { - //TODO: Clean this - logger.debug("Received Lumberjack frame with transaction {} and command {}", - new Object[]{frame.getSeqNumber(), frame.getSeqNumber()}); - // Ignore the WINDOWS type frames as they contain no payload. - if (frame.getFrameType() != 0x57) { - final SocketChannelResponder responder = new SocketChannelResponder(socketChannel); - frameHandler.handle(frame, responder, sender.toString()); - } - } - socketBuffer.mark(); - } - } - logger.debug("Done processing buffer"); - - } catch (final LumberjackFrameException rfe) { - logger.error("Error reading Lumberjack frames due to {}", new Object[] {rfe.getMessage()}, rfe); - // if an invalid frame or bad data was sent then the decoder will be left in a - // corrupted state, so lets close the connection and cause the client to re-establish - dispatcher.completeConnection(key); - } - } - - // not used for anything in Lumberjack since the decoder encapsulates the delimiter - @Override - public byte getDelimiter() { - return LumberjackFrame.DELIMITER; - } - -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java deleted file mode 100644 index fe287f8668..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.handler; - -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.concurrent.BlockingQueue; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.handler.ChannelHandler; -import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; - -/** - * Default factory for creating Lumberjack socket channel handlers. - */ -@Deprecated -public class LumberjackSocketChannelHandlerFactory> implements ChannelHandlerFactory { - - @Override - public ChannelHandler createHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory eventFactory, - final BlockingQueue events, - final ComponentLog logger) { - return new LumberjackSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger); - } - - @Override - public ChannelHandler createSSLHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory eventFactory, - final BlockingQueue events, - final ComponentLog logger) { - return new LumberjackSSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger); - } - -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackChannelResponse.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackChannelResponse.java deleted file mode 100644 index 8749759933..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackChannelResponse.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.response; - -import org.apache.nifi.processor.util.listen.response.ChannelResponse; -import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame; -import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder; - -/** - * Creates a LumberjackFrame for the provided response and returns the encoded frame. - */ -public class LumberjackChannelResponse implements ChannelResponse { - - private final LumberjackEncoder encoder; - private final LumberjackResponse response; - - public LumberjackChannelResponse(final LumberjackEncoder encoder, final LumberjackResponse response) { - this.encoder = encoder; - this.response = response; - } - - @Override - public byte[] toByteArray() { - final LumberjackFrame frame = response.toFrame(); - return encoder.encode(frame); - } - -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackResponse.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackResponse.java deleted file mode 100644 index 850bba4b67..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackResponse.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.response; - -import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame; - -import java.nio.ByteBuffer; - -/** - 'ack' frame type - - SENT FROM READER ONLY - frame type value: ASCII 'A' aka byte value 0x41 - - Payload: - 32bit unsigned sequence number. - - */ -public class LumberjackResponse { - private final long seqNumber; - final private byte version = 0x31, frameType = 0x41; - - - - public LumberjackResponse(final long seqNumber) { - this.seqNumber = seqNumber; - } - - /** - * Creates a LumberjackFrame where the data portion will contain this response. - * - * - * @return a LumberjackFrame for for this response - */ - public LumberjackFrame toFrame() { - - return new LumberjackFrame.Builder() - .version(version) - .frameType(frameType) - .payload(ByteBuffer.allocate(4).putInt((int) seqNumber).array()) - .build(); - } - - public static LumberjackResponse ok(final long seqNumber) { - return new LumberjackResponse(seqNumber); - } - -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor deleted file mode 100644 index 3c23391f88..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ /dev/null @@ -1,15 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -org.apache.nifi.processors.lumberjack.ListenLumberjack diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/event/TestLumberjackEventFactory.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/event/TestLumberjackEventFactory.java deleted file mode 100644 index e7a71fd3e1..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/event/TestLumberjackEventFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.lumberjack.event; - -import java.util.HashMap; -import java.util.Map; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.response.ChannelResponder; -import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder; -import org.junit.Assert; -import org.junit.Test; - -@SuppressWarnings("deprecation") -public class TestLumberjackEventFactory { - - @Test - public void testCreateLumberJackEvent() { - final String sender = "testsender1"; - final byte[] data = "this is a test line".getBytes(); - final long seqNumber = 1; - final String fields = "{\"file\":\"test\"}"; - - - final Map metadata = new HashMap<>(); - metadata.put(EventFactory.SENDER_KEY, sender); - metadata.put(LumberjackMetadata.SEQNUMBER_KEY, String.valueOf(seqNumber)); - metadata.put(LumberjackMetadata.FIELDS_KEY, String.valueOf(fields)); - - final ChannelResponder responder = new SocketChannelResponder(null); - - final EventFactory factory = new LumberjackEventFactory(); - - final LumberjackEvent event = factory.create(data, metadata, responder); - - Assert.assertEquals(sender, event.getSender()); - Assert.assertEquals(seqNumber, event.getSeqNumber()); - Assert.assertEquals(fields, event.getFields()); - Assert.assertEquals(data, event.getData()); - } -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackDecoder.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackDecoder.java deleted file mode 100644 index 4a862b46cb..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackDecoder.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.frame; - -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.List; -import javax.xml.bind.DatatypeConverter; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -@SuppressWarnings("deprecation") -public class TestLumberjackDecoder { - - // Because no encoder for type 43 was coded, added Static hex - // representation of compressed data - // - private static final String singleFrameData = "3143000000aa785e4c8e4daac3300c8413c8cbfeddc017681da7b48540775df51245103936f54fb" + - "04c4a6e5f6917d03020e91bc93c9ba669597faccefa80ec0fed72440dd1174833e819370c798d98aa0e79a10ae44e36972f94198b26886bc" + - "0774422589024c865aaecff07f24c6e1b0c37fb6c2da18cdb4176834f72747c4152e6aa46330db7e9725707567db0240c93aace93e212464" + - "95857f755e89e76e2d77e000000ffff010000ffff05b43bb8"; - private static final String multiFrameData = "3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728031957a97f82" + - "232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f021f71461b26873e711bee9480f48b0af10fe28" + - "89113b8c9e28f4322b82395413a50cafd79957c253d0b992faf4129c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f468079" + - "6b421964fc9b032ac4dcb54d2575a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff" + - "010000ffff35e0eff0"; - private static final String payload = ""; - - private LumberjackDecoder decoder; - - @Before - public void setup() { - this.decoder = new LumberjackDecoder(StandardCharsets.UTF_8); - } - - @Test - public void testDecodeSingleFrame() { - final byte[] input = DatatypeConverter.parseHexBinary(singleFrameData); - - List frames = null; - LumberjackFrame frame = null; - - for (byte b : input) { - if (decoder.process(b)) { - frames = decoder.getFrames(); - break; - } - } - - frame = frames.get(frames.size() - 1); - - Assert.assertNotNull(frame); - Assert.assertEquals(0x31, frame.getVersion()); - Assert.assertEquals(0x44, frame.getFrameType()); - Assert.assertEquals(1, frame.getSeqNumber()); - // Look for a predefined number of bytes for matching of the inner payload - Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("000000050000000466696c65000000"), Arrays.copyOfRange(frame.getPayload(), 0, 15)); - } - - @Test - public void testDecodeMultipleFrame() { - final byte[] input = DatatypeConverter.parseHexBinary(multiFrameData); - - List frames = null; - LumberjackFrame frame = null; - - for (byte b : input) { - if (decoder.process(b)) { - frames = decoder.getFrames(); - break; - } - } - - frame = frames.get(1); - - Assert.assertNotNull(frame); - Assert.assertEquals(0x31, frame.getVersion()); - Assert.assertEquals(0x44, frame.getFrameType()); - // Load the second frame therefore seqNumber = 2 - Assert.assertEquals(2, frame.getSeqNumber()); - // Look for a predefined number of bytes for matching of the inner payload - Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("000000050000000466696c65000000"), Arrays.copyOfRange(frame.getPayload(), 0, 15)); - } -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackEncoder.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackEncoder.java deleted file mode 100644 index d45b51004d..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackEncoder.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.frame; - -import java.nio.ByteBuffer; -import javax.xml.bind.DatatypeConverter; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -@SuppressWarnings("deprecation") -public class TestLumberjackEncoder { - private LumberjackEncoder encoder; - - - @Before - public void setup() { - this.encoder = new LumberjackEncoder(); - } - - @Test - public void testEncode() { - LumberjackFrame frame = new LumberjackFrame.Builder() - .version((byte) 0x31) - .frameType((byte) 0x41) - .payload(ByteBuffer.allocate(8).putLong(123).array()) - .build(); - - byte[] encoded = encoder.encode(frame); - - Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("3141000000000000007B"), encoded); - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackFrame.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackFrame.java deleted file mode 100644 index ee910a3f76..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackFrame.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.frame; - -import org.junit.Test; - -@SuppressWarnings("deprecation") -public class TestLumberjackFrame { - - @Test(expected = LumberjackFrameException.class) - public void testInvalidVersion() { - new LumberjackFrame.Builder().seqNumber(1234).dataSize(3).build(); - } - - @Test(expected = LumberjackFrameException.class) - public void testInvalidFrameType() { - new LumberjackFrame.Builder().frameType((byte) 0x70).dataSize(5).build(); - } - - @Test(expected = LumberjackFrameException.class) - public void testBlankFrameType() { - new LumberjackFrame.Builder().frameType(((byte) 0x00)).dataSize(5).build(); - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java deleted file mode 100644 index 45e74b6fa8..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.handler; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool; -import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource; -import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; -import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; -import org.apache.nifi.processor.util.listen.response.ChannelResponder; -import org.apache.nifi.processors.lumberjack.event.LumberjackMetadata; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import javax.net.ssl.SSLContext; -import javax.xml.bind.DatatypeConverter; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -@SuppressWarnings("deprecation") -public class ITLumberjackSocketChannelHandler { - private EventFactory eventFactory; - private ChannelHandlerFactory channelHandlerFactory; - private ByteBufferSource byteBufferSource; - private BlockingQueue events; - private ComponentLog logger = Mockito.mock(ComponentLog.class); - private int maxConnections; - private SSLContext sslContext; - private Charset charset; - private ChannelDispatcher dispatcher; - - @Before - public void setup() { - eventFactory = new TestEventHolderFactory(); - channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>(); - - byteBufferSource = new ByteBufferPool(1, 4096); - - events = new LinkedBlockingQueue<>(); - logger = Mockito.mock(ComponentLog.class); - - maxConnections = 1; - sslContext = null; - charset = StandardCharsets.UTF_8; - - dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBufferSource, events, logger, - maxConnections, sslContext, charset); - - } - - @Test - public void testBasicHandling() throws IOException, InterruptedException { - final String multiFrameData = "3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728" + - "031957a97f82232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f02" + - "1f71461b26873e711bee9480f48b0af10fe2889113b8c9e28f4322b82395413a50cafd79957c253d0b992faf41" + - "29c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f4680796b421964fc9b032ac4dcb54d2575" + - "a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff01000" + - "0ffff35e0eff0"; - final List messages = new ArrayList<>(); - messages.add(multiFrameData); - - run(messages); - - // Check for the 4 frames (from the hex string above) are back... - Assert.assertEquals(4, events.size()); - - boolean found1 = false; - boolean found2 = false; - boolean found3 = false; - boolean found4 = false; - - TestEvent event; - while((event = events.poll()) != null) { - Map metadata = event.metadata; - Assert.assertTrue(metadata.containsKey(LumberjackMetadata.SEQNUMBER_KEY)); - - final String seqNum = metadata.get(LumberjackMetadata.SEQNUMBER_KEY); - if (seqNum.equals("1")) { - found1 = true; - } else if (seqNum.equals("2")) { - found2 = true; - } else if (seqNum.equals("3")) { - found3 = true; - } else if (seqNum.equals("4")) { - found4 = true; - } - } - - Assert.assertTrue(found1); - Assert.assertTrue(found2); - Assert.assertTrue(found3); - Assert.assertTrue(found4); - } - - protected void run(List messages) throws IOException, InterruptedException { - final ByteBuffer buffer = ByteBuffer.allocate(1024); - try { - // starts the dispatcher listening on port 0 so it selects a random port - dispatcher.open(null, 0, 4096); - - // starts a thread to run the dispatcher which will accept/read connections - Thread dispatcherThread = new Thread(dispatcher); - dispatcherThread.start(); - - - // create a client connection to the port the dispatcher is listening on - final int realPort = dispatcher.getPort(); - try (SocketChannel channel = SocketChannel.open()) { - channel.connect(new InetSocketAddress("localhost", realPort)); - Thread.sleep(100); - - // send the provided messages - for (int i=0; i < messages.size(); i++) { - buffer.clear(); - buffer.put(DatatypeConverter.parseHexBinary(messages.get(i))); - buffer.flip(); - - while (buffer.hasRemaining()) { - channel.write(buffer); - } - Thread.sleep(1); - } - } - - // wait up to 10 seconds to verify the responses - long timeout = 10000; - long startTime = System.currentTimeMillis(); - while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) { - Thread.sleep(100); - } - - // should have gotten an event for each message sent - Assert.assertEquals(4, events.size()); - - } finally { - // stop the dispatcher thread and ensure we shut down handler threads - dispatcher.close(); - } - } - - // Test event to produce from the data - private static class TestEvent implements Event { - - private byte[] data; - private Map metadata; - - public TestEvent(byte[] data, Map metadata) { - this.data = data; - this.metadata = metadata; - } - - @Override - public String getSender() { - return metadata.get(EventFactory.SENDER_KEY); - } - - @Override - public byte[] getData() { - return data; - } - - @Override - public ChannelResponder getResponder() { - return null; - } - } - - // Factory to create test events and send responses for testing - private static class TestEventHolderFactory implements EventFactory { - - @Override - public TestEvent create(final byte[] data, final Map metadata, final ChannelResponder responder) { - return new TestEvent(data, metadata); - } - } -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackFrameHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackFrameHandler.java deleted file mode 100644 index 64fd78efd1..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackFrameHandler.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.handler; - -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.response.ChannelResponder; -import org.apache.nifi.processor.util.listen.response.ChannelResponse; -import org.apache.nifi.processors.lumberjack.event.LumberjackEvent; -import org.apache.nifi.processors.lumberjack.event.LumberjackEventFactory; -import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder; -import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -@SuppressWarnings("deprecation") -public class TestLumberjackFrameHandler { - private Charset charset; - private EventFactory eventFactory; - private BlockingQueue events; - private SelectionKey key; - private AsyncChannelDispatcher dispatcher; - private LumberjackEncoder encoder; - - private ComponentLog logger; - - private LumberjackFrameHandler frameHandler; - - @Before - public void setup() { - this.charset = StandardCharsets.UTF_8; - this.eventFactory = new LumberjackEventFactory(); - this.events = new LinkedBlockingQueue<>(); - this.key = Mockito.mock(SelectionKey.class); - this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class); - this.logger = Mockito.mock(ComponentLog.class); - - this.frameHandler = new LumberjackFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger); - } - - @Test - public void testWindow() throws IOException, InterruptedException { - final LumberjackFrame openFrame = new LumberjackFrame.Builder() - .version((byte) 0x31) - .frameType((byte) 0x57) - .seqNumber(-1) - .payload(Integer.toString(1).getBytes()) - .build(); - - - final String sender = "sender1"; - final CapturingChannelResponder responder = new CapturingChannelResponder(); - - // call the handler and verify respond() was called once with once response - frameHandler.handle(openFrame, responder, sender); - - // No response expected - Assert.assertEquals(0, responder.responded); - } - - - @Test - public void testData() throws IOException, InterruptedException { - final byte payload[] = new byte[]{ - 0x00, 0x00, 0x00, 0x02, // Number of pairs - 0x00, 0x00, 0x00, 0x04, // Length of first pair key ('line') - 0x6C, 0x69, 0x6E, 0x65, // 'line' - 0x00, 0x00, 0x00, 0x0C, // Length of 'test-content' - 0x74, 0x65, 0x73, 0x74, // - 0x2d, 0x63, 0x6f, 0x6e, // 'test-content' - 0x74, 0x65, 0x6e, 0x74, // - 0x00, 0x00, 0x00, 0x05, // Length of 2nd pair key (field) - 0x66, 0x69, 0x65, 0x6c, // - 0x64, // 'field' - 0x00, 0x00, 0x00, 0x05, // Length of 'value' - 0x76, 0x61, 0x6c, 0x75, // 'value' - 0x65 - }; - - final LumberjackFrame dataFrame = new LumberjackFrame.Builder() - .version((byte) 0x31) - .frameType((byte) 0x44) - .seqNumber(1) - // Payload eq { enil: hello } - .payload(payload) - .build(); - - - final String sender = "sender1"; - final CapturingChannelResponder responder = new CapturingChannelResponder(); - - // call the handler and verify respond() was called once with once response - frameHandler.handle(dataFrame, responder, sender); - - // No response expected - Assert.assertEquals(0, responder.responded); - // But events should contain one event - Assert.assertEquals(1, events.size()); - - final LumberjackEvent event = events.poll(); - Assert.assertEquals("{\"field\":\"value\"}", new String((event.getFields()))); - Assert.assertEquals("test-content", new String(event.getData(), charset)); - } - - private static class CapturingChannelResponder implements ChannelResponder { - - int responded; - List responses = new ArrayList<>(); - - @Override - public SocketChannel getChannel() { - return Mockito.mock(SocketChannel.class); - } - - @Override - public List getResponses() { - return responses; - } - - @Override - public void addResponse(ChannelResponse response) { - responses.add(response); - } - - @Override - public void respond() throws IOException { - responded++; - } - } -} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/pom.xml b/nifi-nar-bundles/nifi-lumberjack-bundle/pom.xml deleted file mode 100644 index 1474a95c74..0000000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/pom.xml +++ /dev/null @@ -1,34 +0,0 @@ - - - - 4.0.0 - - - org.apache.nifi - nifi-nar-bundles - 1.16.0-SNAPSHOT - - - nifi-lumberjack-bundle - 1.16.0-SNAPSHOT - pom - - - nifi-lumberjack-processors - nifi-lumberjack-nar - - - diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 8503db7766..1df96814ef 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -60,7 +60,6 @@ nifi-amqp-bundle nifi-splunk-bundle nifi-jms-bundle - nifi-lumberjack-bundle nifi-beats-bundle nifi-cassandra-bundle nifi-spring-bundle