NIFI-2615 Adding GetTCP processor

NIFI-2615 Addressing changes from P/R. Specifically, removing .gitignore as it should not be there for a nar. Removed non-used class. Changed name in notice
This commit is contained in:
Andrew Psaltis 2017-01-19 18:28:21 -05:00 committed by Oleg Zhurakousky
parent 1a6802a016
commit 3626abd78a
16 changed files with 1629 additions and 0 deletions

View File

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-tcp-bundle</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-tcp-nar</artifactId>
<version>1.2.0-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-tcp-processors</artifactId>
<version>1.2.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,212 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.

View File

@ -0,0 +1,24 @@
nifi-tcp-nar
Copyright 2015-2017 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
===========================================
Apache Software License v2
===========================================
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons IO
The following NOTICE information applies:
Apache Commons IO
Copyright 2002-2017 The Apache Software Foundation
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2015 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())

View File

@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-tcp-bundle</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-tcp-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,257 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gettcp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.NetworkChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class to implement async TCP Client/Server components
*
*/
abstract class AbstractSocketHandler {
final Logger logger = LoggerFactory.getLogger(this.getClass());
private final ByteBuffer readingBuffer;
private final Runnable listenerTask;
private volatile ExecutorService listenerTaskExecutor;
final InetSocketAddress address;
volatile NetworkChannel rootChannel;
volatile Selector selector;
private final AtomicBoolean isRunning;
protected final byte endOfMessageByte;
/**
*
* @param address
* @param server
*/
public AbstractSocketHandler(InetSocketAddress address, int readingBufferSize, byte endOfMessageByte) {
this.address = address;
this.listenerTask = new ListenerTask();
this.readingBuffer = ByteBuffer.allocate(readingBufferSize);
this.isRunning = new AtomicBoolean();
this.endOfMessageByte = endOfMessageByte;
}
/**
*
* @return
*/
public void start() {
if (this.isRunning.compareAndSet(false, true)) {
try {
if (this.selector == null || !this.selector.isOpen()) {
this.selector = Selector.open();
InetSocketAddress connectedAddress = this.connect();
this.listenerTaskExecutor = Executors.newCachedThreadPool();
this.listenerTaskExecutor.execute(this.listenerTask);
if (logger.isDebugEnabled()) {
logger.debug("Started listener for " + AbstractSocketHandler.this.getClass().getSimpleName());
}
if (logger.isInfoEnabled()) {
logger.info("Successfully bound to " + connectedAddress);
}
}
} catch (Exception e) {
this.stop();
throw new IllegalStateException("Failed to start " + this.getClass().getName(), e);
}
}
}
/**
*
* @param force
*/
public void stop() {
if (this.isRunning.compareAndSet(true, false)) {
try {
if (this.selector != null && this.selector.isOpen()) { // since stop must be idempotent, we need to check if selector is open to avoid ClosedSelectorException
Set<SelectionKey> selectionKeys = new HashSet<>(this.selector.keys());
for (SelectionKey key : selectionKeys) {
key.cancel();
try {
key.channel().close();
} catch (IOException e) {
logger.warn("Failure while closing channel", e);
}
}
try {
this.selector.close();
} catch (Exception e) {
logger.warn("Failure while closinig selector", e);
}
logger.info(this.getClass().getSimpleName() + " is stopped listening on " + address);
}
} finally {
if (this.listenerTaskExecutor != null) {
this.listenerTaskExecutor.shutdown();
}
}
}
}
/**
* Checks if this component is running.
*/
public boolean isRunning() {
return this.isRunning.get();
}
/**
*
* @throws Exception
*/
abstract InetSocketAddress connect() throws Exception;
/**
* Will process the data received from the channel
* @param selectionKey key for the channel the data came from
* @param buffer buffer of received data
* @throws IOException
*/
abstract void processData(SelectionKey selectionKey, ByteBuffer buffer) throws IOException;
/**
*
* @param selectionKey
* @throws IOException
*/
void doAccept(SelectionKey selectionKey) throws IOException {
// noop
}
/**
* Main listener task which will process delegate {@link SelectionKey}
* selected from the {@link Selector} to the appropriate processing method
* (e.g., accept, read, write etc.)
*/
private class ListenerTask implements Runnable {
@Override
public void run() {
try {
while (AbstractSocketHandler.this.rootChannel != null && AbstractSocketHandler.this.rootChannel.isOpen() && AbstractSocketHandler.this.selector.isOpen()) {
if (AbstractSocketHandler.this.selector.isOpen() && AbstractSocketHandler.this.selector.select(10) > 0) {
Iterator<SelectionKey> keys = AbstractSocketHandler.this.selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey selectionKey = keys.next();
keys.remove();
if (selectionKey.isValid()) {
if (selectionKey.isAcceptable()) {
this.accept(selectionKey);
} else if (selectionKey.isReadable()) {
this.read(selectionKey);
} else if (selectionKey.isConnectable()) {
this.connect(selectionKey);
}
}
}
}
}
} catch (Exception e) {
logger.error("Exception in socket listener loop", e);
}
logger.debug("Exited Listener loop.");
AbstractSocketHandler.this.stop();
}
/**
*
*/
private void accept(SelectionKey selectionKey) throws IOException {
AbstractSocketHandler.this.doAccept(selectionKey);
}
/**
*
*/
private void connect(SelectionKey selectionKey) throws IOException {
SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
if (clientChannel.isConnectionPending()) {
clientChannel.finishConnect();
}
clientChannel.register(AbstractSocketHandler.this.selector, SelectionKey.OP_READ);
}
/**
* The main read loop which reads packets from the channel and sends
* them to implementations of
* {@link AbstractSocketHandler#processData(SelectionKey, ByteBuffer)}.
* So if a given implementation is a Server it is probably going to
* broadcast received message to all connected sockets (e.g., chat
* server). If such implementation is the Client, then it is most likely
* the end of the road where message is processed.
*/
private void read(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
int count = -1;
boolean finished = false;
while (!finished && (count = socketChannel.read(AbstractSocketHandler.this.readingBuffer)) > 0){
byte lastByte = AbstractSocketHandler.this.readingBuffer.get(AbstractSocketHandler.this.readingBuffer.position() - 1);
if (AbstractSocketHandler.this.readingBuffer.remaining() == 0 || lastByte == AbstractSocketHandler.this.endOfMessageByte) {
this.processBuffer(selectionKey);
if (lastByte == AbstractSocketHandler.this.endOfMessageByte) {
finished = true;
}
}
}
if (count == -1) {
if (AbstractSocketHandler.this.readingBuffer.position() > 0) {// flush remainder, since nothing else is coming
this.processBuffer(selectionKey);
}
selectionKey.cancel();
socketChannel.close();
if (logger.isInfoEnabled()) {
logger.info("Connection closed by: " + socketChannel.socket());
}
}
}
private void processBuffer(SelectionKey selectionKey) throws IOException {
AbstractSocketHandler.this.readingBuffer.flip();
byte[] message = new byte[AbstractSocketHandler.this.readingBuffer.limit()];
AbstractSocketHandler.this.readingBuffer.get(message);
AbstractSocketHandler.this.processData(selectionKey, ByteBuffer.wrap(message));
AbstractSocketHandler.this.readingBuffer.clear();
}
}
}

