NIFI-8792 - Modified ListenRELP to use Netty

- Refactored RELP encoders and decoders

This closes #5398

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Nathan Gough 2021-09-16 00:38:10 -04:00 committed by exceptionfactory
parent 8fe7f372d6
commit 309ab8f4ab
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
33 changed files with 1174 additions and 977 deletions

View File

@ -17,10 +17,13 @@
package org.apache.nifi.remote.io.socket;
import java.net.DatagramSocket;
import java.net.Socket;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.util.concurrent.Executors;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class NetworkUtils {
@ -89,4 +92,20 @@ public class NetworkUtils {
return (result != null && result);
}
/**
* Get Interface Address using interface name eg. en0, eth0
*
* @param interfaceName Network Interface Name
* @return Interface Address or null when matching network interface name not found
* @throws SocketException Thrown when failing to get interface addresses
*/
public static InetAddress getInterfaceAddress(final String interfaceName) throws SocketException {
InetAddress interfaceAddress = null;
if (interfaceName != null && !interfaceName.isEmpty()) {
NetworkInterface networkInterface = NetworkInterface.getByName(interfaceName);
interfaceAddress = networkInterface.getInetAddresses().nextElement();
}
return interfaceAddress;
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport;
/**
* An interface to represent network delivered event/messages
*/
public interface NetworkEvent {
/**
* @return the sending host of the data, as a socket
*/
String getSender();
/**
* @return raw data for this event
*/
byte[] getMessage();
}

View File

@ -16,10 +16,12 @@
*/
package org.apache.nifi.event.transport.message;
import org.apache.nifi.event.transport.NetworkEvent;
/**
* Byte Array Message with Sender
*/
public class ByteArrayMessage {
public class ByteArrayMessage implements NetworkEvent {
private final byte[] message;
private final String sender;

View File

@ -16,11 +16,8 @@
*/
package org.apache.nifi.processor.util.listen;
import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -28,7 +25,6 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.event.Event;
import java.io.IOException;
@ -41,6 +37,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
/**
* An abstract processor that extends from AbstractListenEventProcessor and adds common functionality for
* batching events into a single FlowFile.
@ -49,25 +47,6 @@ import java.util.Set;
*/
public abstract class AbstractListenEventBatchingProcessor<E extends Event> extends AbstractListenEventProcessor<E> {
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Max Batch Size")
.description(
"The maximum number of messages to add to a single FlowFile. If multiple messages are available, they will be concatenated along with "
+ "the <Message Delimiter> up to this configured maximum number of messages")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue("1")
.required(true)
.build();
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter")
.displayName("Batching Message Delimiter")
.description("Specifies the delimiter to place between messages when multiple messages are bundled together (see <Max Batch Size> property).")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("\\n")
.required(true)
.build();
// it is only the array reference that is volatile - not the contents.
protected volatile byte[] messageDemarcatorBytes;
@ -80,8 +59,8 @@ public abstract class AbstractListenEventBatchingProcessor<E extends Event> exte
descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
descriptors.add(CHARSET);
descriptors.add(MAX_BATCH_SIZE);
descriptors.add(MESSAGE_DELIMITER);
descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
descriptors.addAll(getAdditionalProperties());
this.descriptors = Collections.unmodifiableList(descriptors);
@ -95,13 +74,13 @@ public abstract class AbstractListenEventBatchingProcessor<E extends Event> exte
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
super.onScheduled(context);
final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
final String msgDemarcator = context.getProperty(ListenerProperties.MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
final int maxBatchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize, messageDemarcatorBytes);
// if the size is 0 then there was nothing to process so return
@ -169,7 +148,7 @@ public abstract class AbstractListenEventBatchingProcessor<E extends Event> exte
/**
* Batches together up to the batchSize events. Events are grouped together based on a batch key which
* by default is the sender of the event, but can be override by sub-classes.
* by default is the sender of the event, but can be overriden by sub-classes.
*
* This method will return when batchSize has been reached, or when no more events are available on the queue.
*

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.processor.util.listen;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
@ -30,10 +29,10 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
@ -56,8 +55,6 @@ import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_I
*/
public abstract class AbstractListenEventProcessor<E extends Event> extends AbstractProcessor {
public static final PropertyDescriptor PORT = new PropertyDescriptor
.Builder().name("Port")
.description("The port to listen on for communication.")
@ -178,19 +175,13 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
charset = Charset.forName(context.getProperty(CHARSET).getValue());
port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger());
final String interfaceName = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final InetAddress interfaceAddress = NetworkUtils.getInterfaceAddress(interfaceName);
final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
InetAddress nicIPAddress = null;
if (!StringUtils.isEmpty(nicIPAddressStr)) {
NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr);
nicIPAddress = netIF.getInetAddresses().nextElement();
}
// create the dispatcher and call open() to bind to the given port
dispatcher = createDispatcher(context, events);
dispatcher.open(nicIPAddress, port, maxChannelBufferSize);
dispatcher.open(interfaceAddress, port, maxChannelBufferSize);
// start a thread to run the dispatcher
final Thread readerThread = new Thread(dispatcher);

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.OutputStreamCallback;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public abstract class EventBatcher<E extends ByteArrayMessage> {
public static final int POLL_TIMEOUT_MS = 20;
private volatile BlockingQueue<E> events;
private volatile BlockingQueue<E> errorEvents;
private final ComponentLog logger;
public EventBatcher(final ComponentLog logger, final BlockingQueue events, final BlockingQueue errorEvents) {
this.logger = logger;
this.events = events;
this.errorEvents = errorEvents;
}
/**
* Batches together up to the batchSize events. Events are grouped together based on a batch key which
* by default is the sender of the event, but can be overriden by sub-classes.
* <p>
* This method will return when batchSize has been reached, or when no more events are available on the queue.
*
* @param session the current session
* @param totalBatchSize the total number of events to process
* @param messageDemarcatorBytes the demarcator to put between messages when writing to a FlowFile
* @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all
* the batches will be <= batchSize
*/
public Map<String, FlowFileEventBatch> getBatches(final ProcessSession session, final int totalBatchSize,
final byte[] messageDemarcatorBytes) {
final Map<String, FlowFileEventBatch> batches = new HashMap<String, FlowFileEventBatch>();
for (int i = 0; i < totalBatchSize; i++) {
final E event = getMessage(true, true, session);
if (event == null) {
break;
}
final String batchKey = getBatchKey(event);
FlowFileEventBatch batch = batches.get(batchKey);
// if we don't have a batch for this key then create a new one
if (batch == null) {
batch = new FlowFileEventBatch(session.create(), new ArrayList<E>());
batches.put(batchKey, batch);
}
// add the current event to the batch
batch.getEvents().add(event);
// append the event's data to the FlowFile, write the demarcator first if not on the first event
final boolean writeDemarcator = (i > 0);
try {
final byte[] rawMessage = event.getMessage();
FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
if (writeDemarcator) {
out.write(messageDemarcatorBytes);
}
out.write(rawMessage);
}
});
// update the FlowFile reference in the batch object
batch.setFlowFile(appendedFlowFile);
} catch (final Exception e) {
logger.error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again",
e.getMessage(), e);
errorEvents.offer(event);
break;
}
}
return batches;
}
/**
* The implementation should generate the indexing key for the event, to allow batching together related events.
* Typically the batch key will be the sender IP + port to allow batching events from the same sender into a single
* flow file.
* @param event Use information from the event to generate a batching key
* @return The key to batch like-kind events together eg. sender ID/socket
*/
protected abstract String getBatchKey(E event);
/**
* If pollErrorQueue is true, the error queue will be checked first and event will be
* returned from the error queue if available.
*
* If pollErrorQueue is false, or no data is in the error queue, the regular queue is polled.
*
* If longPoll is true, the regular queue will be polled with a short timeout, otherwise it will
* poll with no timeout which will return immediately.
*
* @param longPoll whether or not to poll the main queue with a small timeout
* @param pollErrorQueue whether or not to poll the error queue first
*
* @return an event from one of the queues, or null if none are available
*/
protected E getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) {
E event = null;
if (pollErrorQueue) {
event = errorEvents.poll();
}
if (event != null) {
return event;
}
try {
if (longPoll) {
event = events.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} else {
event = events.poll();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
if (event != null) {
session.adjustCounter("Messages Received", 1L, false);
}
return event;
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen;
import org.apache.nifi.event.transport.NetworkEvent;
import org.apache.nifi.flowfile.FlowFile;
import java.util.List;
public final class FlowFileEventBatch<E extends NetworkEvent> {
private FlowFile flowFile;
private List<E> events;
public FlowFileEventBatch(final FlowFile flowFile, final List<E> events) {
this.flowFile = flowFile;
this.events = events;
}
public FlowFile getFlowFile() {
return flowFile;
}
public List<E> getEvents() {
return events;
}
public void setFlowFile(FlowFile flowFile) {
this.flowFile = flowFile;
}
}

View File

@ -22,6 +22,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import java.net.NetworkInterface;
import java.net.SocketException;
@ -85,4 +86,70 @@ public class ListenerProperties {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor
.Builder().name("Port")
.description("The port to listen on for communication.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies the character set of the received data.")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Receive Buffer Size")
.description("The size of each buffer used to receive messages. Adjust this value appropriately based on the expected size of the " +
"incoming messages.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("65507 B")
.required(true)
.build();
public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Size of Socket Buffer")
.description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " +
"to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
"the data can be read, and incoming data will be dropped.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.required(true)
.build();
public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder()
.name("Max Size of Message Queue")
.description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " +
"Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total " +
"memory used by the processor during these surges.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10000")
.required(true)
.build();
public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
.name("Max Number of TCP Connections")
.description("The maximum number of concurrent TCP connections to accept.")
.addValidator(StandardValidators.createLongValidator(1, 65535, true))
.defaultValue("2")
.required(true)
.build();
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Max Batch Size")
.description(
"The maximum number of messages to add to a single FlowFile. If multiple messages are available, they will be concatenated along with "
+ "the <Message Delimiter> up to this configured maximum number of messages")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue("1")
.required(true)
.build();
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter")
.displayName("Batching Message Delimiter")
.description("Specifies the delimiter to place between messages when multiple messages are bundled together (see <Max Batch Size> property).")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("\\n")
.required(true)
.build();
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.event;
import org.apache.nifi.event.transport.NetworkEvent;
import java.util.Map;
/**
* Factory to create instances of a given type of NettyEvent.
*/
public interface NetworkEventFactory<E extends NetworkEvent> {
/**
* Creates an event for the given data and metadata.
*
* @param data raw data from a channel
* @param metadata additional metadata
*
* @return an instance of the given type
*/
E create(final byte[] data, final Map<String, String> metadata);
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.event;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import java.util.Map;
/**
* An EventFactory implementation to create NettyEvents.
*/
public class StandardNetworkEventFactory implements NetworkEventFactory<ByteArrayMessage> {
@Override
public ByteArrayMessage create(final byte[] data, final Map<String, String> metadata) {
return new ByteArrayMessage(data, metadata.get(EventFactory.SENDER_KEY));
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processor.util.listen.event.StandardNetworkEventFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
public class EventBatcherTest {
final String MESSAGE_DATA_1 = "some message data";
final String MESSAGE_DATA_2 = "some more data";
Processor processor;
final AtomicLong idGenerator = new AtomicLong(0L);
final ComponentLog logger = mock(ComponentLog.class);
BlockingQueue events;
BlockingQueue errorEvents;
EventBatcher batcher;
MockProcessSession session;
StandardNetworkEventFactory eventFactory;
@Before
public void setUp() {
processor = new SimpleProcessor();
events = new LinkedBlockingQueue<>();
errorEvents = new LinkedBlockingQueue<>();
batcher = new EventBatcher<ByteArrayMessage>(logger, events, errorEvents) {
@Override
protected String getBatchKey(ByteArrayMessage event) {
return event.getSender();
}
};
session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class));
eventFactory = new StandardNetworkEventFactory();
}
@Test
public void testGetBatches() throws InterruptedException {
String sender1 = new InetSocketAddress(NetworkUtils.getAvailableTcpPort()).toString();
String sender2 = new InetSocketAddress(NetworkUtils.getAvailableTcpPort()).toString();
final Map<String, String> sender1Metadata = EventFactoryUtil.createMapWithSender(sender1);
final Map<String, String> sender2Metadata = EventFactoryUtil.createMapWithSender(sender2);
events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8), sender1Metadata));
events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8), sender1Metadata));
events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8), sender1Metadata));
events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8), sender1Metadata));
events.put(eventFactory.create(MESSAGE_DATA_2.getBytes(StandardCharsets.UTF_8), sender2Metadata));
events.put(eventFactory.create(MESSAGE_DATA_2.getBytes(StandardCharsets.UTF_8), sender2Metadata));
Map<String, FlowFileEventBatch> batches = batcher.getBatches(session, 100, "\n".getBytes(StandardCharsets.UTF_8));
assertEquals(2, batches.size());
assertEquals(4, batches.get(sender1).getEvents().size());
assertEquals(2, batches.get(sender2).getEvents().size());
}
public static class SimpleProcessor extends AbstractProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}
}

