From 3626abd78a807a4ff806d51e7d27194f9fb7c961 Mon Sep 17 00:00:00 2001 From: Andrew Psaltis Date: Thu, 19 Jan 2017 18:28:21 -0500 Subject: [PATCH] NIFI-2615 Adding GetTCP processor NIFI-2615 Addressing changes from P/R. Specifically, removing .gitignore as it should not be there for a nar. Removed non-used class. Changed name in notice --- .../nifi-tcp-bundle/nifi-tcp-nar/pom.xml | 41 +++ .../src/main/resources/META-INF/LICENSE | 212 ++++++++++++ .../src/main/resources/META-INF/NOTICE | 24 ++ .../nifi-tcp-processors/pom.xml | 51 +++ .../gettcp/AbstractSocketHandler.java | 257 +++++++++++++++ .../apache/nifi/processors/gettcp/GetTCP.java | 302 ++++++++++++++++++ .../nifi/processors/gettcp/GetTCPUtils.java | 85 +++++ .../processors/gettcp/MessageHandler.java | 24 ++ .../processors/gettcp/ReceivingClient.java | 166 ++++++++++ .../org.apache.nifi.processor.Processor | 15 + .../gettcp/ReceivingClientTest.java | 199 ++++++++++++ .../apache/nifi/processors/gettcp/Server.java | 91 ++++++ .../nifi/processors/gettcp/TestGetTCP.java | 118 +++++++ .../src/test/resources/log4j.properties | 8 + nifi-nar-bundles/nifi-tcp-bundle/pom.xml | 35 ++ nifi-nar-bundles/pom.xml | 1 + 16 files changed, 1629 insertions(+) create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/AbstractSocketHandler.java create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCP.java create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCPUtils.java create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/MessageHandler.java create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/ReceivingClient.java create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/ReceivingClientTest.java create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/Server.java create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/TestGetTCP.java create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/resources/log4j.properties create mode 100644 nifi-nar-bundles/nifi-tcp-bundle/pom.xml diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-nar/pom.xml b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-nar/pom.xml new file mode 100644 index 0000000000..5227431a45 --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-nar/pom.xml @@ -0,0 +1,41 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-tcp-bundle + 1.2.0-SNAPSHOT + + + nifi-tcp-nar + 1.2.0-SNAPSHOT + nar + + true + true + + + + + org.apache.nifi + nifi-tcp-processors + 1.2.0-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..a57b09a9c4 --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,212 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + + diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..970a1e60e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,24 @@ +nifi-tcp-nar +Copyright 2015-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2017 The Apache Software Foundation + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/pom.xml b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/pom.xml new file mode 100644 index 0000000000..5e9cad41a6 --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/pom.xml @@ -0,0 +1,51 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-tcp-bundle + 1.2.0-SNAPSHOT + + + nifi-tcp-processors + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + 4.11 + test + + + diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/AbstractSocketHandler.java b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/AbstractSocketHandler.java new file mode 100644 index 0000000000..f27562938a --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/AbstractSocketHandler.java @@ -0,0 +1,257 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gettcp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.NetworkChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class to implement async TCP Client/Server components + * + */ +abstract class AbstractSocketHandler { + + final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final ByteBuffer readingBuffer; + + private final Runnable listenerTask; + + private volatile ExecutorService listenerTaskExecutor; + + final InetSocketAddress address; + + volatile NetworkChannel rootChannel; + + volatile Selector selector; + + private final AtomicBoolean isRunning; + + protected final byte endOfMessageByte; + + /** + * + * @param address + * @param server + */ + public AbstractSocketHandler(InetSocketAddress address, int readingBufferSize, byte endOfMessageByte) { + this.address = address; + this.listenerTask = new ListenerTask(); + this.readingBuffer = ByteBuffer.allocate(readingBufferSize); + this.isRunning = new AtomicBoolean(); + this.endOfMessageByte = endOfMessageByte; + } + + /** + * + * @return + */ + public void start() { + if (this.isRunning.compareAndSet(false, true)) { + try { + if (this.selector == null || !this.selector.isOpen()) { + this.selector = Selector.open(); + InetSocketAddress connectedAddress = this.connect(); + this.listenerTaskExecutor = Executors.newCachedThreadPool(); + this.listenerTaskExecutor.execute(this.listenerTask); + if (logger.isDebugEnabled()) { + logger.debug("Started listener for " + AbstractSocketHandler.this.getClass().getSimpleName()); + } + if (logger.isInfoEnabled()) { + logger.info("Successfully bound to " + connectedAddress); + } + } + } catch (Exception e) { + this.stop(); + throw new IllegalStateException("Failed to start " + this.getClass().getName(), e); + } + } + } + + /** + * + * @param force + */ + public void stop() { + if (this.isRunning.compareAndSet(true, false)) { + try { + if (this.selector != null && this.selector.isOpen()) { // since stop must be idempotent, we need to check if selector is open to avoid ClosedSelectorException + Set selectionKeys = new HashSet<>(this.selector.keys()); + for (SelectionKey key : selectionKeys) { + key.cancel(); + try { + key.channel().close(); + } catch (IOException e) { + logger.warn("Failure while closing channel", e); + } + } + try { + this.selector.close(); + } catch (Exception e) { + logger.warn("Failure while closinig selector", e); + } + logger.info(this.getClass().getSimpleName() + " is stopped listening on " + address); + } + } finally { + if (this.listenerTaskExecutor != null) { + this.listenerTaskExecutor.shutdown(); + } + } + } + } + + /** + * Checks if this component is running. + */ + public boolean isRunning() { + return this.isRunning.get(); + } + + /** + * + * @throws Exception + */ + abstract InetSocketAddress connect() throws Exception; + + /** + * Will process the data received from the channel + * @param selectionKey key for the channel the data came from + * @param buffer buffer of received data + * @throws IOException + */ + abstract void processData(SelectionKey selectionKey, ByteBuffer buffer) throws IOException; + + /** + * + * @param selectionKey + * @throws IOException + */ + void doAccept(SelectionKey selectionKey) throws IOException { + // noop + } + + /** + * Main listener task which will process delegate {@link SelectionKey} + * selected from the {@link Selector} to the appropriate processing method + * (e.g., accept, read, write etc.) + */ + private class ListenerTask implements Runnable { + @Override + public void run() { + try { + while (AbstractSocketHandler.this.rootChannel != null && AbstractSocketHandler.this.rootChannel.isOpen() && AbstractSocketHandler.this.selector.isOpen()) { + if (AbstractSocketHandler.this.selector.isOpen() && AbstractSocketHandler.this.selector.select(10) > 0) { + Iterator keys = AbstractSocketHandler.this.selector.selectedKeys().iterator(); + while (keys.hasNext()) { + SelectionKey selectionKey = keys.next(); + keys.remove(); + if (selectionKey.isValid()) { + if (selectionKey.isAcceptable()) { + this.accept(selectionKey); + } else if (selectionKey.isReadable()) { + this.read(selectionKey); + } else if (selectionKey.isConnectable()) { + this.connect(selectionKey); + } + } + } + } + } + } catch (Exception e) { + logger.error("Exception in socket listener loop", e); + } + + logger.debug("Exited Listener loop."); + AbstractSocketHandler.this.stop(); + } + + /** + * + */ + private void accept(SelectionKey selectionKey) throws IOException { + AbstractSocketHandler.this.doAccept(selectionKey); + } + + /** + * + */ + private void connect(SelectionKey selectionKey) throws IOException { + SocketChannel clientChannel = (SocketChannel) selectionKey.channel(); + if (clientChannel.isConnectionPending()) { + clientChannel.finishConnect(); + } + clientChannel.register(AbstractSocketHandler.this.selector, SelectionKey.OP_READ); + } + + /** + * The main read loop which reads packets from the channel and sends + * them to implementations of + * {@link AbstractSocketHandler#processData(SelectionKey, ByteBuffer)}. + * So if a given implementation is a Server it is probably going to + * broadcast received message to all connected sockets (e.g., chat + * server). If such implementation is the Client, then it is most likely + * the end of the road where message is processed. + */ + private void read(SelectionKey selectionKey) throws IOException { + SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); + + int count = -1; + boolean finished = false; + while (!finished && (count = socketChannel.read(AbstractSocketHandler.this.readingBuffer)) > 0){ + byte lastByte = AbstractSocketHandler.this.readingBuffer.get(AbstractSocketHandler.this.readingBuffer.position() - 1); + if (AbstractSocketHandler.this.readingBuffer.remaining() == 0 || lastByte == AbstractSocketHandler.this.endOfMessageByte) { + this.processBuffer(selectionKey); + if (lastByte == AbstractSocketHandler.this.endOfMessageByte) { + finished = true; + } + } + } + + if (count == -1) { + if (AbstractSocketHandler.this.readingBuffer.position() > 0) {// flush remainder, since nothing else is coming + this.processBuffer(selectionKey); + } + selectionKey.cancel(); + socketChannel.close(); + if (logger.isInfoEnabled()) { + logger.info("Connection closed by: " + socketChannel.socket()); + } + } + } + + private void processBuffer(SelectionKey selectionKey) throws IOException { + AbstractSocketHandler.this.readingBuffer.flip(); + byte[] message = new byte[AbstractSocketHandler.this.readingBuffer.limit()]; + AbstractSocketHandler.this.readingBuffer.get(message); + AbstractSocketHandler.this.processData(selectionKey, ByteBuffer.wrap(message)); + AbstractSocketHandler.this.readingBuffer.clear(); + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCP.java b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCP.java new file mode 100644 index 0000000000..c366929aaa --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCP.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gettcp; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + +@TriggerSerially +@SideEffectFree +@Tags({"get", "fetch", "poll", "tcp", "ingest", "source", "input"}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@CapabilityDescription("Connects over TCP to the provided endpoint(s). Received data will be written as content to the FlowFile") +@WritesAttribute(attribute = "source.endpoint", description = "The address of the source endpoint the message came from") +public class GetTCP extends AbstractSessionFactoryProcessor { + + private static String SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint"; + + public static final PropertyDescriptor ENDPOINT_LIST = new PropertyDescriptor.Builder() + .name("endpoint-list") + .displayName("Endpoint List") + .description("A comma delimited list of the endpoints to connect to. The format should be " + + ":. Only one server will be connected to at a time, the others " + + "will be used as fail overs.") + .required(true) + .addValidator(GetTCPUtils.ENDPOINT_VALIDATOR) + .build(); + + public static final PropertyDescriptor CONNECTION_ATTEMPT_COUNT = new PropertyDescriptor.Builder() + .name("connection-attempt-timeout") + .displayName("Connection Attempt Count") + .description("The number of times to try and establish a connection, before using a backup host if available." + + " This same attempt count would be used for a backup host as well.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("3") + .build(); + + public static final PropertyDescriptor RECONNECT_INTERVAL = new PropertyDescriptor.Builder() + .name("reconnect-interval") + .displayName("Reconnect interval") + .description("The number of seconds to wait before attempting to reconnect to the endpoint.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("5 sec") + .build(); + + public static final PropertyDescriptor RECEIVE_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("receive-buffer-size") + .displayName("Receive Buffer Size") + .description("The size of the buffer to receive data in") + .required(false) + .defaultValue("2048") + .addValidator(StandardValidators.createLongValidator(1, 2048, true)) + .build(); + + public static final PropertyDescriptor END_OF_MESSAGE_BYTE = new PropertyDescriptor.Builder() + .name("end-of-message-byte") + .displayName("End of message delimiter byte") + .description("Byte value which denotes end of message. Must be specified as integer within " + + "the valid byte range (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.") + .required(true) + .defaultValue("13") + .addValidator(StandardValidators.createLongValidator(-128, 127, true)) + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("Success") + .description("The relationship that all sucessful messages from the endpoint will be sent to.") + .build(); + + public static final Relationship REL_PARTIAL = new Relationship.Builder() + .name("Partial") + .description("The relationship that all incomplete messages from the endpoint will be sent to. " + + "Incomplete message is the message that doesn't end with 'End of message delimiter byte'. " + + "This can happen when 'Receive Buffer Size' is smaller then the incoming message. If that happens that " + + "the subsequent message that completes the previous incomplete message will also end up in this " + + "relationship, after which subsequent 'complete' messages will go to 'success'.") + .build(); + + private final static List DESCRIPTORS; + + private final static Set RELATIONSHIPS; + + /* + * Will ensure that the list of property descriptors is build only once. + * Will also create a Set of relationships + */ + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(ENDPOINT_LIST); + _propertyDescriptors.add(CONNECTION_ATTEMPT_COUNT); + _propertyDescriptors.add(RECONNECT_INTERVAL); + _propertyDescriptors.add(RECEIVE_BUFFER_SIZE); + _propertyDescriptors.add(END_OF_MESSAGE_BYTE); + + DESCRIPTORS = Collections.unmodifiableList(_propertyDescriptors); + + Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_PARTIAL); + RELATIONSHIPS = Collections.unmodifiableSet(_relationships); + } + + private final Map dynamicAttributes = new HashMap<>(); + + private final Map liveTcpClients = new HashMap<>(); + + private volatile NiFiDelegatingMessageHandler delegatingMessageHandler; + + private volatile ScheduledThreadPoolExecutor clientScheduler; + + private volatile String originalServerAddressList; + + private volatile int receiveBufferSize; + + private volatile int connectionAttemptCount; + + private volatile long reconnectInterval; + + private volatile byte endOfMessageByte; + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws ProcessException { + this.receiveBufferSize = context.getProperty(RECEIVE_BUFFER_SIZE).asInteger(); + this.originalServerAddressList = context.getProperty(ENDPOINT_LIST).getValue(); + this.endOfMessageByte = ((byte) context.getProperty(END_OF_MESSAGE_BYTE).asInteger().intValue()); + this.connectionAttemptCount = context.getProperty(CONNECTION_ATTEMPT_COUNT).asInteger(); + this.reconnectInterval = context.getProperty(RECONNECT_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS); + + this.clientScheduler = new ScheduledThreadPoolExecutor(originalServerAddressList.split(",").length + 1); + this.clientScheduler.setKeepAliveTime(10, TimeUnit.SECONDS); + this.clientScheduler.allowCoreThreadTimeOut(true); + + for (final Map.Entry entry : context.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.isDynamic()) { + this.dynamicAttributes.put(descriptor.getName(), entry.getValue()); + } + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + if (this.delegatingMessageHandler == null) { + this.delegatingMessageHandler = new NiFiDelegatingMessageHandler(sessionFactory); + } + this.run(context); + context.yield(); + } + + @OnStopped + public void tearDown() throws ProcessException { + for (ReceivingClient client : this.liveTcpClients.values()) { + try { + client.stop(); + } catch (Exception e) { + this.getLogger().warn("Failure while stopping client '" + client + "'", e); + } + } + this.liveTcpClients.clear(); + this.clientScheduler.shutdown(); + try { + if (!this.clientScheduler.awaitTermination(10000, TimeUnit.MILLISECONDS)) { + this.getLogger().info("Failed to stop client scheduler in 10 sec. Terminating"); + this.clientScheduler.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + this.getLogger().info("Processor has successfully shut down"); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder().required(false).name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true).expressionLanguageSupported(true) + .build(); + } + + private void run(ProcessContext context) { + String[] serverAddresses = this.originalServerAddressList.split(","); + for (String hostPortPair : serverAddresses) { + ReceivingClient client; + if (!this.liveTcpClients.containsKey(hostPortPair)) { + String[] hostAndPort = hostPortPair.split(":"); + InetSocketAddress address = new InetSocketAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])); + client = new ReceivingClient(address, this.clientScheduler, this.receiveBufferSize, this.endOfMessageByte); + client.setReconnectAttempts(this.connectionAttemptCount); + client.setDelayMillisBeforeReconnect(this.reconnectInterval); + client.setMessageHandler(this.delegatingMessageHandler); + this.liveTcpClients.put(hostPortPair, client); + this.startClient(client); + } else { + client = this.liveTcpClients.get(hostPortPair); + if (!client.isRunning()) { + client.stop(); // primarily for cleanup in the event of abnormal termination + this.startClient(client); + } + } + } + } + + private void startClient(ReceivingClient client) { + this.clientScheduler.execute(new Runnable() { + @Override + public void run() { + try { + client.start(); + } catch (Exception e) { + getLogger().warn("Failed to start listening client. Will attempt to start on another trigger cycle.", e); + } + } + }); + } + + /** + * + */ + private class NiFiDelegatingMessageHandler implements MessageHandler { + private final ProcessSessionFactory sessionFactory; + + NiFiDelegatingMessageHandler(ProcessSessionFactory sessionFactory) { + this.sessionFactory = sessionFactory; + } + + @Override + public void handle(InetSocketAddress sourceAddress, byte[] message, boolean partialMessage) { + ProcessSession session = this.sessionFactory.createSession(); + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write(message); + } + }); + flowFile = session.putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, sourceAddress.toString()); + if (!GetTCP.this.dynamicAttributes.isEmpty()) { + flowFile = session.putAllAttributes(flowFile, GetTCP.this.dynamicAttributes); + } + if (partialMessage) { + session.transfer(flowFile, REL_PARTIAL); + } else { + session.transfer(flowFile, REL_SUCCESS); + } + session.commit(); + } + } +} diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCPUtils.java b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCPUtils.java new file mode 100644 index 0000000000..845dda87d8 --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCPUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gettcp; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.processor.util.StandardValidators; + +class GetTCPUtils { + + private static final Pattern validIpAddressRegex = Pattern.compile( + "^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$"); + + private static final Pattern validHostnameRegex = Pattern.compile( + "^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\\-]*[A-Za-z0-9])$"); + + private static final Pattern looksLikeIpRegex = Pattern.compile("^(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)$"); + + public static final Validator ENDPOINT_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + if (null == value || value.isEmpty()) { + return new ValidationResult.Builder().subject(subject).input(value).valid(false) + .explanation(subject + " cannot be empty").build(); + } + // The format should be :{,:} + // first split on , + final String[] hostPortPairs = value.split(","); + boolean validHostPortPairs = true; + String reason = ""; + String offendingSubject = subject; + + if (0 == hostPortPairs.length) { + return new ValidationResult.Builder().subject(subject).input(value).valid(false) + .explanation(offendingSubject + " cannot be empty").build(); + } + + for (int i = 0; i < hostPortPairs.length && validHostPortPairs; i++) { + String[] parts = hostPortPairs[i].split(":"); + + if (parts.length != 2) { + validHostPortPairs = false; + reason = " of malformed URL '" + hostPortPairs[i] + "'"; + } else { + Matcher validHost = validHostnameRegex.matcher(parts[0]); + Matcher validIp = validIpAddressRegex.matcher(parts[0]); + Matcher looksLikeValidIp = looksLikeIpRegex.matcher(parts[0]); + if (!validHost.find()) { + validHostPortPairs = false; + reason = " it contains invalid characters '" + parts[0] + "'"; + } else if (looksLikeValidIp.find() && !validIp.find()) { + validHostPortPairs = false; + reason = " it appears to be represented as an IP address which is out of legal range '" + parts[0] + "'"; + } + ValidationResult result = StandardValidators.PORT_VALIDATOR.validate(parts[1], parts[1], context); + if (!result.isValid()) { + validHostPortPairs = false; + reason = result.getExplanation(); + } + } + } + + return new ValidationResult.Builder().subject(offendingSubject).input(value).explanation(reason) + .valid(validHostPortPairs).build(); + } + }; +} diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/MessageHandler.java b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/MessageHandler.java new file mode 100644 index 0000000000..c6a6fea5a0 --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/MessageHandler.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gettcp; + +import java.net.InetSocketAddress; + +public interface MessageHandler { + + void handle(InetSocketAddress sourceAddress, byte[] message, boolean partialMessage); +} diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/ReceivingClient.java b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/ReceivingClient.java new file mode 100644 index 0000000000..94c2d27e16 --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/ReceivingClient.java @@ -0,0 +1,166 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gettcp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Implementation of receiving network client. + */ +public class ReceivingClient extends AbstractSocketHandler { + + private final ScheduledExecutorService connectionScheduler; + + private volatile InetSocketAddress backupAddress; + + private volatile int reconnectAttempts; + + private volatile long delayMillisBeforeReconnect; + + private volatile MessageHandler messageHandler; + + private volatile InetSocketAddress connectedAddress; + + public ReceivingClient(InetSocketAddress address, ScheduledExecutorService connectionScheduler, int readingBufferSize, byte endOfMessageByte) { + super(address, readingBufferSize, endOfMessageByte); + this.connectionScheduler = connectionScheduler; + } + + public void setBackupAddress(InetSocketAddress backupAddress) { + this.backupAddress = backupAddress; + } + + public void setReconnectAttempts(int reconnectAttempts) { + this.reconnectAttempts = reconnectAttempts; + } + + public void setDelayMillisBeforeReconnect(long delayMillisBeforeReconnect) { + this.delayMillisBeforeReconnect = delayMillisBeforeReconnect; + } + + public void setMessageHandler(MessageHandler messageHandler) { + this.messageHandler = messageHandler; + } + + /** + * + */ + @Override + InetSocketAddress connect() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger attempt = new AtomicInteger(); + AtomicReference connectionError = new AtomicReference(); + this.connectionScheduler.execute(new Runnable() { + @Override + public void run() { + try { + rootChannel = doConnect(address); + latch.countDown(); + connectedAddress = address; + } catch (Exception e) { + if (logger.isInfoEnabled()) { + logger.info("Failed to connect to primary endpoint '" + address + "'."); + } + if (attempt.incrementAndGet() <= reconnectAttempts) { + if (logger.isInfoEnabled()) { + logger.info("Will attempt to reconnect to '" + address + "'."); + } + connectionScheduler.schedule(this, delayMillisBeforeReconnect, TimeUnit.MILLISECONDS); + } else { + if (backupAddress == null) { + connectionError.set(e); + } else { + try { + if (logger.isInfoEnabled()) { + logger.info("Every attempt to connect to '" + address + "' has failed."); + logger.info("Attempting to conect to secondary endppoint '" + backupAddress + "'."); + } + rootChannel = doConnect(backupAddress); + connectedAddress = backupAddress; + } catch (Exception re) { + logger.error("Failed to connect to secondary endpoint."); + connectionError.set(re); + } + } + latch.countDown(); + } + } + } + }); + + try { + boolean finishedTask = latch.await(this.reconnectAttempts * delayMillisBeforeReconnect + 2000, TimeUnit.MILLISECONDS); + if (finishedTask){ + if (connectionError.get() != null) { + throw connectionError.get(); + } + } else { + logger.error("Exceeded wait time to connect. Possible deadlock, please report!. Interrupting."); // should never happen! + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Current thread is interrupted"); + } + return this.connectedAddress; + } + + private SocketChannel doConnect(InetSocketAddress addressToConnect) throws IOException { + SocketChannel channel = SocketChannel.open(); + if (channel.connect(addressToConnect)) { + channel.configureBlocking(false); + channel.register(this.selector, SelectionKey.OP_READ); + } else { + throw new IllegalStateException("Failed to connect to Server at: " + addressToConnect); + } + return channel; + } + + /** + * + */ + @Override + void processData(SelectionKey selectionKey, ByteBuffer messageBuffer) throws IOException { + byte[] message = new byte[messageBuffer.limit()]; + logger.debug("Received message(size=" + message.length + ")"); + messageBuffer.get(message); + byte lastByteValue = message[message.length - 1]; + boolean partialMessage = false; + if (lastByteValue != this.endOfMessageByte) { + partialMessage = true; + selectionKey.attach(1); + } else { + Integer wasLastPartial = (Integer) selectionKey.attachment(); + if (wasLastPartial != null) { + if (wasLastPartial.intValue() == 1) { + partialMessage = true; + selectionKey.attach(0); + } + } + } + if (this.messageHandler != null) { + this.messageHandler.handle(this.connectedAddress, message, partialMessage); + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..a8368939bc --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.gettcp.GetTCP \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/ReceivingClientTest.java b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/ReceivingClientTest.java new file mode 100644 index 0000000000..fa0dd753ab --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/ReceivingClientTest.java @@ -0,0 +1,199 @@ +package org.apache.nifi.processors.gettcp; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.PrintWriter; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class ReceivingClientTest { + + private final static byte EOM = '\r'; + + private ScheduledExecutorService scheduler; + + @Before + public void before() { + this.scheduler = Executors.newScheduledThreadPool(1); + } + + @After + public void after() { + this.scheduler.shutdownNow(); + } + + @Test + public void validateSuccessfullConnectionAndCommunication() throws Exception { + String msgToSend = "Hello from validateSuccessfullConnectionAndCommunication"; + InetSocketAddress address = new InetSocketAddress(9999); + Server server = new Server(address, 1024, EOM); + server.start(); + + ReceivingClient client = new ReceivingClient(address, this.scheduler, 1024, EOM); + StringBuilder stringBuilder = new StringBuilder(); + client.setMessageHandler((fromAddress, message, partialMessage) -> stringBuilder.append(new String(message, StandardCharsets.UTF_8))); + client.start(); + assertTrue(client.isRunning()); + + this.sendToSocket(address, msgToSend); + Thread.sleep(200); + assertEquals("", stringBuilder.toString()); + this.sendToSocket(address, "\r"); + Thread.sleep(200); + assertEquals(msgToSend + "\r", stringBuilder.toString()); + + client.stop(); + server.stop(); + assertFalse(client.isRunning()); + assertFalse(server.isRunning()); + } + + @Test + public void validateSuccessfullConnectionAndCommunicationWithClientBufferSmallerThenMessage() throws Exception { + String msgToSend = "Hello from validateSuccessfullConnectionAndCommunicationWithClientBufferSmallerThenMessage"; + InetSocketAddress address = new InetSocketAddress(9999); + Server server = new Server(address, 1024, EOM); + server.start(); + + ReceivingClient client = new ReceivingClient(address, this.scheduler, 64, EOM); + List messages = new ArrayList<>(); + client.setMessageHandler((fromAddress, message, partialMessage) -> messages.add(new String(message, StandardCharsets.UTF_8))); + client.start(); + assertTrue(client.isRunning()); + + this.sendToSocket(address, msgToSend); + this.sendToSocket(address, "\r"); + Thread.sleep(200); + assertEquals("Hello from validateSuccessfullConnectionAndCommunicationWithClie", messages.get(0)); + assertEquals("ntBufferSmallerThenMessage\r", messages.get(1)); + + client.stop(); + server.stop(); + assertFalse(client.isRunning()); + assertFalse(server.isRunning()); + } + + @Test + public void validateMessageSendBeforeAfterClientConnectDisconnectNoEndOfMessageByte() throws Exception { + String msgToSend = "Hello from validateMessageSendBeforeAfterClientConnectDisconnectNoEndOfMessageByte"; + InetSocketAddress address = new InetSocketAddress(9999); + Server server = new Server(address, 1024, EOM); + server.start(); + this.sendToSocket(address, "foo"); // validates no unexpected errors + + ReceivingClient client = new ReceivingClient(address, this.scheduler, 30, EOM); + List messages = new ArrayList<>(); + client.setMessageHandler((fromAddress, message, partialMessage) -> messages.add(new String(message, StandardCharsets.UTF_8))); + client.start(); + assertTrue(client.isRunning()); + + this.sendToSocket(address, msgToSend); + Thread.sleep(200); + assertEquals(2, messages.size()); + assertEquals("Hello from validateMessageSend", messages.get(0)); + assertEquals("BeforeAfterClientConnectDiscon", messages.get(1)); + messages.clear(); + + client.stop(); + this.sendToSocket(address, msgToSend); + Thread.sleep(200); + assertEquals(0, messages.size()); + + this.sendToSocket(address, msgToSend); + + server.stop(); + assertFalse(client.isRunning()); + assertFalse(server.isRunning()); + } + + @Test + public void validateReconnectDuringReceive() throws Exception { + String msgToSend = "Hello from validateReconnectDuringReceive\r"; + InetSocketAddress addressMain = new InetSocketAddress(9998); + Server server = new Server(addressMain, 1024, EOM); + server.start(); + + ExecutorService sendingExecutor = Executors.newSingleThreadExecutor(); + + ReceivingClient client = new ReceivingClient(addressMain, this.scheduler, 1024, EOM); + client.setBackupAddress(addressMain); + client.setReconnectAttempts(10); + client.setDelayMillisBeforeReconnect(1000); + client.setMessageHandler((fromAddress, message, partialMessage) -> System.out.println(new String(message))); + client.start(); + assertTrue(client.isRunning()); + + sendingExecutor.execute(new Runnable() { + @Override + public void run() { + for (int i = 0; i < 10; i++) { + try { + sendToSocket(addressMain, msgToSend); + Thread.sleep(100); + } catch (Exception e) { + try { + Thread.sleep(1000); + } catch (Exception ex) { + // ignore + } + } + + } + } + }); + + Thread.sleep(500); + server.stop(); + + Thread.sleep(500); + + server.start(); + Thread.sleep(1000); + + client.stop(); + server.stop(); + + assertFalse(client.isRunning()); + assertFalse(server.isRunning()); + } + + @Test + public void validateConnectionFailureAfterRetries() throws Exception { + ReceivingClient client = null; + try { + InetSocketAddress addressMain = new InetSocketAddress(9998); + InetSocketAddress addressSecondary = new InetSocketAddress(9999); + + client = new ReceivingClient(addressMain, this.scheduler, 1024, EOM); + client.setBackupAddress(addressSecondary); + client.setReconnectAttempts(5); + client.setDelayMillisBeforeReconnect(200); + client.start(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + } + assertFalse(client.isRunning()); + } + + private void sendToSocket(InetSocketAddress address, String message) throws Exception { + Socket socket = new Socket(address.getAddress(), address.getPort()); + PrintWriter out = new PrintWriter(socket.getOutputStream(), true); + out.write(message); + out.flush(); + socket.close(); + } +} diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/Server.java b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/Server.java new file mode 100644 index 0000000000..550d2cbcf6 --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/Server.java @@ -0,0 +1,91 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gettcp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +/** + * + */ +public class Server extends AbstractSocketHandler { + + public static void main(String[] args) throws Exception { + InetSocketAddress address = new InetSocketAddress(9999); + Server server = new Server(address, 4096, (byte) '\r'); + server.start(); + System.in.read(); + } + + /** + * + * @param address + * @param readingBufferSize + */ + public Server(InetSocketAddress address, int readingBufferSize, byte endOfMessageByte) { + super(address, readingBufferSize, endOfMessageByte); + } + + /** + * + */ + @Override + InetSocketAddress connect() throws IOException { + this.rootChannel = ServerSocketChannel.open(); + ServerSocketChannel channel = (ServerSocketChannel) rootChannel; + channel.configureBlocking(false); + channel.socket().bind(this.address); + channel.register(this.selector, SelectionKey.OP_ACCEPT); + return this.address; + } + + /** + * + */ + @Override + void doAccept(SelectionKey selectionKey) throws IOException { + ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel(); + SocketChannel channel = serverChannel.accept(); + if (logger.isInfoEnabled()) { + logger.info("Accepted connection from: " + channel.socket()); + } + channel.configureBlocking(false); + channel.register(this.selector, SelectionKey.OP_READ); + } + + /** + * Unlike the client side the read on the server will happen using receiving + * thread. + */ + @Override + void processData(SelectionKey selectionKey, ByteBuffer readBuffer) throws IOException { + logger.info("Server received message of " + readBuffer.limit() + " bytes in size and will delegate to all registered clients."); + for (SelectionKey key : selector.keys()) { + if (key.isValid() && key.channel() instanceof SocketChannel && !selectionKey.equals(key)) { + if (logger.isDebugEnabled()) { + logger.debug("Distributing incoming message to " + key.channel()); + } + SocketChannel sch = (SocketChannel) key.channel(); + sch.write(readBuffer); + readBuffer.rewind(); + } + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/TestGetTCP.java b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/TestGetTCP.java new file mode 100644 index 0000000000..60868d0241 --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/TestGetTCP.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.gettcp; + +import java.io.PrintWriter; +import java.net.InetSocketAddress; +import java.net.Socket; + +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public final class TestGetTCP { + private TestRunner testRunner; + private GetTCP processor; + + @Before + public void setup() { + processor = new GetTCP(); + testRunner = TestRunners.newTestRunner(processor); + } + + @Test + public void testSelectPropertiesValidation() { + testRunner.setProperty(GetTCP.ENDPOINT_LIST, "!@;;*blah:9999"); + testRunner.assertNotValid(); + testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:9999"); + testRunner.assertValid(); + testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:-1"); + testRunner.assertNotValid(); + testRunner.setProperty(GetTCP.ENDPOINT_LIST, ","); + testRunner.assertNotValid(); + testRunner.setProperty(GetTCP.ENDPOINT_LIST, ",localhost:9999"); + testRunner.assertNotValid(); + testRunner.setProperty(GetTCP.ENDPOINT_LIST, "999,localhost:123"); + testRunner.assertNotValid(); + testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:abc_port"); + testRunner.assertNotValid(); + testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:9999;localhost:1234"); + testRunner.assertNotValid(); + testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:9999,localhost:1234"); + testRunner.assertValid(); + testRunner.setProperty(GetTCP.END_OF_MESSAGE_BYTE, "354"); + testRunner.assertNotValid(); + testRunner.setProperty(GetTCP.END_OF_MESSAGE_BYTE, "13"); + testRunner.assertValid(); + } + + @Test + public void testDynamicProperty() { + testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:9999,localhost:1234"); + testRunner.setProperty("MyCustomProperty", "abc"); + testRunner.assertValid(); + } + + @Test + public void testSuccessInteraction() throws Exception { + Server server = setupTCPServer(9999); + testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:" + 9999); + testRunner.run(1000, false); + this.sendToSocket(new InetSocketAddress(9999), "Hello\r"); + Thread.sleep(200); + testRunner.assertAllFlowFilesTransferred(GetTCP.REL_SUCCESS, 1); + testRunner.clearTransferState(); + testRunner.shutdown(); + server.stop(); + } + + @Test + public void testPartialInteraction() throws Exception { + Server server = setupTCPServer(9999); + testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:" + 9999); + testRunner.setProperty(GetTCP.RECEIVE_BUFFER_SIZE, "2"); + testRunner.run(1000, false); + this.sendToSocket(new InetSocketAddress(9999), "Hello\r"); + Thread.sleep(200); + testRunner.assertAllFlowFilesTransferred(GetTCP.REL_PARTIAL, 3); + testRunner.clearTransferState(); + + this.sendToSocket(new InetSocketAddress(9999), "H\r"); + Thread.sleep(200); + testRunner.assertAllFlowFilesTransferred(GetTCP.REL_SUCCESS, 1); + testRunner.clearTransferState(); + testRunner.shutdown(); + server.stop(); + } + + private Server setupTCPServer(int port) { + InetSocketAddress address = new InetSocketAddress(port); + Server server = new Server(address, 1024, (byte) '\r'); + server.start(); + return server; + } + + private void sendToSocket(InetSocketAddress address, String message) throws Exception { + Socket socket = new Socket(address.getAddress(), address.getPort()); + PrintWriter out = new PrintWriter(socket.getOutputStream(), true); + out.write(message); + out.flush(); + socket.close(); + } +} diff --git a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/resources/log4j.properties new file mode 100644 index 0000000000..19965aba82 --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/resources/log4j.properties @@ -0,0 +1,8 @@ +log4j.rootCategory=DEBUG, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n + +#log4j.category.com.hortonworks.minicluster=INFO +log4j.category.org.apache.tez.runtime=WARN diff --git a/nifi-nar-bundles/nifi-tcp-bundle/pom.xml b/nifi-nar-bundles/nifi-tcp-bundle/pom.xml new file mode 100644 index 0000000000..bf8821d931 --- /dev/null +++ b/nifi-nar-bundles/nifi-tcp-bundle/pom.xml @@ -0,0 +1,35 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 1.2.0-SNAPSHOT + + + + nifi-tcp-bundle + 1.2.0-SNAPSHOT + pom + + + nifi-tcp-processors + nifi-tcp-nar + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 8f4f7e28aa..df3aa7d1d4 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -72,6 +72,7 @@ nifi-email-bundle nifi-ranger-bundle nifi-websocket-bundle + nifi-tcp-bundle