View File

@ -0,0 +1,302 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gettcp;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@TriggerSerially
@SideEffectFree
@Tags({"get", "fetch", "poll", "tcp", "ingest", "source", "input"})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Connects over TCP to the provided endpoint(s). Received data will be written as content to the FlowFile")
@WritesAttribute(attribute = "source.endpoint", description = "The address of the source endpoint the message came from")
public class GetTCP extends AbstractSessionFactoryProcessor {
private static String SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
public static final PropertyDescriptor ENDPOINT_LIST = new PropertyDescriptor.Builder()
.name("endpoint-list")
.displayName("Endpoint List")
.description("A comma delimited list of the endpoints to connect to. The format should be " +
"<server_address>:<port>. Only one server will be connected to at a time, the others " +
"will be used as fail overs.")
.required(true)
.addValidator(GetTCPUtils.ENDPOINT_VALIDATOR)
.build();
public static final PropertyDescriptor CONNECTION_ATTEMPT_COUNT = new PropertyDescriptor.Builder()
.name("connection-attempt-timeout")
.displayName("Connection Attempt Count")
.description("The number of times to try and establish a connection, before using a backup host if available." +
" This same attempt count would be used for a backup host as well.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("3")
.build();
public static final PropertyDescriptor RECONNECT_INTERVAL = new PropertyDescriptor.Builder()
.name("reconnect-interval")
.displayName("Reconnect interval")
.description("The number of seconds to wait before attempting to reconnect to the endpoint.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("5 sec")
.build();
public static final PropertyDescriptor RECEIVE_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("receive-buffer-size")
.displayName("Receive Buffer Size")
.description("The size of the buffer to receive data in")
.required(false)
.defaultValue("2048")
.addValidator(StandardValidators.createLongValidator(1, 2048, true))
.build();
public static final PropertyDescriptor END_OF_MESSAGE_BYTE = new PropertyDescriptor.Builder()
.name("end-of-message-byte")
.displayName("End of message delimiter byte")
.description("Byte value which denotes end of message. Must be specified as integer within "
+ "the valid byte range (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
.required(true)
.defaultValue("13")
.addValidator(StandardValidators.createLongValidator(-128, 127, true))
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("Success")
.description("The relationship that all sucessful messages from the endpoint will be sent to.")
.build();
public static final Relationship REL_PARTIAL = new Relationship.Builder()
.name("Partial")
.description("The relationship that all incomplete messages from the endpoint will be sent to. "
+ "Incomplete message is the message that doesn't end with 'End of message delimiter byte'. "
+ "This can happen when 'Receive Buffer Size' is smaller then the incoming message. If that happens that "
+ "the subsequent message that completes the previous incomplete message will also end up in this "
+ "relationship, after which subsequent 'complete' messages will go to 'success'.")
.build();
private final static List<PropertyDescriptor> DESCRIPTORS;
private final static Set<Relationship> RELATIONSHIPS;
/*
* Will ensure that the list of property descriptors is build only once.
* Will also create a Set of relationships
*/
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(ENDPOINT_LIST);
_propertyDescriptors.add(CONNECTION_ATTEMPT_COUNT);
_propertyDescriptors.add(RECONNECT_INTERVAL);
_propertyDescriptors.add(RECEIVE_BUFFER_SIZE);
_propertyDescriptors.add(END_OF_MESSAGE_BYTE);
DESCRIPTORS = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_PARTIAL);
RELATIONSHIPS = Collections.unmodifiableSet(_relationships);
}
private final Map<String, String> dynamicAttributes = new HashMap<>();
private final Map<String, ReceivingClient> liveTcpClients = new HashMap<>();
private volatile NiFiDelegatingMessageHandler delegatingMessageHandler;
private volatile ScheduledThreadPoolExecutor clientScheduler;
private volatile String originalServerAddressList;
private volatile int receiveBufferSize;
private volatile int connectionAttemptCount;
private volatile long reconnectInterval;
private volatile byte endOfMessageByte;
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws ProcessException {
this.receiveBufferSize = context.getProperty(RECEIVE_BUFFER_SIZE).asInteger();
this.originalServerAddressList = context.getProperty(ENDPOINT_LIST).getValue();
this.endOfMessageByte = ((byte) context.getProperty(END_OF_MESSAGE_BYTE).asInteger().intValue());
this.connectionAttemptCount = context.getProperty(CONNECTION_ATTEMPT_COUNT).asInteger();
this.reconnectInterval = context.getProperty(RECONNECT_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
this.clientScheduler = new ScheduledThreadPoolExecutor(originalServerAddressList.split(",").length + 1);
this.clientScheduler.setKeepAliveTime(10, TimeUnit.SECONDS);
this.clientScheduler.allowCoreThreadTimeOut(true);
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
this.dynamicAttributes.put(descriptor.getName(), entry.getValue());
}
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
if (this.delegatingMessageHandler == null) {
this.delegatingMessageHandler = new NiFiDelegatingMessageHandler(sessionFactory);
}
this.run(context);
context.yield();
}
@OnStopped
public void tearDown() throws ProcessException {
for (ReceivingClient client : this.liveTcpClients.values()) {
try {
client.stop();
} catch (Exception e) {
this.getLogger().warn("Failure while stopping client '" + client + "'", e);
}
}
this.liveTcpClients.clear();
this.clientScheduler.shutdown();
try {
if (!this.clientScheduler.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
this.getLogger().info("Failed to stop client scheduler in 10 sec. Terminating");
this.clientScheduler.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
this.getLogger().info("Processor has successfully shut down");
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder().required(false).name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true).expressionLanguageSupported(true)
.build();
}
private void run(ProcessContext context) {
String[] serverAddresses = this.originalServerAddressList.split(",");
for (String hostPortPair : serverAddresses) {
ReceivingClient client;
if (!this.liveTcpClients.containsKey(hostPortPair)) {
String[] hostAndPort = hostPortPair.split(":");
InetSocketAddress address = new InetSocketAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
client = new ReceivingClient(address, this.clientScheduler, this.receiveBufferSize, this.endOfMessageByte);
client.setReconnectAttempts(this.connectionAttemptCount);
client.setDelayMillisBeforeReconnect(this.reconnectInterval);
client.setMessageHandler(this.delegatingMessageHandler);
this.liveTcpClients.put(hostPortPair, client);
this.startClient(client);
} else {
client = this.liveTcpClients.get(hostPortPair);
if (!client.isRunning()) {
client.stop(); // primarily for cleanup in the event of abnormal termination
this.startClient(client);
}
}
}
}
private void startClient(ReceivingClient client) {
this.clientScheduler.execute(new Runnable() {
@Override
public void run() {
try {
client.start();
} catch (Exception e) {
getLogger().warn("Failed to start listening client. Will attempt to start on another trigger cycle.", e);
}
}
});
}
/**
*
*/
private class NiFiDelegatingMessageHandler implements MessageHandler {
private final ProcessSessionFactory sessionFactory;
NiFiDelegatingMessageHandler(ProcessSessionFactory sessionFactory) {
this.sessionFactory = sessionFactory;
}
@Override
public void handle(InetSocketAddress sourceAddress, byte[] message, boolean partialMessage) {
ProcessSession session = this.sessionFactory.createSession();
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(message);
}
});
flowFile = session.putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, sourceAddress.toString());
if (!GetTCP.this.dynamicAttributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, GetTCP.this.dynamicAttributes);
}
if (partialMessage) {
session.transfer(flowFile, REL_PARTIAL);
} else {
session.transfer(flowFile, REL_SUCCESS);
}
session.commit();
}
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gettcp;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.util.StandardValidators;
class GetTCPUtils {
private static final Pattern validIpAddressRegex = Pattern.compile(
"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$");
private static final Pattern validHostnameRegex = Pattern.compile(
"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\\-]*[A-Za-z0-9])$");
private static final Pattern looksLikeIpRegex = Pattern.compile("^(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)$");
public static final Validator ENDPOINT_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
if (null == value || value.isEmpty()) {
return new ValidationResult.Builder().subject(subject).input(value).valid(false)
.explanation(subject + " cannot be empty").build();
}
// The format should be <host>:<port>{,<host>:<port>}
// first split on ,
final String[] hostPortPairs = value.split(",");
boolean validHostPortPairs = true;
String reason = "";
String offendingSubject = subject;
if (0 == hostPortPairs.length) {
return new ValidationResult.Builder().subject(subject).input(value).valid(false)
.explanation(offendingSubject + " cannot be empty").build();
}
for (int i = 0; i < hostPortPairs.length && validHostPortPairs; i++) {
String[] parts = hostPortPairs[i].split(":");
if (parts.length != 2) {
validHostPortPairs = false;
reason = " of malformed URL '" + hostPortPairs[i] + "'";
} else {
Matcher validHost = validHostnameRegex.matcher(parts[0]);
Matcher validIp = validIpAddressRegex.matcher(parts[0]);
Matcher looksLikeValidIp = looksLikeIpRegex.matcher(parts[0]);
if (!validHost.find()) {
validHostPortPairs = false;
reason = " it contains invalid characters '" + parts[0] + "'";
} else if (looksLikeValidIp.find() && !validIp.find()) {
validHostPortPairs = false;
reason = " it appears to be represented as an IP address which is out of legal range '" + parts[0] + "'";
}
ValidationResult result = StandardValidators.PORT_VALIDATOR.validate(parts[1], parts[1], context);
if (!result.isValid()) {
validHostPortPairs = false;
reason = result.getExplanation();
}
}
}
return new ValidationResult.Builder().subject(offendingSubject).input(value).explanation(reason)
.valid(validHostPortPairs).build();
}
};
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gettcp;
import java.net.InetSocketAddress;
public interface MessageHandler {
void handle(InetSocketAddress sourceAddress, byte[] message, boolean partialMessage);
}