View File

@ -413,6 +413,12 @@
<version>${okhttp.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>3.12.4</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -24,44 +24,47 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.standard.relp.event.RELPEvent;
import org.apache.nifi.processors.standard.relp.event.RELPEventFactory;
import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
import org.apache.nifi.processors.standard.relp.handler.RELPSocketChannelHandlerFactory;
import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
import org.apache.nifi.processors.standard.relp.response.RELPResponse;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processors.standard.relp.event.RELPMessage;
import org.apache.nifi.processors.standard.relp.handler.RELPMessageServerFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "relp", "tcp", "logs"})
@ -77,7 +80,7 @@ import java.util.concurrent.BlockingQueue;
@WritesAttribute(attribute="mime.type", description="The mime.type of the content which is text/plain")
})
@SeeAlso({ParseSyslog.class})
public class ListenRELP extends AbstractListenEventBatchingProcessor<RELPEvent> {
public class ListenRELP extends AbstractProcessor {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
@ -87,6 +90,7 @@ public class ListenRELP extends AbstractListenEventBatchingProcessor<RELPEvent>
.required(false)
.identifiesControllerService(RestrictedSSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("Client Auth")
.displayName("Client Auth")
@ -96,27 +100,80 @@ public class ListenRELP extends AbstractListenEventBatchingProcessor<RELPEvent>
.defaultValue(ClientAuth.REQUIRED.name())
.build();
private volatile RELPEncoder relpEncoder;
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Messages received successfully will be sent out this relationship.")
.build();
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE, CLIENT_AUTH);
protected List<PropertyDescriptor> descriptors;
protected Set<Relationship> relationships;
protected volatile int port;
protected volatile BlockingQueue<RELPMessage> events;
protected volatile BlockingQueue<RELPMessage> errorEvents;
protected volatile EventServer eventServer;
protected volatile byte[] messageDemarcatorBytes;
protected volatile EventBatcher eventBatcher;
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);
Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
errorEvents = new LinkedBlockingQueue<>();
eventBatcher = getEventBatcher();
final String msgDemarcator = getMessageDemarcator(context);
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
final NettyEventServerFactory eventFactory = getNettyEventServerFactory(hostname, port, charset, events);
eventFactory.setSocketReceiveBuffer(bufferSize);
eventFactory.setWorkerThreads(maxConnections);
configureFactoryForSsl(context, eventFactory);
try {
eventServer = eventFactory.getEventServer();
} catch (EventException e) {
getLogger().error("Failed to bind to [{}:{}].", hostname.getHostAddress(), port);
}
}
@OnStopped
public void stopped() {
if (eventServer != null) {
eventServer.shutdown();
eventServer = null;
}
}
@Override
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
super.onScheduled(context);
// wanted to ensure charset was already populated here
relpEncoder = new RELPEncoder(charset);
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
descriptors.add(ListenerProperties.PORT);
descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
descriptors.add(ListenerProperties.CHARSET);
descriptors.add(ListenerProperties.MAX_CONNECTIONS);
descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(CLIENT_AUTH);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
// Validate CLIENT_AUTH
final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
results.add(new ValidationResult.Builder()
@ -128,66 +185,31 @@ public class ListenRELP extends AbstractListenEventBatchingProcessor<RELPEvent>
}
@Override
protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<RELPEvent> events) throws IOException {
final EventFactory<RELPEvent> eventFactory = new RELPEventFactory();
final ChannelHandlerFactory<RELPEvent,AsyncChannelDispatcher> handlerFactory = new RELPSocketChannelHandlerFactory<>();
public final Set<Relationship> getRelationships() {
return this.relationships;
}
final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and the buffer size
final ByteBufferSource byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
ClientAuth clientAuth = null;
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
private void configureFactoryForSsl(final ProcessContext context, final NettyEventServerFactory eventFactory) {
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
sslContext = sslContextService.createContext();
clientAuth = ClientAuth.valueOf(clientAuthValue);
}
// if we decide to support SSL then get the context and pass it in here
return new SocketChannelDispatcher<>(eventFactory, handlerFactory, byteBufferSource, events,
getLogger(), maxConnections, sslContext, clientAuth, charSet);
}
@Override
protected String getBatchKey(RELPEvent event) {
return event.getSender() + "_" + event.getCommand();
}
@Override
protected void postProcess(final ProcessContext context, final ProcessSession session, final List<RELPEvent> events) {
// first commit the session so we guarantee we have all the events successfully
// written to FlowFiles and transferred to the success relationship
session.commitAsync(() -> {
// respond to each event to acknowledge successful receipt
for (final RELPEvent event : events) {
respond(event, RELPResponse.ok(event.getTxnr()));
SSLContext sslContext = sslContextService.createContext();
if (sslContext != null) {
eventFactory.setSslContext(sslContext);
eventFactory.setClientAuth(ClientAuth.valueOf(clientAuthValue));
}
});
}
protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
final ChannelResponse response = new RELPChannelResponse(relpEncoder, relpResponse);
final ChannelResponder responder = event.getResponder();
responder.addResponse(response);
try {
responder.respond();
} catch (IOException e) {
getLogger().error("Error sending response for transaction {} due to {}",
new Object[] {event.getTxnr(), e.getMessage()}, e);
} else {
eventFactory.setSslContext(null);
}
}
@Override
protected Map<String, String> getAttributes(FlowFileEventBatch batch) {
final List<RELPEvent> events = batch.getEvents();
final List<RELPMessage> events = batch.getEvents();
// the sender and command will be the same for all events based on the batch key
final String sender = events.get(0).getSender();
@ -209,15 +231,63 @@ public class ListenRELP extends AbstractListenEventBatchingProcessor<RELPEvent>
return attributes;
}
@Override
protected String getTransitUri(FlowFileEventBatch batch) {
final String sender = batch.getEvents().get(0).getSender();
final List<RELPMessage> events = batch.getEvents();
final String sender = events.get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
.append(port).toString();
return transitUri;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
EventBatcher eventBatcher = getEventBatcher();
final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
Map<String, FlowFileEventBatch> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
processEvents(session, batches);
}
private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch> batches) {
for (Map.Entry<String, FlowFileEventBatch> entry : batches.entrySet()) {
FlowFile flowFile = entry.getValue().getFlowFile();
final List<RELPMessage> events = entry.getValue().getEvents();
if (flowFile.getSize() == 0L || events.size() == 0) {
session.remove(flowFile);
getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", entry.getKey());
continue;
}
final Map<String,String> attributes = getAttributes(entry.getValue());
flowFile = session.putAllAttributes(flowFile, attributes);
getLogger().debug("Transferring {} to success", flowFile);
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
// the sender and command will be the same for all events based on the batch key
final String transitUri = getTransitUri(entry.getValue());
session.getProvenanceReporter().receive(flowFile, transitUri);
}
session.commitAsync();
}
private String getRELPBatchKey(final RELPMessage event) {
return event.getSender() + "_" + event.getCommand();
}
private EventBatcher getEventBatcher() {
return new EventBatcher<RELPMessage>(getLogger(), events, errorEvents) {
@Override
protected String getBatchKey(RELPMessage event) {
return getRELPBatchKey(event);
}
};
}
public enum RELPAttributes implements FlowFileAttributeKey {
TXNR("relp.txnr"),
COMMAND("relp.command"),
@ -235,4 +305,14 @@ public class ListenRELP extends AbstractListenEventBatchingProcessor<RELPEvent>
return key;
}
}
private NettyEventServerFactory getNettyEventServerFactory(final InetAddress hostname, final int port, final Charset charset, final BlockingQueue events) {
return new RELPMessageServerFactory(getLogger(), hostname, port, charset, events);
}
private String getMessageDemarcator(final ProcessContext context) {
return context.getProperty(ListenerProperties.MESSAGE_DELIMITER)
.getValue()
.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
}
}