View File

@ -0,0 +1,166 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gettcp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* Implementation of receiving network client.
*/
public class ReceivingClient extends AbstractSocketHandler {
private final ScheduledExecutorService connectionScheduler;
private volatile InetSocketAddress backupAddress;
private volatile int reconnectAttempts;
private volatile long delayMillisBeforeReconnect;
private volatile MessageHandler messageHandler;
private volatile InetSocketAddress connectedAddress;
public ReceivingClient(InetSocketAddress address, ScheduledExecutorService connectionScheduler, int readingBufferSize, byte endOfMessageByte) {
super(address, readingBufferSize, endOfMessageByte);
this.connectionScheduler = connectionScheduler;
}
public void setBackupAddress(InetSocketAddress backupAddress) {
this.backupAddress = backupAddress;
}
public void setReconnectAttempts(int reconnectAttempts) {
this.reconnectAttempts = reconnectAttempts;
}
public void setDelayMillisBeforeReconnect(long delayMillisBeforeReconnect) {
this.delayMillisBeforeReconnect = delayMillisBeforeReconnect;
}
public void setMessageHandler(MessageHandler messageHandler) {
this.messageHandler = messageHandler;
}
/**
*
*/
@Override
InetSocketAddress connect() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger attempt = new AtomicInteger();
AtomicReference<Exception> connectionError = new AtomicReference<Exception>();
this.connectionScheduler.execute(new Runnable() {
@Override
public void run() {
try {
rootChannel = doConnect(address);
latch.countDown();
connectedAddress = address;
} catch (Exception e) {
if (logger.isInfoEnabled()) {
logger.info("Failed to connect to primary endpoint '" + address + "'.");
}
if (attempt.incrementAndGet() <= reconnectAttempts) {
if (logger.isInfoEnabled()) {
logger.info("Will attempt to reconnect to '" + address + "'.");
}
connectionScheduler.schedule(this, delayMillisBeforeReconnect, TimeUnit.MILLISECONDS);
} else {
if (backupAddress == null) {
connectionError.set(e);
} else {
try {
if (logger.isInfoEnabled()) {
logger.info("Every attempt to connect to '" + address + "' has failed.");
logger.info("Attempting to conect to secondary endppoint '" + backupAddress + "'.");
}
rootChannel = doConnect(backupAddress);
connectedAddress = backupAddress;
} catch (Exception re) {
logger.error("Failed to connect to secondary endpoint.");
connectionError.set(re);
}
}
latch.countDown();
}
}
}
});
try {
boolean finishedTask = latch.await(this.reconnectAttempts * delayMillisBeforeReconnect + 2000, TimeUnit.MILLISECONDS);
if (finishedTask){
if (connectionError.get() != null) {
throw connectionError.get();
}
} else {
logger.error("Exceeded wait time to connect. Possible deadlock, please report!. Interrupting."); // should never happen!
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Current thread is interrupted");
}
return this.connectedAddress;
}
private SocketChannel doConnect(InetSocketAddress addressToConnect) throws IOException {
SocketChannel channel = SocketChannel.open();
if (channel.connect(addressToConnect)) {
channel.configureBlocking(false);
channel.register(this.selector, SelectionKey.OP_READ);
} else {
throw new IllegalStateException("Failed to connect to Server at: " + addressToConnect);
}
return channel;
}
/**
*
*/
@Override
void processData(SelectionKey selectionKey, ByteBuffer messageBuffer) throws IOException {
byte[] message = new byte[messageBuffer.limit()];
logger.debug("Received message(size=" + message.length + ")");
messageBuffer.get(message);
byte lastByteValue = message[message.length - 1];
boolean partialMessage = false;
if (lastByteValue != this.endOfMessageByte) {
partialMessage = true;
selectionKey.attach(1);
} else {
Integer wasLastPartial = (Integer) selectionKey.attachment();
if (wasLastPartial != null) {
if (wasLastPartial.intValue() == 1) {
partialMessage = true;
selectionKey.attach(0);
}
}
}
if (this.messageHandler != null) {
this.messageHandler.handle(this.connectedAddress, message, partialMessage);
}
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.gettcp.GetTCP

View File

@ -0,0 +1,199 @@
package org.apache.nifi.processors.gettcp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class ReceivingClientTest {
private final static byte EOM = '\r';
private ScheduledExecutorService scheduler;
@Before
public void before() {
this.scheduler = Executors.newScheduledThreadPool(1);
}
@After
public void after() {
this.scheduler.shutdownNow();
}
@Test
public void validateSuccessfullConnectionAndCommunication() throws Exception {
String msgToSend = "Hello from validateSuccessfullConnectionAndCommunication";
InetSocketAddress address = new InetSocketAddress(9999);
Server server = new Server(address, 1024, EOM);
server.start();
ReceivingClient client = new ReceivingClient(address, this.scheduler, 1024, EOM);
StringBuilder stringBuilder = new StringBuilder();
client.setMessageHandler((fromAddress, message, partialMessage) -> stringBuilder.append(new String(message, StandardCharsets.UTF_8)));
client.start();
assertTrue(client.isRunning());
this.sendToSocket(address, msgToSend);
Thread.sleep(200);
assertEquals("", stringBuilder.toString());
this.sendToSocket(address, "\r");
Thread.sleep(200);
assertEquals(msgToSend + "\r", stringBuilder.toString());
client.stop();
server.stop();
assertFalse(client.isRunning());
assertFalse(server.isRunning());
}
@Test
public void validateSuccessfullConnectionAndCommunicationWithClientBufferSmallerThenMessage() throws Exception {
String msgToSend = "Hello from validateSuccessfullConnectionAndCommunicationWithClientBufferSmallerThenMessage";
InetSocketAddress address = new InetSocketAddress(9999);
Server server = new Server(address, 1024, EOM);
server.start();
ReceivingClient client = new ReceivingClient(address, this.scheduler, 64, EOM);
List<String> messages = new ArrayList<>();
client.setMessageHandler((fromAddress, message, partialMessage) -> messages.add(new String(message, StandardCharsets.UTF_8)));
client.start();
assertTrue(client.isRunning());
this.sendToSocket(address, msgToSend);
this.sendToSocket(address, "\r");
Thread.sleep(200);
assertEquals("Hello from validateSuccessfullConnectionAndCommunicationWithClie", messages.get(0));
assertEquals("ntBufferSmallerThenMessage\r", messages.get(1));
client.stop();
server.stop();
assertFalse(client.isRunning());
assertFalse(server.isRunning());
}
@Test
public void validateMessageSendBeforeAfterClientConnectDisconnectNoEndOfMessageByte() throws Exception {
String msgToSend = "Hello from validateMessageSendBeforeAfterClientConnectDisconnectNoEndOfMessageByte";
InetSocketAddress address = new InetSocketAddress(9999);
Server server = new Server(address, 1024, EOM);
server.start();
this.sendToSocket(address, "foo"); // validates no unexpected errors
ReceivingClient client = new ReceivingClient(address, this.scheduler, 30, EOM);
List<String> messages = new ArrayList<>();
client.setMessageHandler((fromAddress, message, partialMessage) -> messages.add(new String(message, StandardCharsets.UTF_8)));
client.start();
assertTrue(client.isRunning());
this.sendToSocket(address, msgToSend);
Thread.sleep(200);
assertEquals(2, messages.size());
assertEquals("Hello from validateMessageSend", messages.get(0));
assertEquals("BeforeAfterClientConnectDiscon", messages.get(1));
messages.clear();
client.stop();
this.sendToSocket(address, msgToSend);
Thread.sleep(200);
assertEquals(0, messages.size());
this.sendToSocket(address, msgToSend);
server.stop();
assertFalse(client.isRunning());
assertFalse(server.isRunning());
}
@Test
public void validateReconnectDuringReceive() throws Exception {
String msgToSend = "Hello from validateReconnectDuringReceive\r";
InetSocketAddress addressMain = new InetSocketAddress(9998);
Server server = new Server(addressMain, 1024, EOM);
server.start();
ExecutorService sendingExecutor = Executors.newSingleThreadExecutor();
ReceivingClient client = new ReceivingClient(addressMain, this.scheduler, 1024, EOM);
client.setBackupAddress(addressMain);
client.setReconnectAttempts(10);
client.setDelayMillisBeforeReconnect(1000);
client.setMessageHandler((fromAddress, message, partialMessage) -> System.out.println(new String(message)));
client.start();
assertTrue(client.isRunning());
sendingExecutor.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
sendToSocket(addressMain, msgToSend);
Thread.sleep(100);
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Exception ex) {
// ignore
}
}
}
}
});
Thread.sleep(500);
server.stop();
Thread.sleep(500);
server.start();
Thread.sleep(1000);
client.stop();
server.stop();
assertFalse(client.isRunning());
assertFalse(server.isRunning());
}
@Test
public void validateConnectionFailureAfterRetries() throws Exception {
ReceivingClient client = null;
try {
InetSocketAddress addressMain = new InetSocketAddress(9998);
InetSocketAddress addressSecondary = new InetSocketAddress(9999);
client = new ReceivingClient(addressMain, this.scheduler, 1024, EOM);
client.setBackupAddress(addressSecondary);
client.setReconnectAttempts(5);
client.setDelayMillisBeforeReconnect(200);
client.start();
fail();
} catch (Exception e) {
assertTrue(e instanceof IllegalStateException);
}
assertFalse(client.isRunning());
}
private void sendToSocket(InetSocketAddress address, String message) throws Exception {
Socket socket = new Socket(address.getAddress(), address.getPort());
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
out.write(message);
out.flush();
socket.close();
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gettcp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
*
*/
public class Server extends AbstractSocketHandler {
public static void main(String[] args) throws Exception {
InetSocketAddress address = new InetSocketAddress(9999);
Server server = new Server(address, 4096, (byte) '\r');
server.start();
System.in.read();
}
/**
*
* @param address
* @param readingBufferSize
*/
public Server(InetSocketAddress address, int readingBufferSize, byte endOfMessageByte) {
super(address, readingBufferSize, endOfMessageByte);
}
/**
*
*/
@Override
InetSocketAddress connect() throws IOException {
this.rootChannel = ServerSocketChannel.open();
ServerSocketChannel channel = (ServerSocketChannel) rootChannel;
channel.configureBlocking(false);
channel.socket().bind(this.address);
channel.register(this.selector, SelectionKey.OP_ACCEPT);
return this.address;
}
/**
*
*/
@Override
void doAccept(SelectionKey selectionKey) throws IOException {
ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel channel = serverChannel.accept();
if (logger.isInfoEnabled()) {
logger.info("Accepted connection from: " + channel.socket());
}
channel.configureBlocking(false);
channel.register(this.selector, SelectionKey.OP_READ);
}
/**
* Unlike the client side the read on the server will happen using receiving
* thread.
*/
@Override
void processData(SelectionKey selectionKey, ByteBuffer readBuffer) throws IOException {
logger.info("Server received message of " + readBuffer.limit() + " bytes in size and will delegate to all registered clients.");
for (SelectionKey key : selector.keys()) {
if (key.isValid() && key.channel() instanceof SocketChannel && !selectionKey.equals(key)) {
if (logger.isDebugEnabled()) {
logger.debug("Distributing incoming message to " + key.channel());
}
SocketChannel sch = (SocketChannel) key.channel();
sch.write(readBuffer);
readBuffer.rewind();
}
}
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gettcp;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
public final class TestGetTCP {
private TestRunner testRunner;
private GetTCP processor;
@Before
public void setup() {
processor = new GetTCP();
testRunner = TestRunners.newTestRunner(processor);
}
@Test
public void testSelectPropertiesValidation() {
testRunner.setProperty(GetTCP.ENDPOINT_LIST, "!@;;*blah:9999");
testRunner.assertNotValid();
testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:9999");
testRunner.assertValid();
testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:-1");
testRunner.assertNotValid();
testRunner.setProperty(GetTCP.ENDPOINT_LIST, ",");
testRunner.assertNotValid();
testRunner.setProperty(GetTCP.ENDPOINT_LIST, ",localhost:9999");
testRunner.assertNotValid();
testRunner.setProperty(GetTCP.ENDPOINT_LIST, "999,localhost:123");
testRunner.assertNotValid();
testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:abc_port");
testRunner.assertNotValid();
testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:9999;localhost:1234");
testRunner.assertNotValid();
testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:9999,localhost:1234");
testRunner.assertValid();
testRunner.setProperty(GetTCP.END_OF_MESSAGE_BYTE, "354");
testRunner.assertNotValid();
testRunner.setProperty(GetTCP.END_OF_MESSAGE_BYTE, "13");
testRunner.assertValid();
}
@Test
public void testDynamicProperty() {
testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:9999,localhost:1234");
testRunner.setProperty("MyCustomProperty", "abc");
testRunner.assertValid();
}
@Test
public void testSuccessInteraction() throws Exception {
Server server = setupTCPServer(9999);
testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:" + 9999);
testRunner.run(1000, false);
this.sendToSocket(new InetSocketAddress(9999), "Hello\r");
Thread.sleep(200);
testRunner.assertAllFlowFilesTransferred(GetTCP.REL_SUCCESS, 1);
testRunner.clearTransferState();
testRunner.shutdown();
server.stop();
}
@Test
public void testPartialInteraction() throws Exception {
Server server = setupTCPServer(9999);
testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:" + 9999);
testRunner.setProperty(GetTCP.RECEIVE_BUFFER_SIZE, "2");
testRunner.run(1000, false);
this.sendToSocket(new InetSocketAddress(9999), "Hello\r");
Thread.sleep(200);
testRunner.assertAllFlowFilesTransferred(GetTCP.REL_PARTIAL, 3);
testRunner.clearTransferState();
this.sendToSocket(new InetSocketAddress(9999), "H\r");
Thread.sleep(200);
testRunner.assertAllFlowFilesTransferred(GetTCP.REL_SUCCESS, 1);
testRunner.clearTransferState();
testRunner.shutdown();
server.stop();
}
private Server setupTCPServer(int port) {
InetSocketAddress address = new InetSocketAddress(port);
Server server = new Server(address, 1024, (byte) '\r');
server.start();
return server;
}
private void sendToSocket(InetSocketAddress address, String message) throws Exception {
Socket socket = new Socket(address.getAddress(), address.getPort());
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
out.write(message);
out.flush();
socket.close();
}
}

View File

@ -0,0 +1,8 @@
log4j.rootCategory=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n
#log4j.category.com.hortonworks.minicluster=INFO
log4j.category.org.apache.tez.runtime=WARN

View File

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-tcp-bundle</artifactId>
<version>1.2.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-tcp-processors</module>
<module>nifi-tcp-nar</module>
</modules>
</project>

View File

@ -72,6 +72,7 @@
<module>nifi-email-bundle</module>
<module>nifi-ranger-bundle</module>
<module>nifi-websocket-bundle</module>
<module>nifi-tcp-bundle</module>
</modules>
<dependencyManagement>