View File

@ -16,21 +16,18 @@
*/
package org.apache.nifi.processors.standard.relp.event;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import java.nio.channels.SocketChannel;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
/**
* A RELP event which adds the transaction number and command to the StandardEvent.
* A RELP message which adds a transaction number and command to the ByteArrayMessage.
*/
public class RELPEvent extends StandardEvent<SocketChannel> {
public class RELPMessage extends ByteArrayMessage {
private final long txnr;
private final String command;
public RELPEvent(final String sender, final byte[] data, final ChannelResponder<SocketChannel> responder, final long txnr, final String command) {
super(sender, data, responder);
public RELPMessage(final String sender, final byte[] data, final long txnr, final String command) {
super(data, sender);
this.txnr = txnr;
this.command = command;
}
@ -42,5 +39,4 @@ public class RELPEvent extends StandardEvent<SocketChannel> {
public String getCommand() {
return command;
}
}

View File

@ -16,22 +16,20 @@
*/
package org.apache.nifi.processors.standard.relp.event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.event.NetworkEventFactory;
import java.util.Map;
/**
* An EventFactory implementation to create RELPEvents.
*/
public class RELPEventFactory implements EventFactory<RELPEvent> {
public class RELPMessageFactory implements NetworkEventFactory<RELPMessage> {
@Override
public RELPEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
public RELPMessage create(final byte[] data, final Map<String, String> metadata) {
final long txnr = Long.valueOf(metadata.get(RELPMetadata.TXNR_KEY));
final String command = metadata.get(RELPMetadata.COMMAND_KEY);
final String sender = metadata.get(EventFactory.SENDER_KEY);
return new RELPEvent(sender, data, responder, txnr, command);
final String sender = metadata.get(RELPMetadata.SENDER_KEY);
return new RELPMessage(sender, data, txnr, command);
}
}

View File

@ -23,5 +23,6 @@ public interface RELPMetadata {
String TXNR_KEY = "relp.txnr";
String COMMAND_KEY = "relp.command";
String SENDER_KEY = "sender";
}

View File

@ -16,11 +16,12 @@
*/
package org.apache.nifi.processors.standard.relp.frame;
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
/**
* Decodes a RELP frame by maintaining a state based on each byte that has been processed. This class
* should not be shared by multiple threads.
@ -42,6 +43,13 @@ public class RELPDecoder {
this(charset, new ByteArrayOutputStream(4096));
}
/**
* @param charset the charset to decode bytes from the RELP frame
*/
public RELPDecoder(final Charset charset, final int bufferSize) {
this(charset, new ByteArrayOutputStream(bufferSize));
}
/**
*
* @param charset the charset to decode bytes from the RELP frame
@ -139,7 +147,7 @@ public class RELPDecoder {
if (b == RELPFrame.SEPARATOR) {
final String command = new String(currBytes.toByteArray(), charset);
frameBuilder.command(command);
logger.debug("Command is {}", new Object[] {command});
logger.debug("Command is {}", command);
currBytes.reset();
currState = RELPState.LENGTH;
@ -152,7 +160,7 @@ public class RELPDecoder {
if (b == RELPFrame.SEPARATOR || (currBytes.size() > 0 && b == RELPFrame.DELIMITER)) {
final int dataLength = Integer.parseInt(new String(currBytes.toByteArray(), charset));
frameBuilder.dataLength(dataLength);
logger.debug("Length is {}", new Object[] {dataLength});
logger.debug("Length is {}", dataLength);
currBytes.reset();

View File

@ -14,82 +14,82 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.handler;
package org.apache.nifi.processors.standard.relp.frame;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processor.util.listen.event.EventQueue;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.standard.relp.event.RELPMessageFactory;
import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
import org.apache.nifi.processors.standard.relp.response.RELPResponse;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
/**
* Encapsulates the logic to handle a RELPFrame once it has been read from the channel.
* Decode RELP message bytes into a RELPMessage
*/
public class RELPFrameHandler<E extends Event<SocketChannel>> {
public class RELPFrameDecoder extends ByteToMessageDecoder {
private Charset charset;
private RELPDecoder decoder;
private final ComponentLog logger;
private final RELPEncoder encoder;
private final RELPMessageFactory eventFactory;
static final String CMD_OPEN = "open";
static final String CMD_CLOSE = "close";
private final Charset charset;
private final EventFactory<E> eventFactory;
private final EventQueue<E> events;
private final SelectionKey key;
private final AsyncChannelDispatcher dispatcher;
private final ComponentLog logger;
private final RELPEncoder encoder;
public RELPFrameHandler(final SelectionKey selectionKey,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final AsyncChannelDispatcher dispatcher,
final ComponentLog logger) {
this.key = selectionKey;
public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
this.charset = charset;
this.eventFactory = eventFactory;
this.dispatcher = dispatcher;
this.logger = logger;
this.events = new EventQueue<>(events, logger);
this.encoder = new RELPEncoder(charset);
this.eventFactory = new RELPMessageFactory();
}
public void handle(final RELPFrame frame, final ChannelResponder<SocketChannel> responder, final String sender)
throws IOException, InterruptedException {
@Override
protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception {
final int total = in.readableBytes();
final String senderSocket = ctx.channel().remoteAddress().toString();
this.decoder = new RELPDecoder(charset, total);
// go through the buffer parsing the RELP command
for (int i = 0; i < total; i++) {
byte currByte = in.readByte();
// if we found the end of a frame, handle the frame and mark the buffer
if (decoder.process(currByte)) {
final RELPFrame frame = decoder.getFrame();
logger.debug("Received RELP frame with transaction {} and command {}",
frame.getTxnr(), frame.getCommand());
handle(frame, ctx, senderSocket, out);
in.markReaderIndex();
}
}
}
private void handle(final RELPFrame frame, final ChannelHandlerContext ctx, final String sender, final List<Object> out) {
// respond to open and close commands immediately, create and queue an event for everything else
if (CMD_OPEN.equals(frame.getCommand())) {
Map<String,String> offers = RELPResponse.parseOffers(frame.getData(), charset);
ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.open(frame.getTxnr(), offers));
responder.addResponse(response);
responder.respond();
ctx.writeAndFlush(Unpooled.wrappedBuffer(response.toByteArray()));
} else if (CMD_CLOSE.equals(frame.getCommand())) {
ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.ok(frame.getTxnr()));
responder.addResponse(response);
responder.respond();
dispatcher.completeConnection(key);
ctx.writeAndFlush(Unpooled.wrappedBuffer(response.toByteArray()));
ctx.close();
} else {
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender);
metadata.put(RELPMetadata.TXNR_KEY, String.valueOf(frame.getTxnr()));
metadata.put(RELPMetadata.COMMAND_KEY, frame.getCommand());
final E event = eventFactory.create(frame.getData(), metadata, responder);
events.offer(event);
metadata.put(RELPMetadata.SENDER_KEY, sender);
out.add(eventFactory.create(frame.getData(), metadata));
}
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.frame;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.nifi.processors.standard.relp.event.RELPMessage;
import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
import org.apache.nifi.processors.standard.relp.response.RELPResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
/**
* Decode data received into a RELPMessage
*/
@ChannelHandler.Sharable
public class RELPMessageChannelHandler extends SimpleChannelInboundHandler<RELPMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(RELPMessageChannelHandler.class);
private final BlockingQueue<RELPMessage> events;
private final RELPEncoder encoder;
public RELPMessageChannelHandler(BlockingQueue<RELPMessage> events, final Charset charset) {
this.events = events;
this.encoder = new RELPEncoder(charset);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, RELPMessage msg) {
LOGGER.debug("RELP Message Received Length [{}] Remote Address [{}] ", msg.getMessage().length, msg.getSender());
if (events.offer(msg)) {
LOGGER.debug("Event Queued: RELP Message Sender [{}] Transaction Number [{}]", msg.getSender(), msg.getTxnr());
ctx.writeAndFlush(Unpooled.wrappedBuffer(new RELPChannelResponse(encoder, RELPResponse.ok(msg.getTxnr())).toByteArray()));
} else {
LOGGER.debug("Event Queue Full: Failed RELP Message Sender [{}] Transaction Number [{}]", msg.getSender(), msg.getTxnr());
ctx.writeAndFlush(Unpooled.wrappedBuffer(new RELPChannelResponse(encoder, RELPResponse.serverFullError(msg.getTxnr())).toByteArray()));
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.frame;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.nifi.processors.standard.relp.response.RELPResponse;
import java.nio.charset.Charset;
import java.util.List;
/**
* Message encoder for a RELPResponse
*/
@ChannelHandler.Sharable
public class RELPResponseEncoder extends MessageToMessageEncoder<RELPResponse> {
private Charset charset;
public RELPResponseEncoder(final Charset charset) {
this.charset = charset;
}
@Override
protected void encode(ChannelHandlerContext context, RELPResponse event, List<Object> out) throws Exception {
out.add(new RELPEncoder(charset).encode(event.toFrame(charset)));
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.handler;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.standard.relp.event.RELPMessage;
import org.apache.nifi.processors.standard.relp.frame.RELPFrameDecoder;
import org.apache.nifi.processors.standard.relp.frame.RELPMessageChannelHandler;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
/**
* Netty Event Server Factory implementation for RELP Messages
*/
public class RELPMessageServerFactory extends NettyEventServerFactory {
/**
* RELP Message Server Factory to receive RELP messages
* @param log Component Log
* @param address Server Address
* @param port Server Port Number
* @param charset Charset to use when decoding RELP messages
* @param events Blocking Queue for events received
*/
public RELPMessageServerFactory(final ComponentLog log,
final InetAddress address,
final int port,
final Charset charset,
final BlockingQueue<RELPMessage> events) {
super(address, port, TransportProtocol.TCP);
final LogExceptionChannelHandler logExceptionChannelHandler = new LogExceptionChannelHandler(log);
final RELPMessageChannelHandler relpChannelHandler = new RELPMessageChannelHandler(events, charset);
setHandlerSupplier(() -> Arrays.asList(
logExceptionChannelHandler,
new RELPFrameDecoder(log, charset),
relpChannelHandler
));
}
}

View File

@ -1,88 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.handler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.socket.SSLSocketChannelHandler;
import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder;
import org.apache.nifi.processors.standard.relp.frame.RELPDecoder;
import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
import org.apache.nifi.processors.standard.relp.frame.RELPFrameException;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
/**
* A RELP implementation of SSLSocketChannelHandler.
*/
public class RELPSSLSocketChannelHandler<E extends Event<SocketChannel>> extends SSLSocketChannelHandler<E> {
private RELPDecoder decoder;
private RELPFrameHandler<E> frameHandler;
public RELPSSLSocketChannelHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
this.decoder = new RELPDecoder(charset);
this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Override
protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel,
final int bytesRead, final byte[] buffer) throws InterruptedException, IOException {
final InetAddress sender = socketChannel.socket().getInetAddress();
try {
// go through the buffer parsing the RELP command
for (int i = 0; i < bytesRead; i++) {
byte currByte = buffer[i];
// if we found the end of a frame, handle the frame and mark the buffer
if (decoder.process(currByte)) {
final RELPFrame frame = decoder.getFrame();
logger.debug("Received RELP frame with transaction {} and command {}",
new Object[] {frame.getTxnr(), frame.getCommand()});
final SSLSocketChannelResponder responder = new SSLSocketChannelResponder(socketChannel, sslSocketChannel);
frameHandler.handle(frame, responder, sender.toString());
}
}
logger.debug("Done processing buffer");
} catch (final RELPFrameException rfe) {
logger.error("Error reading RELP frames due to {}", new Object[] {rfe.getMessage()} , rfe);
// if an invalid frame or bad data was sent then the decoder will be left in a
// corrupted state, so lets close the connection and cause the client to re-establish
dispatcher.completeConnection(key);
}
}
}

View File

@ -1,98 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.handler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.socket.StandardSocketChannelHandler;
import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
import org.apache.nifi.processors.standard.relp.frame.RELPDecoder;
import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
import org.apache.nifi.processors.standard.relp.frame.RELPFrameException;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
/**
* Extends the StandardSocketChannelHandler to decode bytes into RELP frames.
*/
public class RELPSocketChannelHandler<E extends Event<SocketChannel>> extends StandardSocketChannelHandler<E> {
private RELPDecoder decoder;
private RELPFrameHandler<E> frameHandler;
public RELPSocketChannelHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
this.decoder = new RELPDecoder(charset);
this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Override
protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer)
throws InterruptedException, IOException {
// get total bytes in buffer
final int total = socketBuffer.remaining();
final InetAddress sender = socketChannel.socket().getInetAddress();
try {
// go through the buffer parsing the RELP command
for (int i = 0; i < total; i++) {
byte currByte = socketBuffer.get();
// if we found the end of a frame, handle the frame and mark the buffer
if (decoder.process(currByte)) {
final RELPFrame frame = decoder.getFrame();
logger.debug("Received RELP frame with transaction {} and command {}",
new Object[] {frame.getTxnr(), frame.getCommand()});
final SocketChannelResponder responder = new SocketChannelResponder(socketChannel);
frameHandler.handle(frame, responder, sender.toString());
socketBuffer.mark();
}
}
logger.debug("Done processing buffer");
} catch (final RELPFrameException rfe) {
logger.error("Error reading RELP frames due to {}", new Object[] {rfe.getMessage()}, rfe);
// if an invalid frame or bad data was sent then the decoder will be left in a
// corrupted state, so lets close the connection and cause the client to re-establish
dispatcher.completeConnection(key);
}
}
// not used for anything in RELP since the decoder encapsulates the delimiter
@Override
public byte getDelimiter() {
return RELPFrame.DELIMITER;
}
}

View File

@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.handler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
/**
* Default factory for creating RELP socket channel handlers.
*/
public class RELPSocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> {
@Override
public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
return new RELPSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
}
@Override
public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
return new RELPSSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
}
}

View File

@ -127,9 +127,23 @@ public class RELPResponse {
* @return a RELPResponse with a 500 code and a message of "ERROR"
*/
public static RELPResponse error(final long txnr) {
return new RELPResponse(txnr, ERROR, "ERROR", null);
return error(txnr, "ERROR");
}
/**
* Utility method to create a default "ERROR" response if the server event queue is full.
*
* @param txnr the transaction number being responded to
*
* @return a RELPResponse with a 500 code and a message of "SERVER FULL"
*/
public static RELPResponse serverFullError(final long txnr) {
return error(txnr, "SERVER FULL");
}
private static RELPResponse error(final long txnr, final String message) {
return new RELPResponse(txnr, ERROR, message, null);
}
/**
* Parses the provided data into a Map of offers.

View File

@ -16,22 +16,16 @@
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.ftpserver.ssl.ClientAuth;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.standard.relp.event.RELPEvent;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processors.standard.relp.event.RELPMessage;
import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@ -45,32 +39,47 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class TestListenRELP {
public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
public static final String SYSLOG_FRAME_DATA = "this is a syslog message here";
public static final String RELP_FRAME_DATA = "this is a relp message here";
private static final String LOCALHOST = "localhost";
private static final Charset CHARSET = StandardCharsets.US_ASCII;
private static final Duration SENDER_TIMEOUT = Duration.ofSeconds(10);
static final RELPFrame OPEN_FRAME = new RELPFrame.Builder()
.txnr(1)
.command("open")
.dataLength(OPEN_FRAME_DATA.length())
.data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
.data(OPEN_FRAME_DATA.getBytes(CHARSET))
.build();
static final RELPFrame SYSLOG_FRAME = new RELPFrame.Builder()
static final RELPFrame RELP_FRAME = new RELPFrame.Builder()
.txnr(2)
.command("syslog")
.dataLength(SYSLOG_FRAME_DATA.length())
.data(SYSLOG_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
.dataLength(RELP_FRAME_DATA.length())
.data(RELP_FRAME_DATA.getBytes(CHARSET))
.build();
static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder()
@ -80,14 +89,6 @@ public class TestListenRELP {
.data(new byte[0])
.build();
private static final String LOCALHOST = "localhost";
@Mock
private ChannelResponder<SocketChannel> responder;
@Mock
private ChannelDispatcher channelDispatcher;
@Mock
private RestrictedSSLContextService sslContextService;
@ -97,46 +98,53 @@ public class TestListenRELP {
@Before
public void setup() {
encoder = new RELPEncoder(StandardCharsets.UTF_8);
runner = TestRunners.newTestRunner(ListenRELP.class);
encoder = new RELPEncoder(CHARSET);
ListenRELP mockRELP = new MockListenRELP();
runner = TestRunners.newTestRunner(mockRELP);
}
@After
public void shutdown() {
runner.shutdown();
}
@Test
public void testRun() throws IOException {
final int syslogFrames = 5;
final List<RELPFrame> frames = getFrames(syslogFrames);
public void testRELPFramesAreReceivedSuccessfully() throws IOException {
final int relpFrames = 5;
final List<RELPFrame> frames = getFrames(relpFrames);
// three syslog frames should be transferred and three responses should be sent
run(frames, syslogFrames, syslogFrames, null);
// three RELP frames should be transferred
run(frames, relpFrames, null);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(syslogFrames, events.size());
Assert.assertEquals(relpFrames, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp"));
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
Assert.assertEquals(syslogFrames, mockFlowFiles.size());
Assert.assertEquals(relpFrames, mockFlowFiles.size());
final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
Assert.assertEquals(String.valueOf(SYSLOG_FRAME.getTxnr()), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.TXNR.key()));
Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
Assert.assertEquals(String.valueOf(RELP_FRAME.getTxnr()), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.TXNR.key()));
Assert.assertEquals(RELP_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
}
@Test
public void testRunBatching() throws IOException {
runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "5");
public void testRELPFramesAreReceivedSuccessfullyWhenBatched() throws IOException {
final int syslogFrames = 3;
final List<RELPFrame> frames = getFrames(syslogFrames);
runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "5");
// one syslog frame should be transferred since we are batching, but three responses should be sent
final int relpFrames = 3;
final List<RELPFrame> frames = getFrames(relpFrames);
// one relp frame should be transferred since we are batching
final int expectedFlowFiles = 1;
run(frames, expectedFlowFiles, syslogFrames, null);
run(frames, expectedFlowFiles, null);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
@ -150,117 +158,110 @@ public class TestListenRELP {
Assert.assertEquals(expectedFlowFiles, mockFlowFiles.size());
final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
Assert.assertEquals(RELP_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
}
@Test
public void testRunMutualTls() throws IOException, TlsException, InitializationException {
final String serviceIdentifier = SSLContextService.class.getName();
Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext();
Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
when(sslContextService.createContext()).thenReturn(sslContext);
runner.addControllerService(serviceIdentifier, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name());
final int syslogFrames = 3;
final List<RELPFrame> frames = getFrames(syslogFrames);
run(frames, syslogFrames, syslogFrames, sslContext);
}
@Test
public void testRunNoEventsAvailable() {
MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
runner = TestRunners.newTestRunner(mockListenRELP);
runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
runner.run();
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
runner.shutdown();
final int relpFrames = 3;
final List<RELPFrame> frames = getFrames(relpFrames);
run(frames, relpFrames, sslContext);
}
@Test
public void testBatchingWithDifferentSenders() {
final String sender1 = "sender1";
final String sender2 = "sender2";
String sender1 = "/192.168.1.50:55000";
String sender2 = "/192.168.1.50:55001";
String sender3 = "/192.168.1.50:55002";
final List<RELPEvent> mockEvents = new ArrayList<>();
mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
final List<RELPMessage> mockEvents = new ArrayList<>();
mockEvents.add(new RELPMessage(sender1, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
mockEvents.add(new RELPMessage(sender1, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
mockEvents.add(new RELPMessage(sender1, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
mockEvents.add(new RELPMessage(sender2, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
mockEvents.add(new RELPMessage(sender3, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
mockEvents.add(new RELPMessage(sender3, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
runner = TestRunners.newTestRunner(mockListenRELP);
runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(NetworkUtils.availablePort()));
runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
runner.run();
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 3);
runner.shutdown();
}
private void run(final List<RELPFrame> frames, final int flowFiles, final int responses, final SSLContext sslContext)
private void run(final List<RELPFrame> frames, final int flowFiles, final SSLContext sslContext)
throws IOException {
final int port = NetworkUtils.availablePort();
runner.setProperty(ListenRELP.PORT, Integer.toString(port));
runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(port));
// Run Processor and start Dispatcher without shutting down
runner.run(1, false, true);
try (final Socket socket = getSocket(port, sslContext)) {
final OutputStream outputStream = socket.getOutputStream();
sendFrames(frames, outputStream);
// Run Processor for number of responses
runner.run(responses, false, false);
runner.assertTransferCount(ListenRELP.REL_SUCCESS, flowFiles);
} finally {
runner.shutdown();
}
final byte[] relpMessages = getRELPMessages(frames);
sendMessages(port, relpMessages, sslContext);
runner.run(flowFiles, false, false);
runner.assertTransferCount(ListenRELP.REL_SUCCESS, flowFiles);
}
private void sendFrames(final List<RELPFrame> frames, final OutputStream outputStream) throws IOException {
private byte[] getRELPMessages(final List<RELPFrame> frames) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (final RELPFrame frame : frames) {
final byte[] encodedFrame = encoder.encode(frame);
outputStream.write(encodedFrame);
outputStream.flush();
}
return outputStream.toByteArray();
}
private Socket getSocket(final int port, final SSLContext sslContext) throws IOException {
final Socket socket;
if (sslContext == null) {
socket = new Socket(LOCALHOST, port);
} else {
socket = sslContext.getSocketFactory().createSocket(LOCALHOST, port);
}
return socket;
}
private List<RELPFrame> getFrames(final int syslogFrames) {
private List<RELPFrame> getFrames(final int relpFrames) {
final List<RELPFrame> frames = new ArrayList<>();
frames.add(OPEN_FRAME);
for (int i = 0; i < syslogFrames; i++) {
frames.add(SYSLOG_FRAME);
for (int i = 0; i < relpFrames; i++) {
frames.add(RELP_FRAME);
}
frames.add(CLOSE_FRAME);
return frames;
}
// Extend ListenRELP to mock the ChannelDispatcher and allow us to return staged events
private void sendMessages(final int port, final byte[] relpMessages, final SSLContext sslContext) {
final ByteArrayNettyEventSenderFactory eventSenderFactory = new ByteArrayNettyEventSenderFactory(runner.getLogger(), LOCALHOST, port, TransportProtocol.TCP);
if (sslContext != null) {
eventSenderFactory.setSslContext(sslContext);
}
eventSenderFactory.setTimeout(SENDER_TIMEOUT);
EventSender<byte[]> eventSender = eventSenderFactory.getEventSender();
eventSender.sendEvent(relpMessages);
}
private class MockListenRELP extends ListenRELP {
private final List<RELPMessage> mockEvents;
private final List<RELPEvent> mockEvents;
public MockListenRELP() {
this.mockEvents = new ArrayList<>();
}
public MockListenRELP(List<RELPEvent> mockEvents) {
public MockListenRELP(List<RELPMessage> mockEvents) {
this.mockEvents = mockEvents;
}
@ -270,12 +271,5 @@ public class TestListenRELP {
super.onScheduled(context);
events.addAll(mockEvents);
}
@Override
protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<RELPEvent> events) {
return channelDispatcher;
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.ClientAuth;
@ -94,7 +95,7 @@ public class TestListenTCP {
@Test
public void testRunBatching() throws IOException {
runner.setProperty(ListenTCP.MAX_BATCH_SIZE, "3");
runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "3");
final List<String> messages = new ArrayList<>();
messages.add("This is message 1\n");

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
@ -111,8 +112,8 @@ public class TestListenUDP {
@Test
public void testBatchingSingleSender() throws IOException, InterruptedException {
final String delimiter = "NN";
runner.setProperty(ListenUDP.MESSAGE_DELIMITER, delimiter);
runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "3");
runner.setProperty(ListenerProperties.MESSAGE_DELIMITER, delimiter);
runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "3");
final List<String> messages = getMessages(5);
final int expectedTransferred = 2;
@ -146,7 +147,7 @@ public class TestListenUDP {
MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
runner = TestRunners.newTestRunner(mockListenUDP);
runner.setProperty(ListenUDP.PORT, "1");
runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "10");
runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
// sending 4 messages with a batch size of 10, but should get 2 FlowFiles because of different senders
runner.run();
@ -162,7 +163,7 @@ public class TestListenUDP {
MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
runner = TestRunners.newTestRunner(mockListenUDP);
runner.setProperty(ListenUDP.PORT, "1");
runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "10");
runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
runner.run(5);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 0);

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
import org.junit.Assert;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class TestRELPEventFactory {
@Test
public void testCreateRELPEvent() {
final byte[] data = "this is an event".getBytes(StandardCharsets.UTF_8);
final String sender = "sender1";
final long txnr = 1;
final String command = "syslog";
final Map<String,String> metadata = new HashMap<>();
metadata.put(EventFactory.SENDER_KEY, sender);
metadata.put(RELPMetadata.TXNR_KEY, String.valueOf(txnr));
metadata.put(RELPMetadata.COMMAND_KEY, command);
final ChannelResponder responder = new SocketChannelResponder(null);
final EventFactory<RELPEvent> factory = new RELPEventFactory();
final RELPEvent event = factory.create(data, metadata, responder);
Assert.assertEquals(data, event.getData());
Assert.assertEquals(sender, event.getSender());
Assert.assertEquals(txnr, event.getTxnr());
Assert.assertEquals(command, event.getCommand());
Assert.assertEquals(responder, event.getResponder());
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.frame;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.standard.relp.event.RELPMessage;
import org.apache.nifi.util.MockComponentLog;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
class RELPFrameDecoderTest {
final ComponentLog logger = new MockComponentLog(this.getClass().getSimpleName(), this);
public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
public static final String SYSLOG_FRAME_DATA = "this is a syslog message here";
static final RELPFrame OPEN_FRAME = new RELPFrame.Builder()
.txnr(1)
.command("open")
.dataLength(OPEN_FRAME_DATA.length())
.data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
.build();
static final RELPFrame SYSLOG_FRAME = new RELPFrame.Builder()
.txnr(2)
.command("syslog")
.dataLength(SYSLOG_FRAME_DATA.length())
.data(SYSLOG_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
.build();
static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder()
.txnr(3)
.command("close")
.dataLength(0)
.data(new byte[0])
.build();
@Test
void testDecodeRELPEvents() throws IOException {
final List<RELPFrame> frames = getFrames(5);
ByteBufOutputStream eventBytes = new ByteBufOutputStream(Unpooled.buffer());
sendFrames(frames, eventBytes);
EmbeddedChannel channel = new EmbeddedChannel(new RELPFrameDecoder(logger, StandardCharsets.UTF_8));
assert(channel.writeInbound(eventBytes.buffer()));
assertEquals(5, channel.inboundMessages().size());
RELPMessage event = channel.readInbound();
assertEquals(RELPMessage.class, event.getClass());
assertEquals(SYSLOG_FRAME_DATA, new String(event.getMessage(), StandardCharsets.UTF_8));
assertEquals(2, channel.outboundMessages().size());
}
private void sendFrames(final List<RELPFrame> frames, final OutputStream outputStream) throws IOException {
RELPEncoder encoder = new RELPEncoder(StandardCharsets.UTF_8);
for (final RELPFrame frame : frames) {
final byte[] encodedFrame = encoder.encode(frame);
outputStream.write(encodedFrame);
outputStream.flush();
}
}
private List<RELPFrame> getFrames(final int syslogFrames) {
final List<RELPFrame> frames = new ArrayList<>();
frames.add(OPEN_FRAME);
for (int i = 0; i < syslogFrames; i++) {
frames.add(SYSLOG_FRAME);
}
frames.add(CLOSE_FRAME);
return frames;
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.frame;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
import org.apache.nifi.processors.standard.relp.response.RELPResponse;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.charset.Charset;
import static org.junit.Assert.assertEquals;
class RELPResponseEncoderTest {
@Test
void testEncodeRELPResponse() throws IOException {
final byte[] relpResponse = new RELPChannelResponse(new RELPEncoder(Charset.defaultCharset()), RELPResponse.ok(321L)).toByteArray();
ByteBufOutputStream eventBytes = new ByteBufOutputStream(Unpooled.buffer(relpResponse.length));
eventBytes.write(relpResponse);
EmbeddedChannel channel = new EmbeddedChannel(new RELPResponseEncoder(Charset.defaultCharset()));
assert(channel.writeOutbound(eventBytes));
ByteBufOutputStream result = channel.readOutbound();
assertEquals("321 rsp 6 200 OK\n", new String(result.buffer().array()));
}
}

View File

@ -1,166 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.handler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.standard.relp.event.RELPEvent;
import org.apache.nifi.processors.standard.relp.event.RELPEventFactory;
import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestRELPFrameHandler {
private Charset charset;
private EventFactory<RELPEvent> eventFactory;
private BlockingQueue<RELPEvent> events;
private SelectionKey key;
private AsyncChannelDispatcher dispatcher;
private ComponentLog logger;
private RELPFrameHandler<RELPEvent> frameHandler;
@Before
public void setup() {
this.charset = StandardCharsets.UTF_8;
this.eventFactory = new RELPEventFactory();
this.events = new LinkedBlockingQueue<>();
this.key = Mockito.mock(SelectionKey.class);
this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class);
this.logger = Mockito.mock(ComponentLog.class);
this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Test
public void testOpen() throws IOException, InterruptedException {
final String offer1 = "relp_version=0";
final String offer2 = "relp_software=librelp,1.2.7,http://librelp.adiscon.com";
final String offer3 = "commands=syslog";
final String data = offer1 + "\n" + offer2 + "\n" + offer3;
final RELPFrame openFrame = new RELPFrame.Builder()
.txnr(1).command("open")
.dataLength(data.length())
.data(data.getBytes(charset))
.build();
final String sender = "sender1";
final CapturingChannelResponder responder = new CapturingChannelResponder();
// call the handler and verify respond() was called once with once response
frameHandler.handle(openFrame, responder, sender);
Assert.assertEquals(1, responder.responded);
Assert.assertEquals(1, responder.responses.size());
// verify the response sent back the offers that were received
final ChannelResponse response = responder.responses.get(0);
final String responseData = new String(response.toByteArray(), charset);
Assert.assertTrue(responseData.contains(offer1));
Assert.assertTrue(responseData.contains(offer2));
Assert.assertTrue(responseData.contains(offer3));
}
@Test
public void testClose() throws IOException, InterruptedException {
final RELPFrame openFrame = new RELPFrame.Builder()
.txnr(1).command("close")
.dataLength(0)
.data(new byte[0])
.build();
final String sender = "sender1";
final CapturingChannelResponder responder = new CapturingChannelResponder();
// call the handler and verify respond() was called once with once response
frameHandler.handle(openFrame, responder, sender);
Assert.assertEquals(1, responder.responded);
Assert.assertEquals(1, responder.responses.size());
// verify the response sent back the offers that were received
final ChannelResponse response = responder.responses.get(0);
final String responseData = new String(response.toByteArray(), charset);
Assert.assertTrue(responseData.contains("200 OK"));
}
@Test
public void testCommand() throws IOException, InterruptedException {
final String data = "this is a syslog message";
final RELPFrame openFrame = new RELPFrame.Builder()
.txnr(1).command("syslog")
.dataLength(data.length())
.data(data.getBytes(charset))
.build();
final String sender = "sender1";
final CapturingChannelResponder responder = new CapturingChannelResponder();
// call the handler and verify respond() was called once with once response
frameHandler.handle(openFrame, responder, sender);
Assert.assertEquals(0, responder.responded);
Assert.assertEquals(0, responder.responses.size());
Assert.assertEquals(1, events.size());
final RELPEvent event = events.poll();
Assert.assertEquals(data, new String(event.getData(), charset));
}
private static class CapturingChannelResponder implements ChannelResponder<SocketChannel> {
int responded;
List<ChannelResponse> responses = new ArrayList<>();
@Override
public SocketChannel getChannel() {
return Mockito.mock(SocketChannel.class);
}
@Override
public List<ChannelResponse> getResponses() {
return responses;
}
@Override
public void addResponse(ChannelResponse response) {
responses.add(response);
}
@Override
public void respond() throws IOException {
responded++;
}
}
}

View File

@ -1,209 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.handler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestRELPSocketChannelHandler {
private EventFactory<TestEvent> eventFactory;
private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory;
private ByteBufferSource byteBufferSource;
private BlockingQueue<TestEvent> events;
private ComponentLog logger = Mockito.mock(ComponentLog.class);
private int maxConnections;
private SSLContext sslContext;
private Charset charset;
private ChannelDispatcher dispatcher;
@Before
public void setup() {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new RELPSocketChannelHandlerFactory<>();
byteBufferSource = new ByteBufferPool(1, 4096);
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
maxConnections = 1;
sslContext = null;
charset = StandardCharsets.UTF_8;
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBufferSource, events, logger,
maxConnections, sslContext, charset);
}
@Test
public void testBasicHandling() throws IOException, InterruptedException {
final List<String> messages = new ArrayList<>();
messages.add("1 syslog 20 this is message 1234\n");
messages.add("2 syslog 22 this is message 456789\n");
messages.add("3 syslog 21 this is message ABCDE\n");
run(messages);
Assert.assertEquals(messages.size(), events.size());
boolean found1 = false;
boolean found2 = false;
boolean found3 = false;
TestEvent event;
while((event = events.poll()) != null) {
Map<String,String> metadata = event.metadata;
Assert.assertTrue(metadata.containsKey(RELPMetadata.TXNR_KEY));
final String txnr = metadata.get(RELPMetadata.TXNR_KEY);
if (txnr.equals("1")) {
found1 = true;
} else if (txnr.equals("2")) {
found2 = true;
} else if (txnr.equals("3")) {
found3 = true;
}
}
Assert.assertTrue(found1);
Assert.assertTrue(found2);
Assert.assertTrue(found3);
}
@Test
public void testLotsOfFrames() throws IOException, InterruptedException {
final String baseMessage = " syslog 19 this is message ";
final List<String> messages = new ArrayList<>();
for (int i=100; i < 1000; i++) {
messages.add(i + baseMessage + i + "\n");
}
run(messages);
Assert.assertEquals(messages.size(), events.size());
}
protected void run(List<String> messages) throws IOException, InterruptedException {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
// starts the dispatcher listening on port 0 so it selects a random port
dispatcher.open(null, 0, 4096);
// starts a thread to run the dispatcher which will accept/read connections
Thread dispatcherThread = new Thread(dispatcher);
dispatcherThread.start();
// create a client connection to the port the dispatcher is listening on
final int realPort = dispatcher.getPort();
try (SocketChannel channel = SocketChannel.open()) {
channel.connect(new InetSocketAddress("localhost", realPort));
Thread.sleep(100);
// send the provided messages
for (int i=0; i < messages.size(); i++) {
buffer.clear();
buffer.put(messages.get(i).getBytes(charset));
buffer.flip();
while (buffer.hasRemaining()) {
channel.write(buffer);
}
Thread.sleep(1);
}
}
// wait up to 25 seconds to verify the responses
long timeout = 25000;
long startTime = System.currentTimeMillis();
while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) {
Thread.sleep(100);
}
// should have gotten an event for each message sent
Assert.assertEquals(messages.size(), events.size());
} finally {
// stop the dispatcher thread and ensure we shut down handler threads
dispatcher.close();
}
}
// Test event to produce from the data
private static class TestEvent implements Event<SocketChannel> {
private byte[] data;
private Map<String,String> metadata;
public TestEvent(byte[] data, Map<String, String> metadata) {
this.data = data;
this.metadata = metadata;
}
@Override
public String getSender() {
return metadata.get(EventFactory.SENDER_KEY);
}
@Override
public byte[] getData() {
return data;
}
@Override
public ChannelResponder<SocketChannel> getResponder() {
return null;
}
}
// Factory to create test events and send responses for testing
private static class TestEventHolderFactory implements EventFactory<TestEvent> {
@Override
public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
return new TestEvent(data, metadata);
}
}
}