NIFI-8462 Refactored PutSyslog and ListenSyslog using Netty

- Added nifi-event-transport module encapsulating Netty classes
- Refactored unit tests for PutSyslog and ListenSyslog
- Removed integration tests for PutSyslog and ListenSyslog

NIFI-8462 Added context.yield() in PutSyslog when no FlowFiles and addressed other issues

NIFI-8462 Removed unused import of ExpressionLanguageScope

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #5044.
This commit is contained in:
exceptionfactory 2021-04-13 12:15:45 -05:00 committed by Nathan Gough
parent 5108d7cdd0
commit a3365c8833
36 changed files with 2079 additions and 1783 deletions

View File

@ -16,7 +16,7 @@
*/
package org.apache.nifi.remote.io.socket;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.Socket;
import java.net.ServerSocket;
import java.util.concurrent.Executors;
@ -26,33 +26,49 @@ import java.util.concurrent.TimeUnit;
public class NetworkUtils {
/**
* Will determine the available port
* Get Available TCP Port
*
* @return Available TCP Port
*/
public final static int availablePort() {
ServerSocket s = null;
try {
s = new ServerSocket(0);
s.setReuseAddress(true);
return s.getLocalPort();
} catch (Exception e) {
throw new IllegalStateException("Failed to discover available port.", e);
} finally {
try {
s.close();
} catch (IOException e) {
// ignore
}
public static int availablePort() {
return getAvailableTcpPort();
}
/**
* Get Available TCP Port using ServerSocket
*
* @return Available TCP Port
*/
public static int getAvailableTcpPort() {
try (final ServerSocket socket = new ServerSocket(0)) {
socket.setReuseAddress(true);
return socket.getLocalPort();
} catch (final Exception e) {
throw new IllegalArgumentException("Available TCP Port not found", e);
}
}
public final static boolean isListening(final String hostname, final int port) {
/**
* Get Available UDP Port using DatagramSocket
*
* @return Available UDP Port
*/
public static int getAvailableUdpPort() {
try (final DatagramSocket socket = new DatagramSocket()) {
return socket.getLocalPort();
} catch (final Exception e) {
throw new IllegalArgumentException("Available UDP Port not found", e);
}
}
public static boolean isListening(final String hostname, final int port) {
try (final Socket s = new Socket(hostname, port)) {
return s.isConnected();
} catch (final Exception ignore) {}
return false;
}
public final static boolean isListening(final String hostname, final int port, final int timeoutMillis) {
public static boolean isListening(final String hostname, final int port, final int timeoutMillis) {
Boolean result = false;
final ExecutorService executor = Executors.newSingleThreadExecutor();
@ -73,5 +89,4 @@ public class NetworkUtils {
return (result != null && result);
}
}

View File

@ -0,0 +1,43 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-extension-utils</artifactId>
<version>1.14.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-event-transport</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.63.Final</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.14.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.14.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport;
/**
* Event Exception indicating issues when transporting events
*/
public class EventException extends RuntimeException {
/**
* Event Exception
*
* @param message Message
* @param cause Throwable cause
*/
public EventException(final String message, final Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport;
/**
* Event Sender
*
* @param <T> Event Type
*/
public interface EventSender<T> extends AutoCloseable {
/**
* Send Event
*
* @param event Event
*/
void sendEvent(T event);
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport;
/**
* Event Sender Factory
*
* @param <T> Event Type
*/
public interface EventSenderFactory<T> {
/**
* Get Event Sender
*
* @return Event Sender
*/
EventSender<T> getEventSender();
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport;
/**
* Event Server
*
*/
public interface EventServer {
/**
* Shutdown Event Server and close resources
*/
void shutdown();
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport;
/**
* Event Server Factory
*
*/
public interface EventServerFactory {
/**
* Get Event Server
*
* @return Event Server
*/
EventServer getEventServer();
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.configuration;
/**
* Line Ending
*/
public enum LineEnding {
NONE,
UNIX
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.configuration;
/**
* Transport Protocol
*/
public enum TransportProtocol {
TCP,
UDP
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.message;
/**
* Byte Array Message with Sender
*/
public class ByteArrayMessage {
private final byte[] message;
private final String sender;
public ByteArrayMessage(final byte[] message, final String sender) {
this.message = message;
this.sender = sender;
}
public byte[] getMessage() {
return message;
}
public String getSender() {
return sender;
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.channel.ByteArrayMessageChannelHandler;
import org.apache.nifi.event.transport.netty.codec.DatagramByteArrayMessageDecoder;
import org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
import org.apache.nifi.event.transport.netty.codec.SocketByteArrayMessageDecoder;
import org.apache.nifi.logging.ComponentLog;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
/**
* Netty Event Server Factory for Byte Array Messages
*/
public class ByteArrayMessageNettyEventServerFactory extends NettyEventServerFactory {
private static final boolean STRIP_DELIMITER = true;
/**
* Netty Event Server Factory with configurable delimiter and queue of Byte Array Messages
*
* @param log Component Log
* @param address Remote Address
* @param port Remote Port Number
* @param protocol Channel Protocol
* @param delimiter Message Delimiter
* @param maxFrameLength Maximum Frame Length for delimited TCP messages
* @param messages Blocking Queue for events received
*/
public ByteArrayMessageNettyEventServerFactory(final ComponentLog log,
final String address,
final int port,
final TransportProtocol protocol,
final byte[] delimiter,
final int maxFrameLength,
final BlockingQueue<ByteArrayMessage> messages) {
super(address, port, protocol);
final LogExceptionChannelHandler logExceptionChannelHandler = new LogExceptionChannelHandler(log);
final ByteArrayMessageChannelHandler byteArrayMessageChannelHandler = new ByteArrayMessageChannelHandler(messages);
if (TransportProtocol.UDP.equals(protocol)) {
setHandlerSupplier(() -> Arrays.asList(
logExceptionChannelHandler,
new DatagramByteArrayMessageDecoder(),
byteArrayMessageChannelHandler
));
} else {
setHandlerSupplier(() -> Arrays.asList(
logExceptionChannelHandler,
new DelimiterBasedFrameDecoder(maxFrameLength, STRIP_DELIMITER, Unpooled.wrappedBuffer(delimiter)),
new ByteArrayDecoder(),
new SocketByteArrayMessageDecoder(),
byteArrayMessageChannelHandler
));
}
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
/**
* Event Loop Group Factory for standardized instance creation
*/
class EventLoopGroupFactory {
private static final String DEFAULT_THREAD_NAME_PREFIX = "NettyEventLoopGroup";
private static final boolean DAEMON_THREAD_ENABLED = true;
private String threadNamePrefix = DEFAULT_THREAD_NAME_PREFIX;
private int workerThreads;
/**
* Set Thread Name Prefix used in Netty NioEventLoopGroup defaults to NettyChannel
*
* @param threadNamePrefix Thread Name Prefix
*/
public void setThreadNamePrefix(final String threadNamePrefix) {
this.threadNamePrefix = Objects.requireNonNull(threadNamePrefix, "Thread Name Prefix required");
}
/**
* Set Worker Threads used in Netty NioEventLoopGroup with 0 interpreted as the default based on available processors
*
* @param workerThreads NioEventLoopGroup Worker Threads
*/
public void setWorkerThreads(final int workerThreads) {
this.workerThreads = workerThreads;
}
protected EventLoopGroup getEventLoopGroup() {
return new NioEventLoopGroup(workerThreads, getThreadFactory());
}
private ThreadFactory getThreadFactory() {
return new DefaultThreadFactory(threadNamePrefix, DAEMON_THREAD_ENABLED);
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.Future;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventSender;
import java.net.SocketAddress;
/**
* Netty Event Sender with Channel Pool
*
* @param <T> Event Type
*/
class NettyEventSender<T> implements EventSender<T> {
private final EventLoopGroup group;
private final ChannelPool channelPool;
private final SocketAddress remoteAddress;
/**
* Netty Channel Event Sender with Event Loop Group and Channel Pool
*
* @param group Event Loop Group
* @param channelPool Channel Pool
* @param remoteAddress Remote Address
*/
NettyEventSender(final EventLoopGroup group, final ChannelPool channelPool, final SocketAddress remoteAddress) {
this.group = group;
this.channelPool = channelPool;
this.remoteAddress = remoteAddress;
}
/**
* Send Event using Channel acquired from Channel Pool
*
* @param event Event
*/
@Override
public void sendEvent(final T event) {
try {
final Future<Channel> futureChannel = channelPool.acquire().sync();
final Channel channel = futureChannel.get();
try {
final ChannelFuture channelFuture = channel.writeAndFlush(event);
channelFuture.syncUninterruptibly();
} finally {
channelPool.release(channel);
}
} catch (final Exception e) {
throw new EventException(getChannelMessage("Send Failed"), e);
}
}
/**
* Close Channel Pool and Event Loop Group
*/
@Override
public void close() {
try {
channelPool.close();
} finally {
group.shutdownGracefully();
}
}
/**
* String representation includes Channel Remote Address
*
* @return String with Channel Remote Address
*/
@Override
public String toString() {
return getChannelMessage("Event Sender");
}
private String getChannelMessage(final String message) {
return String.format("%s Remote Address [%s]", message, remoteAddress);
}
}

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.EventSenderFactory;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler;
import org.apache.nifi.event.transport.netty.channel.ssl.ClientSslStandardChannelInitializer;
import org.apache.nifi.event.transport.netty.channel.StandardChannelInitializer;
import javax.net.ssl.SSLContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
/**
* Netty Event Sender Factory
*/
public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements EventSenderFactory<T> {
private static final int MAX_PENDING_ACQUIRES = 1024;
private final String address;
private final int port;
private final TransportProtocol protocol;
private Duration timeout = Duration.ofSeconds(30);
private int maxConnections = Runtime.getRuntime().availableProcessors() * 2;
private Supplier<List<ChannelHandler>> handlerSupplier = () -> Collections.emptyList();
private SSLContext sslContext;
public NettyEventSenderFactory(final String address, final int port, final TransportProtocol protocol) {
this.address = address;
this.port = port;
this.protocol = protocol;
}
/**
* Set Channel Handler Supplier
*
* @param handlerSupplier Channel Handler Supplier
*/
public void setHandlerSupplier(final Supplier<List<ChannelHandler>> handlerSupplier) {
this.handlerSupplier = Objects.requireNonNull(handlerSupplier);
}
/**
* Set SSL Context to enable TLS Channel Handler
*
* @param sslContext SSL Context
*/
public void setSslContext(final SSLContext sslContext) {
this.sslContext = sslContext;
}
/**
* Set Timeout for Connections and Communication
*
* @param timeout Timeout Duration
*/
public void setTimeout(final Duration timeout) {
this.timeout = Objects.requireNonNull(timeout, "Timeout required");
}
/**
* Set Maximum Connections for Channel Pool
*
* @param maxConnections Maximum Number of connections defaults to available processors multiplied by 2
*/
public void setMaxConnections(final int maxConnections) {
this.maxConnections = maxConnections;
}
/**
* Get Event Sender with connected Channel
*
* @return Connected Event Sender
*/
public EventSender<T> getEventSender() {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.remoteAddress(new InetSocketAddress(address, port));
final EventLoopGroup group = getEventLoopGroup();
bootstrap.group(group);
if (TransportProtocol.UDP.equals(protocol)) {
bootstrap.channel(NioDatagramChannel.class);
} else {
bootstrap.channel(NioSocketChannel.class);
}
setChannelOptions(bootstrap);
return getConfiguredEventSender(bootstrap);
}
private void setChannelOptions(final Bootstrap bootstrap) {
final int timeoutMilliseconds = (int) timeout.toMillis();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMilliseconds);
}
private EventSender<T> getConfiguredEventSender(final Bootstrap bootstrap) {
final SocketAddress remoteAddress = bootstrap.config().remoteAddress();
final ChannelPool channelPool = getChannelPool(bootstrap);
return new NettyEventSender<>(bootstrap.config().group(), channelPool, remoteAddress);
}
private ChannelPool getChannelPool(final Bootstrap bootstrap) {
final ChannelInitializer<Channel> channelInitializer = getChannelInitializer();
final ChannelPoolHandler handler = new InitializingChannelPoolHandler(channelInitializer);
return new FixedChannelPool(bootstrap,
handler,
ChannelHealthChecker.ACTIVE,
FixedChannelPool.AcquireTimeoutAction.FAIL,
timeout.toMillis(),
maxConnections,
MAX_PENDING_ACQUIRES);
}
private ChannelInitializer<Channel> getChannelInitializer() {
final StandardChannelInitializer<Channel> channelInitializer = sslContext == null
? new StandardChannelInitializer<>(handlerSupplier)
: new ClientSslStandardChannelInitializer<>(handlerSupplier, sslContext);
channelInitializer.setWriteTimeout(timeout);
return channelInitializer;
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import org.apache.nifi.event.transport.EventServer;
/**
* Netty Event Server
*/
class NettyEventServer implements EventServer {
private final EventLoopGroup group;
private final Channel channel;
/**
* Netty Event Server with Event Loop Group and bound Channel
*
* @param group Event Loop Group
* @param channel Bound Channel
*/
NettyEventServer(final EventLoopGroup group, final Channel channel) {
this.group = group;
this.channel = channel;
}
/**
* Close Channel and shutdown Event Loop Group
*/
@Override
public void shutdown() {
try {
channel.close().syncUninterruptibly();
} finally {
group.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.EventServerFactory;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.channel.ssl.ServerSslHandlerChannelInitializer;
import org.apache.nifi.event.transport.netty.channel.StandardChannelInitializer;
import org.apache.nifi.security.util.ClientAuth;
import javax.net.ssl.SSLContext;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
/**
* Netty Event Server Factory
*/
public class NettyEventServerFactory extends EventLoopGroupFactory implements EventServerFactory {
private final String address;
private final int port;
private final TransportProtocol protocol;
private Supplier<List<ChannelHandler>> handlerSupplier = () -> Collections.emptyList();
private Integer socketReceiveBuffer;
private SSLContext sslContext;
private ClientAuth clientAuth = ClientAuth.NONE;
public NettyEventServerFactory(final String address, final int port, final TransportProtocol protocol) {
this.address = address;
this.port = port;
this.protocol = protocol;
}
/**
* Set Channel Handler Supplier
*
* @param handlerSupplier Channel Handler Supplier
*/
public void setHandlerSupplier(final Supplier<List<ChannelHandler>> handlerSupplier) {
this.handlerSupplier = Objects.requireNonNull(handlerSupplier);
}
/**
* Set Socket Receive Buffer Size for TCP Sockets
*
* @param socketReceiveBuffer Receive Buffer size can be null to use default setting
*/
public void setSocketReceiveBuffer(final Integer socketReceiveBuffer) {
this.socketReceiveBuffer = socketReceiveBuffer;
}
/**
* Set SSL Context to enable TLS Channel Handler
*
* @param sslContext SSL Context
*/
public void setSslContext(final SSLContext sslContext) {
this.sslContext = sslContext;
}
/**
* Set Client Authentication
*
* @param clientAuth Client Authentication
*/
public void setClientAuth(final ClientAuth clientAuth) {
this.clientAuth = clientAuth;
}
/**
* Get Event Server with Channel bound to configured address and port number
*
* @return Event Sender
*/
@Override
public EventServer getEventServer() {
final AbstractBootstrap<?, ?> bootstrap = getBootstrap();
final EventLoopGroup group = getEventLoopGroup();
bootstrap.group(group);
return getBoundEventServer(bootstrap, group);
}
private AbstractBootstrap<?, ?> getBootstrap() {
if (TransportProtocol.UDP.equals(protocol)) {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioDatagramChannel.class);
bootstrap.handler(new StandardChannelInitializer<>(handlerSupplier));
return bootstrap;
} else {
final ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class);
if (sslContext == null) {
bootstrap.childHandler(new StandardChannelInitializer<>(handlerSupplier));
} else {
bootstrap.childHandler(new ServerSslHandlerChannelInitializer<>(handlerSupplier, sslContext, clientAuth));
}
if (socketReceiveBuffer != null) {
bootstrap.option(ChannelOption.SO_RCVBUF, socketReceiveBuffer);
}
return bootstrap;
}
}
private EventServer getBoundEventServer(final AbstractBootstrap<?, ?> bootstrap, final EventLoopGroup group) {
final ChannelFuture bindFuture = bootstrap.bind(address, port);
try {
final ChannelFuture channelFuture = bindFuture.syncUninterruptibly();
return new NettyEventServer(group, channelFuture.channel());
} catch (final Exception e) {
group.shutdownGracefully();
throw new EventException(String.format("Channel Bind Failed [%s:%d]", address, port), e);
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.string.LineEncoder;
import io.netty.handler.codec.string.LineSeparator;
import io.netty.handler.codec.string.StringEncoder;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.configuration.LineEnding;
import org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
import org.apache.nifi.logging.ComponentLog;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
* Netty Event Sender Factory for String messages
*/
public class StringNettyEventSenderFactory extends NettyEventSenderFactory<String> {
/**
* Netty Event Sender Factory with configurable character set for String encoding
*
* @param log Component Log
* @param address Remote Address
* @param port Remote Port Number
* @param protocol Channel Protocol
* @param charset Character set for String encoding
* @param lineEnding Line Ending for optional encoding
*/
public StringNettyEventSenderFactory(final ComponentLog log, final String address, final int port, final TransportProtocol protocol, final Charset charset, final LineEnding lineEnding) {
super(address, port, protocol);
final List<ChannelHandler> handlers = new ArrayList<>();
handlers.add(new LogExceptionChannelHandler(log));
handlers.add(new StringEncoder(charset));
if (LineEnding.UNIX.equals(lineEnding)) {
handlers.add(new LineEncoder(LineSeparator.UNIX, charset));
}
setHandlerSupplier(() -> handlers);
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty.channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
/**
* Channel Handler for queuing bytes received as Byte Array Messages
*/
@ChannelHandler.Sharable
public class ByteArrayMessageChannelHandler extends SimpleChannelInboundHandler<ByteArrayMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayMessageChannelHandler.class);
private final BlockingQueue<ByteArrayMessage> messages;
public ByteArrayMessageChannelHandler(final BlockingQueue<ByteArrayMessage> messages) {
this.messages = messages;
}
/**
* Read Channel message and offer to queue for external processing
*
* @param channelHandlerContext Channel Handler Context
* @param message Byte Array Message for processing
*/
@Override
protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ByteArrayMessage message) {
LOGGER.debug("Message Received Length [{}] Remote Address [{}] ", message.getMessage().length, message.getSender());
messages.add(message);
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty.channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.nifi.logging.ComponentLog;
import java.net.SocketAddress;
/**
* Log Exception Channel Handler for logging exceptions in absence of other handlers
*/
@ChannelHandler.Sharable
public class LogExceptionChannelHandler extends ChannelInboundHandlerAdapter {
private final ComponentLog log;
public LogExceptionChannelHandler(final ComponentLog log) {
this.log = log;
}
/**
* Log Exceptions caught during Channel handling
*
* @param context Channel Handler Context
* @param exception Exception
*/
@Override
public void exceptionCaught(final ChannelHandlerContext context, final Throwable exception) {
final SocketAddress remoteAddress = context.channel().remoteAddress();
log.warn("Communication Failed with Remote Address [{}]", remoteAddress, exception);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty.channel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.timeout.WriteTimeoutHandler;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
* Standard Channel Initializer
* @param <T> Channel Type
*/
public class StandardChannelInitializer<T extends Channel> extends ChannelInitializer<T> {
private final Supplier<List<ChannelHandler>> handlerSupplier;
private Duration writeTimeout = Duration.ofSeconds(30);
/**
* Standard Channel Initializer with handlers
*
* @param handlerSupplier Channel Handler Supplier
*/
public StandardChannelInitializer(final Supplier<List<ChannelHandler>> handlerSupplier) {
this.handlerSupplier = Objects.requireNonNull(handlerSupplier);
}
/**
* Set Timeout for Write operations
*
* @param writeTimeout Write Timeout
*/
public void setWriteTimeout(final Duration writeTimeout) {
this.writeTimeout = Objects.requireNonNull(writeTimeout);
}
@Override
protected void initChannel(Channel channel) {
final ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
handlerSupplier.get().forEach(pipeline::addLast);
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty.channel.pool;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.pool.AbstractChannelPoolHandler;
import java.util.Objects;
/**
* Initializing Channel Pool Handler adds Channel Initializer when a Channel is created
*/
public class InitializingChannelPoolHandler extends AbstractChannelPoolHandler {
private final ChannelInitializer<Channel> channelInitializer;
/**
* Initializing Channel Pool Handler
*
* @param channelInitializer Channel Initializer
*/
public InitializingChannelPoolHandler(final ChannelInitializer<Channel> channelInitializer) {
this.channelInitializer = Objects.requireNonNull(channelInitializer);
}
/**
* Connect Channel when created
*
* @param channel Channel to be connected
*/
@Override
public void channelCreated(final Channel channel) {
channel.pipeline().addLast(channelInitializer);
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty.channel.ssl;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.ssl.SslHandler;
import org.apache.nifi.event.transport.netty.channel.StandardChannelInitializer;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
/**
* Client SSL Standard Channel Initializer supporting TLS with SSLContext configuration
* @param <T> Channel Type
*/
public class ClientSslStandardChannelInitializer<T extends Channel> extends StandardChannelInitializer<T> {
private final SSLContext sslContext;
/**
* Client SSL Channel Initializer with handlers and SSLContext
*
* @param handlerSupplier Channel Handler Supplier
* @param sslContext SSLContext
*/
public ClientSslStandardChannelInitializer(final Supplier<List<ChannelHandler>> handlerSupplier, final SSLContext sslContext) {
super(handlerSupplier);
this.sslContext = Objects.requireNonNull(sslContext, "SSLContext is required");
}
@Override
protected void initChannel(final Channel channel) {
final ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(newSslHandler());
super.initChannel(channel);
}
private SslHandler newSslHandler() {
final SSLEngine sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(true);
return new SslHandler(sslEngine);
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty.channel.ssl;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.ssl.SslHandler;
import org.apache.nifi.event.transport.netty.channel.StandardChannelInitializer;
import org.apache.nifi.security.util.ClientAuth;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
/**
* Server SslHandler Channel Initializer for configuring SslHandler with server parameters
* @param <T> Channel Type
*/
public class ServerSslHandlerChannelInitializer<T extends Channel> extends StandardChannelInitializer<T> {
private final SSLContext sslContext;
private final ClientAuth clientAuth;
/**
* Server SSL Channel Initializer with handlers and SSLContext
*
* @param handlerSupplier Channel Handler Supplier
* @param sslContext SSLContext
*/
public ServerSslHandlerChannelInitializer(final Supplier<List<ChannelHandler>> handlerSupplier, final SSLContext sslContext, final ClientAuth clientAuth) {
super(handlerSupplier);
this.sslContext = Objects.requireNonNull(sslContext, "SSLContext is required");
this.clientAuth = Objects.requireNonNull(clientAuth, "ClientAuth is required");
}
@Override
protected void initChannel(final Channel channel) {
final ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(newSslHandler());
super.initChannel(channel);
}
private SslHandler newSslHandler() {
final SSLEngine sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(false);
if (ClientAuth.REQUIRED.equals(clientAuth)) {
sslEngine.setNeedClientAuth(true);
} else if (ClientAuth.WANT.equals(clientAuth)) {
sslEngine.setWantClientAuth(true);
}
return new SslHandler(sslEngine);
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import java.net.InetAddress;
import java.util.List;
/**
* Message Decoder for bytes received from Datagram Packets
*/
public class DatagramByteArrayMessageDecoder extends MessageToMessageDecoder<DatagramPacket> {
/**
* Decode Datagram Packet to Byte Array Message
*
* @param channelHandlerContext Channel Handler Context
* @param datagramPacket Datagram Packet
* @param decoded Decoded Messages
*/
@Override
protected void decode(final ChannelHandlerContext channelHandlerContext, final DatagramPacket datagramPacket, final List<Object> decoded) {
final ByteBuf content = datagramPacket.content();
final int readableBytes = content.readableBytes();
final byte[] bytes = new byte[readableBytes];
content.readBytes(bytes);
final InetAddress packetAddress = datagramPacket.sender().getAddress();
final String address = packetAddress.getHostAddress();
final ByteArrayMessage message = new ByteArrayMessage(bytes, address);
decoded.add(message);
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty.codec;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import java.net.InetSocketAddress;
import java.util.List;
/**
* Message Decoder for bytes received from Socket Channels
*/
public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[]> {
/**
* Decode bytes to Byte Array Message with remote address from Channel.remoteAddress()
*
* @param channelHandlerContext Channel Handler Context
* @param bytes Message Bytes
* @param decoded Decoded Messages
*/
@Override
protected void decode(final ChannelHandlerContext channelHandlerContext, final byte[] bytes, final List<Object> decoded) {
final InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
final String address = remoteAddress.getHostString();
final ByteArrayMessage message = new ByteArrayMessage(bytes, address);
decoded.add(message);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.junit.Test;
import javax.net.ssl.SSLContext;
import java.time.Duration;
import static org.junit.Assert.assertThrows;
public class NettyEventSenderFactoryTest {
private static final String ADDRESS = "127.0.0.1";
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(1);
private static final int SINGLE_THREAD = 1;
@Test
public void testSendEventTcpException() throws Exception {
final int port = NetworkUtils.getAvailableTcpPort();
final NettyEventSenderFactory<ByteBuf> factory = new NettyEventSenderFactory<>(ADDRESS, port, TransportProtocol.TCP);
factory.setTimeout(DEFAULT_TIMEOUT);
factory.setWorkerThreads(SINGLE_THREAD);
factory.setThreadNamePrefix(NettyEventSenderFactoryTest.class.getSimpleName());
final SSLContext sslContext = SSLContext.getDefault();
factory.setSslContext(sslContext);
try (final EventSender<ByteBuf> eventSender = factory.getEventSender()) {
assertThrows(EventException.class, () -> eventSender.sendEvent(ByteBufAllocator.DEFAULT.buffer()));
}
}
@Test
public void testSendEventCloseUdp() throws Exception {
final int port = NetworkUtils.getAvailableUdpPort();
final NettyEventSenderFactory<ByteBuf> factory = new NettyEventSenderFactory<>(ADDRESS, port, TransportProtocol.UDP);
factory.setTimeout(DEFAULT_TIMEOUT);
factory.setWorkerThreads(SINGLE_THREAD);
final EventSender<ByteBuf> eventSender = factory.getEventSender();
eventSender.sendEvent(ByteBufAllocator.DEFAULT.buffer());
eventSender.close();
}
}

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.configuration.LineEnding;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.TlsConfiguration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
@RunWith(MockitoJUnitRunner.class)
public class StringNettyEventSenderFactoryTest {
private static final String ADDRESS = "127.0.0.1";
private static final int MAX_FRAME_LENGTH = 1024;
private static final long TIMEOUT_SECONDS = 5;
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(TIMEOUT_SECONDS);
private static final Charset CHARSET = StandardCharsets.UTF_8;
private static final String MESSAGE = String.class.getName();
private static final String DELIMITER = "\n";
private static final int SINGLE_THREAD = 1;
@Mock
private ComponentLog log;
@Test
public void testSendEventTcpEventException() throws Exception {
final int port = NetworkUtils.getAvailableTcpPort();
final BlockingQueue<ByteArrayMessage> messages = new LinkedBlockingQueue<>();
final NettyEventServerFactory serverFactory = getEventServerFactory(port, messages);
final EventServer eventServer = serverFactory.getEventServer();
final NettyEventSenderFactory<String> senderFactory = getEventSenderFactory(port);
try (final EventSender<String> sender = senderFactory.getEventSender()) {
eventServer.shutdown();
assertThrows(EventException.class, () -> sender.sendEvent(MESSAGE));
} finally {
eventServer.shutdown();
}
}
@Test
public void testSendEventTcp() throws Exception {
final int port = NetworkUtils.getAvailableTcpPort();
final BlockingQueue<ByteArrayMessage> messages = new LinkedBlockingQueue<>();
final NettyEventServerFactory serverFactory = getEventServerFactory(port, messages);
final EventServer eventServer = serverFactory.getEventServer();
final NettyEventSenderFactory<String> senderFactory = getEventSenderFactory(port);
try (final EventSender<String> sender = senderFactory.getEventSender()) {
sender.sendEvent(MESSAGE);
} finally {
eventServer.shutdown();
}
assertMessageReceived(messages);
}
@Test
public void testSendEventTcpSslContextConfigured() throws Exception {
final int port = NetworkUtils.getAvailableTcpPort();
final NettyEventSenderFactory<String> senderFactory = getEventSenderFactory(port);
final SSLContext sslContext = getSslContext();
senderFactory.setSslContext(sslContext);
final BlockingQueue<ByteArrayMessage> messages = new LinkedBlockingQueue<>();
final NettyEventServerFactory serverFactory = getEventServerFactory(port, messages);
serverFactory.setSslContext(sslContext);
serverFactory.setClientAuth(ClientAuth.NONE);
final EventServer eventServer = serverFactory.getEventServer();
try (final EventSender<String> eventSender = senderFactory.getEventSender()) {
eventSender.sendEvent(MESSAGE);
} finally {
eventServer.shutdown();
}
assertMessageReceived(messages);
}
private void assertMessageReceived(final BlockingQueue<ByteArrayMessage> messages) throws InterruptedException {
final ByteArrayMessage messageReceived = messages.poll(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertNotNull("Message not received", messageReceived);
final String eventReceived = new String(messageReceived.getMessage(), CHARSET);
assertEquals("Message not matched", MESSAGE, eventReceived);
assertEquals("Sender not matched", ADDRESS, messageReceived.getSender());
}
private NettyEventSenderFactory<String> getEventSenderFactory(final int port) {
final StringNettyEventSenderFactory senderFactory = new StringNettyEventSenderFactory(log,
ADDRESS, port, TransportProtocol.TCP, CHARSET, LineEnding.UNIX);
senderFactory.setTimeout(DEFAULT_TIMEOUT);
return senderFactory;
}
private NettyEventServerFactory getEventServerFactory(final int port, final BlockingQueue<ByteArrayMessage> messages) {
final ByteArrayMessageNettyEventServerFactory factory = new ByteArrayMessageNettyEventServerFactory(log,
ADDRESS, port, TransportProtocol.TCP, DELIMITER.getBytes(), MAX_FRAME_LENGTH, messages);
factory.setWorkerThreads(SINGLE_THREAD);
return factory;
}
private SSLContext getSslContext() throws GeneralSecurityException, IOException {
final TlsConfiguration tlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
final File keystoreFile = new File(tlsConfiguration.getKeystorePath());
keystoreFile.deleteOnExit();
final File truststoreFile = new File(tlsConfiguration.getTruststorePath());
truststoreFile.deleteOnExit();
return SslContextFactory.createSslContext(tlsConfiguration);
}
}

View File

@ -37,6 +37,7 @@
<module>nifi-service-utils</module>
<module>nifi-prometheus-utils</module>
<module>nifi-kerberos-test-utils</module>
<module>nifi-event-transport</module>
</modules>
</project>

View File

@ -39,6 +39,11 @@
<artifactId>nifi-processor-utils</artifactId>
<version>1.14.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-event-transport</artifactId>
<version>1.14.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-syslog-utils</artifactId>

View File

@ -25,10 +25,15 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
@ -37,19 +42,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
@ -59,11 +52,8 @@ import org.apache.nifi.syslog.parsers.SyslogParser;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
@ -136,6 +126,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.required(true)
.dependsOn(PROTOCOL, TCP_VALUE)
.build();
public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
.name("Max Number of TCP Connections")
@ -144,6 +135,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.addValidator(StandardValidators.createLongValidator(1, 65535, true))
.defaultValue("2")
.required(true)
.dependsOn(PROTOCOL, TCP_VALUE)
.build();
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Max Batch Size")
@ -152,7 +144,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
"The maximum number of Syslog events to add to a single FlowFile. If multiple events are available, they will be concatenated along with "
+ "the <Message Delimiter> up to this configured maximum number of messages")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("1")
.required(true)
.build();
@ -180,6 +171,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
"messages will be received over a secure connection.")
.required(false)
.identifiesControllerService(RestrictedSSLContextService.class)
.dependsOn(PROTOCOL, TCP_VALUE)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("Client Auth")
@ -188,6 +180,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.required(false)
.allowableValues(ClientAuth.values())
.defaultValue(ClientAuth.REQUIRED.name())
.dependsOn(SSL_CONTEXT_SERVICE)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -199,14 +192,17 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.description("Syslog messages that do not match one of the expected formats will be sent out this relationship as a FlowFile per message.")
.build();
protected static final String RECEIVED_COUNTER = "Messages Received";
protected static final String SUCCESS_COUNTER = "FlowFiles Transferred to Success";
private static final String DEFAULT_ADDRESS = "127.0.0.1";
private static final String DEFAULT_MIME_TYPE = "text/plain";
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
private volatile ChannelDispatcher channelDispatcher;
private volatile EventServer eventServer;
private volatile SyslogParser parser;
private volatile ByteBufferSource byteBufferSource;
private volatile BlockingQueue<RawSyslogEvent> syslogEvents;
private final BlockingQueue<RawSyslogEvent> errorEvents = new LinkedBlockingQueue<>();
private volatile BlockingQueue<ByteArrayMessage> syslogEvents = new LinkedBlockingQueue<>();
private volatile byte[] messageDemarcatorBytes; //it is only the array reference that is volatile - not the contents.
@Override
@ -248,12 +244,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// if we are changing the protocol, the events that we may have queued up are no longer valid, as they
// were received using a different protocol and may be from a completely different source
if (PROTOCOL.equals(descriptor)) {
if (syslogEvents != null) {
syslogEvents.clear();
}
if (errorEvents != null) {
errorEvents.clear();
}
syslogEvents.clear();
}
}
@ -274,134 +265,62 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.valid(false).subject("SSL Context").build());
}
// Validate CLIENT_AUTH
final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
results.add(new ValidationResult.Builder()
.explanation("Client Auth must be provided when using TLS/SSL")
.valid(false).subject("Client Auth").build());
}
return results;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int receiveBufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int maxSocketBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int maxMessageQueueSize = context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger();
final String protocol = context.getProperty(PROTOCOL).getValue();
final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final String charSet = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
final TransportProtocol protocol = TransportProtocol.valueOf(context.getProperty(PROTOCOL).getValue());
final String networkInterfaceName = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions().getValue());
final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
messageDemarcatorBytes = msgDemarcator.getBytes(Charset.forName(charSet));
final int maxConnections;
if (UDP_VALUE.getValue().equals(protocol)) {
maxConnections = 1;
} else {
maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
}
byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
parser = new SyslogParser(Charset.forName(charSet));
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
parser = new SyslogParser(charset);
syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize);
InetAddress nicIPAddress = null;
if (!StringUtils.isEmpty(nicIPAddressStr)) {
NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr);
nicIPAddress = netIF.getInetAddresses().nextElement();
String address = DEFAULT_ADDRESS;
if (StringUtils.isNotEmpty(networkInterfaceName)) {
final NetworkInterface networkInterface = NetworkInterface.getByName(networkInterfaceName);
final InetAddress interfaceAddress = networkInterface.getInetAddresses().nextElement();
address = interfaceAddress.getHostName();
}
// create either a UDP or TCP reader and call open() to bind to the given port
final ByteArrayMessageNettyEventServerFactory factory = new ByteArrayMessageNettyEventServerFactory(getLogger(),
address,port, protocol, messageDemarcatorBytes, receiveBufferSize, syslogEvents);
factory.setThreadNamePrefix(String.format("%s[%s]", ListenSyslog.class.getSimpleName(), getIdentifier()));
final int maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
factory.setWorkerThreads(maxConnections);
factory.setSocketReceiveBuffer(maxSocketBufferSize);
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
channelDispatcher = createChannelReader(context, protocol, byteBufferSource, syslogEvents, maxConnections, sslContextService, Charset.forName(charSet));
channelDispatcher.open(nicIPAddress, port, maxChannelBufferSize);
final Thread readerThread = new Thread(channelDispatcher);
readerThread.setName("ListenSyslog [" + getIdentifier() + "]");
readerThread.setDaemon(true);
readerThread.start();
}
// visible for testing.
protected SyslogParser getParser() {
return parser;
}
// visible for testing to be overridden and provide a mock ChannelDispatcher if desired
protected ChannelDispatcher createChannelReader(final ProcessContext context, final String protocol, final ByteBufferSource byteBufferSource,
final BlockingQueue<RawSyslogEvent> events, final int maxConnections,
final SSLContextService sslContextService, final Charset charset) throws IOException {
final EventFactory<RawSyslogEvent> eventFactory = new RawSyslogEventFactory();
if (UDP_VALUE.getValue().equals(protocol)) {
return new DatagramChannelDispatcher(eventFactory, byteBufferSource, events, getLogger());
} else {
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
ClientAuth clientAuth = null;
if (sslContextService != null) {
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
sslContext = sslContextService.createContext();
clientAuth = ClientAuth.valueOf(clientAuthValue);
if (sslContextService != null) {
final SSLContext sslContext = sslContextService.createContext();
ClientAuth clientAuth = ClientAuth.REQUIRED;
final PropertyValue clientAuthProperty = context.getProperty(CLIENT_AUTH);
if (clientAuthProperty.isSet()) {
clientAuth = ClientAuth.valueOf(clientAuthProperty.getValue());
}
final ChannelHandlerFactory<RawSyslogEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
return new SocketChannelDispatcher(eventFactory, handlerFactory, byteBufferSource, events, getLogger(), maxConnections, sslContext, clientAuth, charset);
factory.setSslContext(sslContext);
factory.setClientAuth(clientAuth);
}
eventServer = factory.getEventServer();
}
// used for testing to access the random port that was selected
protected int getPort() {
return channelDispatcher == null ? 0 : channelDispatcher.getPort();
}
@OnUnscheduled
public void onUnscheduled() {
if (channelDispatcher != null) {
channelDispatcher.close();
@OnStopped
public void shutdownEventServer() {
if (eventServer != null) {
eventServer.shutdown();
}
}
protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) {
RawSyslogEvent rawSyslogEvent = null;
if (pollErrorQueue) {
rawSyslogEvent = errorEvents.poll();
}
if (rawSyslogEvent == null) {
try {
if (longPoll) {
rawSyslogEvent = syslogEvents.poll(20, TimeUnit.MILLISECONDS);
} else {
rawSyslogEvent = syslogEvents.poll();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
if (rawSyslogEvent != null) {
session.adjustCounter("Messages Received", 1L, false);
}
return rawSyslogEvent;
}
protected int getErrorQueueSize() {
return errorEvents.size();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// poll the queue with a small timeout to avoid unnecessarily yielding below
RawSyslogEvent rawSyslogEvent = getMessage(true, true, session);
ByteArrayMessage rawSyslogEvent = getMessage(session);
// if nothing in the queue just return, we don't want to yield here because yielding could adversely
// impact performance, and we already have a long poll in getMessage so there will be some built in
@ -410,30 +329,17 @@ public class ListenSyslog extends AbstractSyslogProcessor {
return;
}
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final Map<String, String> defaultAttributes = new HashMap<>(4);
defaultAttributes.put(SyslogAttributes.SYSLOG_PROTOCOL.key(), protocol);
defaultAttributes.put(SyslogAttributes.SYSLOG_PORT.key(), port);
defaultAttributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
final int numAttributes = SyslogAttributes.values().length + 2;
final boolean shouldParse = context.getProperty(PARSE_MESSAGES).asBoolean();
final boolean parseMessages = context.getProperty(PARSE_MESSAGES).asBoolean();
final Map<String, FlowFile> flowFilePerSender = new HashMap<>();
final SyslogParser parser = getParser();
final Map<String, String> defaultAttributes = getDefaultAttributes(context);
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
for (int i = 0; i < maxBatchSize; i++) {
SyslogEvent event = null;
// If this is our first iteration, we have already polled our queues. Otherwise, poll on each iteration.
if (i > 0) {
rawSyslogEvent = getMessage(true, false, session);
rawSyslogEvent = getMessage(session);
if (rawSyslogEvent == null) {
break;
}
@ -441,153 +347,115 @@ public class ListenSyslog extends AbstractSyslogProcessor {
final String sender = rawSyslogEvent.getSender();
FlowFile flowFile = flowFilePerSender.computeIfAbsent(sender, k -> session.create());
flowFile = session.putAllAttributes(flowFile, defaultAttributes);
flowFile = session.putAttribute(flowFile, SyslogAttributes.SYSLOG_SENDER.key(), sender);
if (shouldParse) {
boolean valid = true;
try {
event = parser.parseEvent(rawSyslogEvent.getData(), sender);
} catch (final ProcessException pe) {
getLogger().warn("Failed to parse Syslog event; routing to invalid");
valid = false;
}
if (parseMessages) {
event = parseSyslogEvent(rawSyslogEvent);
// If the event is invalid, route it to 'invalid' and then stop.
// We create a separate FlowFile for this case instead of using 'flowFile',
// because the 'flowFile' object may already have data written to it.
if (!valid || event == null || !event.isValid()) {
if (event == null || !event.isValid()) {
FlowFile invalidFlowFile = session.create();
invalidFlowFile = session.putAllAttributes(invalidFlowFile, defaultAttributes);
if (sender != null) {
invalidFlowFile = session.putAttribute(invalidFlowFile, SyslogAttributes.SYSLOG_SENDER.key(), sender);
}
invalidFlowFile = session.putAttribute(invalidFlowFile, SyslogAttributes.SYSLOG_SENDER.key(), sender);
try {
final byte[] rawBytes = rawSyslogEvent.getData();
invalidFlowFile = session.write(invalidFlowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(rawBytes);
}
});
} catch (final Exception e) {
getLogger().error("Failed to write contents of Syslog message to FlowFile due to {}; will re-queue message and try again", e);
errorEvents.offer(rawSyslogEvent);
session.remove(invalidFlowFile);
break;
}
final byte[] messageBytes = rawSyslogEvent.getMessage();
invalidFlowFile = session.write(invalidFlowFile, outputStream -> outputStream.write(messageBytes));
session.transfer(invalidFlowFile, REL_INVALID);
break;
}
getLogger().trace(event.getFullMessage());
final Map<String, String> attributes = new HashMap<>(numAttributes);
attributes.put(SyslogAttributes.SYSLOG_PRIORITY.key(), event.getPriority());
attributes.put(SyslogAttributes.SYSLOG_SEVERITY.key(), event.getSeverity());
attributes.put(SyslogAttributes.SYSLOG_FACILITY.key(), event.getFacility());
attributes.put(SyslogAttributes.SYSLOG_VERSION.key(), event.getVersion());
attributes.put(SyslogAttributes.SYSLOG_TIMESTAMP.key(), event.getTimeStamp());
attributes.put(SyslogAttributes.SYSLOG_HOSTNAME.key(), event.getHostName());
attributes.put(SyslogAttributes.SYSLOG_BODY.key(), event.getMsgBody());
attributes.put(SyslogAttributes.SYSLOG_VALID.key(), String.valueOf(event.isValid()));
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.putAllAttributes(flowFile, getEventAttributes(event));
}
// figure out if we should write the bytes from the raw event or parsed event
final boolean writeDemarcator = (i > 0);
try {
// write the raw bytes of the message as the FlowFile content
final byte[] rawMessage = (event == null) ? rawSyslogEvent.getData() : event.getRawMessage();
flowFile = session.append(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
if (writeDemarcator) {
out.write(messageDemarcatorBytes);
}
out.write(rawMessage);
}
});
} catch (final Exception e) {
getLogger().error("Failed to write contents of Syslog message to FlowFile due to {}; will re-queue message and try again", e);
errorEvents.offer(rawSyslogEvent);
break;
}
final byte[] messageBytes = (event == null) ? rawSyslogEvent.getMessage() : event.getRawMessage();
flowFile = session.append(flowFile, outputStream -> {
if (writeDemarcator) {
outputStream.write(messageDemarcatorBytes);
}
outputStream.write(messageBytes);
});
flowFilePerSender.put(sender, flowFile);
}
for (final Map.Entry<String, FlowFile> entry : flowFilePerSender.entrySet()) {
final String sender = entry.getKey();
FlowFile flowFile = entry.getValue();
final FlowFile flowFile = entry.getValue();
if (flowFile.getSize() == 0L) {
session.remove(flowFile);
getLogger().debug("No data written to FlowFile from Sender {}; removing FlowFile", new Object[] {sender});
getLogger().debug("Removing empty {} from Sender [{}]", flowFile, sender);
continue;
}
final Map<String, String> newAttributes = new HashMap<>(defaultAttributes.size() + 1);
newAttributes.putAll(defaultAttributes);
newAttributes.put(SyslogAttributes.SYSLOG_SENDER.key(), sender);
flowFile = session.putAllAttributes(flowFile, newAttributes);
getLogger().debug("Transferring {} to success", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
session.adjustCounter(SUCCESS_COUNTER, 1L, false);
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append(protocol.toLowerCase()).append("://").append(senderHost).append(":").append(port).toString();
final String transitUri = getTransitUri(flowFile);
session.getProvenanceReporter().receive(flowFile, transitUri);
}
}
/**
* Wrapper class to pass around the raw message and the host/ip that sent it
*/
static class RawSyslogEvent<C extends SelectableChannel> implements Event<C> {
final byte[] rawMessage;
final String sender;
public RawSyslogEvent(final byte[] rawMessage, final String sender) {
this.rawMessage = rawMessage;
this.sender = sender;
}
@Override
public byte[] getData() {
return this.rawMessage;
}
@Override
public String getSender() {
return this.sender;
}
@Override
public ChannelResponder getResponder() {
return null;
private SyslogEvent parseSyslogEvent(final ByteArrayMessage rawSyslogEvent) {
final String sender = rawSyslogEvent.getSender();
final byte[] message = rawSyslogEvent.getMessage();
SyslogEvent syslogEvent = null;
try {
syslogEvent = parser.parseEvent(message, rawSyslogEvent.getSender());
} catch (final RuntimeException e) {
getLogger().warn("Syslog Parsing Failed Length [{}] Sender [{}]: {}", message.length, sender, e.getMessage());
}
return syslogEvent;
}
/**
* EventFactory implementation for RawSyslogEvent.
*/
private static class RawSyslogEventFactory implements EventFactory<RawSyslogEvent> {
@Override
public RawSyslogEvent create(byte[] data, Map<String, String> metadata, final ChannelResponder responder) {
String sender = null;
if (metadata != null && metadata.containsKey(EventFactory.SENDER_KEY)) {
sender = metadata.get(EventFactory.SENDER_KEY);
private ByteArrayMessage getMessage(final ProcessSession session) {
ByteArrayMessage rawSyslogEvent = null;
try {
rawSyslogEvent = syslogEvents.poll(20, TimeUnit.MILLISECONDS);
if (rawSyslogEvent != null) {
session.adjustCounter(RECEIVED_COUNTER, 1L, false);
}
return new RawSyslogEvent(data, sender);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
return rawSyslogEvent;
}
private String getTransitUri(final FlowFile flowFile) {
final String protocol = flowFile.getAttribute(SyslogAttributes.SYSLOG_PROTOCOL.key());
final String sender = flowFile.getAttribute(SyslogAttributes.SYSLOG_SENDER.key());
final String port = flowFile.getAttribute(SyslogAttributes.SYSLOG_PORT.key());
return String.format("%s://%s:%s", protocol.toLowerCase(), sender, port);
}
private Map<String, String> getDefaultAttributes(final ProcessContext context) {
final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final Map<String, String> defaultAttributes = new HashMap<>();
defaultAttributes.put(SyslogAttributes.SYSLOG_PROTOCOL.key(), protocol);
defaultAttributes.put(SyslogAttributes.SYSLOG_PORT.key(), port);
defaultAttributes.put(CoreAttributes.MIME_TYPE.key(), DEFAULT_MIME_TYPE);
return defaultAttributes;
}
private Map<String, String> getEventAttributes(final SyslogEvent event) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(SyslogAttributes.SYSLOG_PRIORITY.key(), event.getPriority());
attributes.put(SyslogAttributes.SYSLOG_SEVERITY.key(), event.getSeverity());
attributes.put(SyslogAttributes.SYSLOG_FACILITY.key(), event.getFacility());
attributes.put(SyslogAttributes.SYSLOG_VERSION.key(), event.getVersion());
attributes.put(SyslogAttributes.SYSLOG_TIMESTAMP.key(), event.getTimeStamp());
attributes.put(SyslogAttributes.SYSLOG_HOSTNAME.key(), event.getHostName());
attributes.put(SyslogAttributes.SYSLOG_BODY.key(), event.getMsgBody());
attributes.put(SyslogAttributes.SYSLOG_VALID.key(), String.valueOf(event.isValid()));
return attributes;
}
}

View File

@ -16,21 +16,20 @@
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -39,21 +38,21 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.configuration.LineEnding;
import org.apache.nifi.event.transport.netty.StringNettyEventSenderFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.syslog.parsers.SyslogParser;
import org.apache.nifi.util.StopWatch;
@ -73,7 +72,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The ip address or hostname of the Syslog server. Note that Expression language is not evaluated per FlowFile.")
.description("The IP address or hostname of the Syslog server.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("localhost")
.required(true)
@ -84,7 +83,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
.name("Max Size of Socket Send Buffer")
.description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " +
"to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
"the data can be read, and incoming data will be dropped. Note that Expression language is not evaluated per FlowFile.")
"the data can be read, and incoming data will be dropped.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.required(true)
@ -92,7 +91,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
.Builder().name("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor. Note that Expression language is not evaluated per FlowFile.")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("25")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
@ -100,7 +99,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
.build();
public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
.Builder().name("Idle Connection Expiration")
.description("The amount of time a connection should be held open without being used before closing the connection. Note that Expression language is not evaluated per FlowFile.")
.description("The amount of time a connection should be held open without being used before closing the connection.")
.required(true)
.defaultValue("5 seconds")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
@ -150,6 +149,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
"messages will be sent over a secure connection.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.dependsOn(PROTOCOL, TCP_VALUE)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -167,7 +167,9 @@ public class PutSyslog extends AbstractSyslogProcessor {
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
private volatile BlockingQueue<ChannelSender> senderPool;
private EventSender<String> eventSender;
private String transitUri;
@Override
protected void init(final ProcessorInitializationContext context) {
@ -222,213 +224,101 @@ public class PutSyslog extends AbstractSyslogProcessor {
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
// initialize the queue of senders, one per task, senders will get created on the fly in onTrigger
this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
}
protected ChannelSender createSender(final ProcessContext context) throws IOException {
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
public void onScheduled(final ProcessContext context) throws InterruptedException {
eventSender = getEventSender(context);
final String protocol = context.getProperty(PROTOCOL).getValue();
final int maxSendBuffer = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).intValue();
final int timeout = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
return createSender(sslContextService, protocol, host, port, maxSendBuffer, timeout);
}
// visible for testing to override and provide a mock sender if desired
protected ChannelSender createSender(final SSLContextService sslContextService, final String protocol, final String host,
final int port, final int maxSendBufferSize, final int timeout)
throws IOException {
ChannelSender sender;
if (protocol.equals(UDP_VALUE.getValue())) {
sender = new DatagramChannelSender(host, port, maxSendBufferSize, getLogger());
} else {
// if an SSLContextService is provided then we make a secure sender
if (sslContextService != null) {
final SSLContext sslContext = sslContextService.createContext();
sender = new SSLSocketChannelSender(host, port, maxSendBufferSize, sslContext, getLogger());
} else {
sender = new SocketChannelSender(host, port, maxSendBufferSize, getLogger());
}
}
sender.setTimeout(timeout);
sender.open();
return sender;
final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
transitUri = String.format("%s://%s:%s", protocol, hostname, port);
}
@OnStopped
public void onStopped() {
if (senderPool != null) {
ChannelSender sender = senderPool.poll();
while (sender != null) {
sender.close();
sender = senderPool.poll();
}
public void onStopped() throws Exception {
if (eventSender != null) {
eventSender.close();
}
}
private PruneResult pruneIdleSenders(final long idleThreshold){
int numClosed = 0;
int numConsidered = 0;
long currentTime = System.currentTimeMillis();
final List<ChannelSender> putBack = new ArrayList<>();
// if a connection hasn't been used with in the threshold then it gets closed
ChannelSender sender;
while ((sender = senderPool.poll()) != null) {
numConsidered++;
if (currentTime > (sender.getLastUsed() + idleThreshold)) {
getLogger().debug("Closing idle connection...");
sender.close();
numClosed++;
} else {
putBack.add(sender);
}
}
// re-queue senders that weren't idle, but if the queue is full then close the sender
for (ChannelSender putBackSender : putBack) {
boolean returned = senderPool.offer(putBackSender);
if (!returned) {
putBackSender.close();
}
}
return new PruneResult(numClosed, numConsidered);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final String protocol = context.getProperty(PROTOCOL).getValue();
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles == null || flowFiles.isEmpty()) {
final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).longValue());
// yield if we closed an idle connection, or if there were no connections in the first place
if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) {
context.yield();
}
return;
}
// get a sender from the pool, or create a new one if the pool is empty
// if we can't create a new connection then route flow files to failure and yield
ChannelSender sender = senderPool.poll();
if (sender == null) {
try {
getLogger().debug("No available connections, creating a new one...");
sender = createSender(context);
} catch (IOException e) {
for (final FlowFile flowFile : flowFiles) {
getLogger().error("No available connections, and unable to create a new one, transferring {} to failure",
new Object[]{flowFile}, e);
session.transfer(flowFile, REL_FAILURE);
}
context.yield();
return;
}
}
final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
final String transitUri = new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
final AtomicReference<IOException> exceptionHolder = new AtomicReference<>(null);
final Charset charSet = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions().getValue());
try {
for (FlowFile flowFile : flowFiles) {
if (flowFiles.isEmpty()) {
context.yield();
} else {
for (final FlowFile flowFile : flowFiles) {
final StopWatch timer = new StopWatch(true);
final String priority = context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue();
final String version = context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue();
final String timestamp = context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
final String hostname = context.getProperty(MSG_HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
final String body = context.getProperty(MSG_BODY).evaluateAttributeExpressions(flowFile).getValue();
final StringBuilder messageBuilder = new StringBuilder();
messageBuilder.append("<").append(priority).append(">");
if (version != null) {
messageBuilder.append(version).append(" ");
}
messageBuilder.append(timestamp).append(" ").append(hostname).append(" ").append(body);
final String fullMessage = messageBuilder.toString();
getLogger().debug(fullMessage);
if (isValid(fullMessage)) {
final String syslogMessage = getSyslogMessage(context, flowFile);
if (isValid(syslogMessage)) {
try {
// now that we validated, add a new line if doing TCP
if (protocol.equals(TCP_VALUE.getValue())) {
messageBuilder.append('\n');
}
sender.send(messageBuilder.toString(), charSet);
eventSender.sendEvent(syslogMessage);
timer.stop();
final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
session.getProvenanceReporter().send(flowFile, transitUri, duration, true);
getLogger().info("Transferring {} to success", new Object[]{flowFile});
getLogger().debug("Send Completed {}", flowFile);
session.transfer(flowFile, REL_SUCCESS);
} catch (IOException e) {
getLogger().error("Transferring {} to failure", new Object[]{flowFile}, e);
} catch (final Exception e) {
getLogger().error("Send Failed {}", flowFile, e);
session.transfer(flowFile, REL_FAILURE);
exceptionHolder.set(e);
}
} else {
getLogger().info("Transferring {} to invalid", new Object[]{flowFile});
getLogger().debug("Syslog Message Invalid {}", flowFile);
session.transfer(flowFile, REL_INVALID);
}
}
} finally {
// if the connection is still open and no IO errors happened then try to return, if pool is full then close
if (sender.isConnected() && exceptionHolder.get() == null) {
boolean returned = senderPool.offer(sender);
if (!returned) {
sender.close();
}
} else {
// probably already closed here, but quietly close anyway to be safe
sender.close();
}
}
}
protected EventSender<String> getEventSender(final ProcessContext context) {
final TransportProtocol protocol = TransportProtocol.valueOf(context.getProperty(PROTOCOL).getValue());
final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions().getValue());
final LineEnding lineEnding = TransportProtocol.TCP.equals(protocol) ? LineEnding.UNIX : LineEnding.NONE;
final StringNettyEventSenderFactory factory = new StringNettyEventSenderFactory(getLogger(), hostname, port, protocol, charset, lineEnding);
factory.setThreadNamePrefix(String.format("%s[%s]", PutSyslog.class.getSimpleName(), getIdentifier()));
factory.setWorkerThreads(context.getMaxConcurrentTasks());
factory.setMaxConnections(context.getMaxConcurrentTasks());
final int timeout = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
factory.setTimeout(Duration.ofMillis(timeout));
final PropertyValue sslContextServiceProperty = context.getProperty(SSL_CONTEXT_SERVICE);
if (sslContextServiceProperty.isSet()) {
final SSLContextService sslContextService = sslContextServiceProperty.asControllerService(SSLContextService.class);
final SSLContext sslContext = sslContextService.createContext();
factory.setSslContext(sslContext);
}
return factory.getEventSender();
}
private String getSyslogMessage(final ProcessContext context, final FlowFile flowFile) {
final String priority = context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue();
final String version = context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue();
final String timestamp = context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
final String hostname = context.getProperty(MSG_HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
final String body = context.getProperty(MSG_BODY).evaluateAttributeExpressions(flowFile).getValue();
final StringBuilder messageBuilder = new StringBuilder();
messageBuilder.append("<").append(priority).append(">");
if (version != null) {
messageBuilder.append(version).append(StringUtils.SPACE);
}
messageBuilder.append(timestamp).append(StringUtils.SPACE).append(hostname).append(StringUtils.SPACE).append(body);
return messageBuilder.toString();
}
private boolean isValid(final String message) {
for (Pattern pattern : SyslogParser.MESSAGE_PATTERNS) {
Matcher matcher = pattern.matcher(message);
for (final Pattern pattern : SyslogParser.MESSAGE_PATTERNS) {
final Matcher matcher = pattern.matcher(message);
if (matcher.matches()) {
return true;
}
}
return false;
}
private static class PruneResult {
private final int numClosed;
private final int numConsidered;
public PruneResult(final int numClosed, final int numConsidered) {
this.numClosed = numClosed;
this.numConsidered = numConsidered;
}
public int getNumClosed() {
return numClosed;
}
public int getNumConsidered() {
return numConsidered;
}
}
}

View File

@ -1,111 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.syslog.parsers.SyslogParser
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.bouncycastle.util.encoders.Hex
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@RunWith(JUnit4.class)
class ITListenSyslogGroovy extends GroovyTestCase {
private static final Logger logger = LoggerFactory.getLogger(ITListenSyslogGroovy.class)
static final String ZERO_LENGTH_MESSAGE = " \n"
@BeforeClass
static void setUpOnce() throws Exception {
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
}
@Before
void setUp() throws Exception {
}
@After
void tearDown() throws Exception {
}
@Test
void testShouldHandleZeroLengthUDP() throws Exception {
// Arrange
final ListenSyslog proc = new ListenSyslog()
final TestRunner runner = TestRunners.newTestRunner(proc)
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue())
runner.setProperty(ListenSyslog.PORT, "0")
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory()
final ProcessContext context = runner.getProcessContext()
proc.onScheduled(context)
// Inject a SyslogParser which will always return null
def nullEventParser = [parseEvent: { byte[] bytes, String sender ->
logger.mock("Regardless of input bytes: [${Hex.toHexString(bytes)}] and sender: [${sender}], this parser will return null")
return null
}] as SyslogParser
proc.parser = nullEventParser
final int numMessages = 10
final int port = proc.getPort()
Assert.assertTrue(port > 0)
// write some TCP messages to the port in the background
final Thread sender = new Thread(new ITListenSyslog.SingleConnectionSocketSender(port, numMessages, 100, ZERO_LENGTH_MESSAGE))
sender.setDaemon(true)
sender.start()
// Act
// call onTrigger until we read all messages, or 30 seconds passed
try {
int numFailed = 0
long timeout = System.currentTimeMillis() + 30000
while (numFailed < numMessages && System.currentTimeMillis() < timeout) {
Thread.sleep(50)
proc.onTrigger(context, processSessionFactory)
numFailed = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size()
}
int numSuccess = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size()
logger.info("Transferred " + numSuccess + " to SUCCESS and " + numFailed + " to INVALID")
// Assert
// all messages should be transferred to invalid
Assert.assertEquals("Did not process all the messages", numMessages, numFailed)
} finally {
// unschedule to close connections
proc.onUnscheduled()
}
}
}

View File

@ -1,184 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
/**
* Tests PutSyslog sending messages to ListenSyslog to simulate a syslog server forwarding
* to ListenSyslog, or PutSyslog sending to a syslog server.
*/
public class ITListenAndPutSyslog {
private static final String SSL_SERVICE_IDENTIFIER = SSLContextService.class.getName();
private static SSLContext keyStoreSslContext;
static final Logger LOGGER = LoggerFactory.getLogger(ITListenAndPutSyslog.class);
private ListenSyslog listenSyslog;
private TestRunner listenSyslogRunner;
private PutSyslog putSyslog;
private TestRunner putSyslogRunner;
@BeforeClass
public static void configureServices() throws TlsException {
keyStoreSslContext = SslContextUtils.createKeyStoreSslContext();
}
@Before
public void setup() {
this.listenSyslog = new ListenSyslog();
this.listenSyslogRunner = TestRunners.newTestRunner(listenSyslog);
this.putSyslog = new PutSyslog();
this.putSyslogRunner = TestRunners.newTestRunner(putSyslog);
}
@After
public void teardown() {
try {
putSyslog.onStopped();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
try {
listenSyslog.onUnscheduled();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
@Test
public void testUDP() throws IOException, InterruptedException {
run(ListenSyslog.UDP_VALUE.getValue(), 5, 5);
}
@Test
public void testTCP() throws IOException, InterruptedException {
run(ListenSyslog.TCP_VALUE.getValue(), 5, 5);
}
@Test
public void testTLS() throws InitializationException, IOException, InterruptedException {
configureSSLContextService(listenSyslogRunner);
listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, SSL_SERVICE_IDENTIFIER);
configureSSLContextService(putSyslogRunner);
putSyslogRunner.setProperty(PutSyslog.SSL_CONTEXT_SERVICE, SSL_SERVICE_IDENTIFIER);
run(ListenSyslog.TCP_VALUE.getValue(), 7, 7);
}
@Test
public void testTLSListenerNoTLSPut() throws InitializationException, IOException, InterruptedException {
configureSSLContextService(listenSyslogRunner);
listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, SSL_SERVICE_IDENTIFIER);
// send 7 but expect 0 because sender didn't use TLS
run(ListenSyslog.TCP_VALUE.getValue(), 7, 0);
}
private void configureSSLContextService(TestRunner runner) throws InitializationException {
final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
Mockito.when(sslContextService.getIdentifier()).thenReturn(SSL_SERVICE_IDENTIFIER);
Mockito.when(sslContextService.createContext()).thenReturn(keyStoreSslContext);
runner.addControllerService(SSL_SERVICE_IDENTIFIER, sslContextService);
runner.enableControllerService(sslContextService);
}
/**
* Sends numMessages from PutSyslog to ListenSyslog.
*/
private void run(String protocol, int numMessages, int expectedMessages) throws IOException, InterruptedException {
// set the same protocol on both processors
putSyslogRunner.setProperty(PutSyslog.PROTOCOL, protocol);
listenSyslogRunner.setProperty(ListenSyslog.PROTOCOL, protocol);
// set a listening port of 0 to get a random available port
listenSyslogRunner.setProperty(ListenSyslog.PORT, "0");
// call onScheduled to start ListenSyslog listening
final ProcessSessionFactory processSessionFactory = listenSyslogRunner.getProcessSessionFactory();
final ProcessContext context = listenSyslogRunner.getProcessContext();
listenSyslog.onScheduled(context);
// get the real port it is listening on and set that in PutSyslog
final int listeningPort = listenSyslog.getPort();
putSyslogRunner.setProperty(PutSyslog.PORT, String.valueOf(listeningPort));
// configure the message properties on PutSyslog
final String pri = "34";
final String version = "1";
final String stamp = "2016-02-05T22:14:15.003Z";
final String host = "localhost";
final String body = "some message";
final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
putSyslogRunner.setProperty(PutSyslog.MSG_PRIORITY, pri);
putSyslogRunner.setProperty(PutSyslog.MSG_VERSION, version);
putSyslogRunner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
putSyslogRunner.setProperty(PutSyslog.MSG_HOSTNAME, host);
putSyslogRunner.setProperty(PutSyslog.MSG_BODY, body);
// send the messages
for (int i=0; i < numMessages; i++) {
putSyslogRunner.enqueue("incoming data".getBytes(StandardCharsets.UTF_8));
}
putSyslogRunner.run(numMessages, false);
// trigger ListenSyslog until we've seen all the messages
int numTransfered = 0;
long timeout = System.currentTimeMillis() + 30000;
while (numTransfered < expectedMessages && System.currentTimeMillis() < timeout) {
Thread.sleep(10);
listenSyslog.onTrigger(context, processSessionFactory);
numTransfered = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
}
Assert.assertEquals("Did not process all the messages", expectedMessages, numTransfered);
if (expectedMessages > 0) {
// check that one of flow files has the expected content
MockFlowFile mockFlowFile = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(expectedMessage);
}
}
}

View File

@ -1,402 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.List;
import org.apache.nifi.processors.standard.TestListenSyslog.DatagramSender;
public class ITListenSyslog {
static final Logger LOGGER = LoggerFactory.getLogger(ITListenSyslog.class);
static final String PRI = "34";
static final String SEV = "2";
static final String FAC = "4";
static final String TIME = "Oct 13 15:43:23";
static final String HOST = "localhost.home";
static final String BODY = "some message";
static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY;
static final String VALID_MESSAGE_TCP = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";
static final String INVALID_MESSAGE = "this is not valid\n";
@Test
public void testUDP() throws IOException, InterruptedException {
final ListenSyslog proc = new ListenSyslog();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
runner.setProperty(ListenSyslog.PORT, "0");
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
final int numMessages = 20;
final int port = proc.getPort();
Assert.assertTrue(port > 0);
// write some UDP messages to the port in the background
final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE));
sender.setDaemon(true);
sender.start();
// call onTrigger until we read all datagrams, or 30 seconds passed
try {
int numTransferred = 0;
long timeout = System.currentTimeMillis() + 30000;
while (numTransferred < numMessages && System.currentTimeMillis() < timeout) {
Thread.sleep(10);
proc.onTrigger(context, processSessionFactory);
numTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
}
Assert.assertEquals("Did not process all the datagrams", numMessages, numTransferred);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue());
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(numMessages, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("udp"));
} finally {
// unschedule to close connections
proc.onUnscheduled();
}
}
@Test
public void testTCPSingleConnection() throws IOException, InterruptedException {
final ListenSyslog proc = new ListenSyslog();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
runner.setProperty(ListenSyslog.PORT, "0");
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
// Allow time for the processor to perform its scheduled start
Thread.sleep(500);
final int numMessages = 20;
final int port = proc.getPort();
Assert.assertTrue(port > 0);
// write some TCP messages to the port in the background
final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP));
sender.setDaemon(true);
sender.start();
// call onTrigger until we read all messages, or 30 seconds passed
try {
int nubTransferred = 0;
long timeout = System.currentTimeMillis() + 30000;
while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
Thread.sleep(10);
proc.onTrigger(context, processSessionFactory);
nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
}
Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(numMessages, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
} finally {
// unschedule to close connections
proc.onUnscheduled();
}
}
@Test
public void testTCPSingleConnectionWithNewLines() throws IOException, InterruptedException {
final ListenSyslog proc = new ListenSyslog();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
runner.setProperty(ListenSyslog.PORT, "0");
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
final int numMessages = 3;
final int port = proc.getPort();
Assert.assertTrue(port > 0);
// send 3 messages as 1
final String multipleMessages = VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n";
final Thread sender = new Thread(new SingleConnectionSocketSender(port, 1, 10, multipleMessages));
sender.setDaemon(true);
sender.start();
// call onTrigger until we read all messages, or 30 seconds passed
try {
int nubTransferred = 0;
long timeout = System.currentTimeMillis() + 30000;
while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
Thread.sleep(10);
proc.onTrigger(context, processSessionFactory);
nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
}
Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(numMessages, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
} finally {
// unschedule to close connections
proc.onUnscheduled();
}
}
@Test
public void testTCPMultipleConnection() throws IOException, InterruptedException {
final ListenSyslog proc = new ListenSyslog();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
runner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5");
runner.setProperty(ListenSyslog.PORT, "0");
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
final int numMessages = 20;
final int port = proc.getPort();
Assert.assertTrue(port > 0);
// write some TCP messages to the port in the background
final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP));
sender.setDaemon(true);
sender.start();
// call onTrigger until we read all messages, or 30 seconds passed
try {
int nubTransferred = 0;
long timeout = System.currentTimeMillis() + 30000;
while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
Thread.sleep(10);
proc.onTrigger(context, processSessionFactory);
nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
}
Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(numMessages, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
} finally {
// unschedule to close connections
proc.onUnscheduled();
}
}
@Test
public void testInvalid() throws IOException, InterruptedException {
final ListenSyslog proc = new ListenSyslog();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
runner.setProperty(ListenSyslog.PORT, "0");
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
final int numMessages = 10;
final int port = proc.getPort();
Assert.assertTrue(port > 0);
// write some TCP messages to the port in the background
final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 100, INVALID_MESSAGE));
sender.setDaemon(true);
sender.start();
// call onTrigger until we read all messages, or 30 seconds passed
try {
int nubTransferred = 0;
long timeout = System.currentTimeMillis() + 30000;
while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
Thread.sleep(50);
proc.onTrigger(context, processSessionFactory);
nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
}
// all messages should be transferred to invalid
Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
} finally {
// unschedule to close connections
proc.onUnscheduled();
}
}
private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) {
flowFile.assertContentEquals(VALID_MESSAGE.replace("\n", ""));
Assert.assertEquals(PRI, flowFile.getAttribute(SyslogAttributes.SYSLOG_PRIORITY.key()));
Assert.assertEquals(SEV, flowFile.getAttribute(SyslogAttributes.SYSLOG_SEVERITY.key()));
Assert.assertEquals(FAC, flowFile.getAttribute(SyslogAttributes.SYSLOG_FACILITY.key()));
Assert.assertEquals(TIME, flowFile.getAttribute(SyslogAttributes.SYSLOG_TIMESTAMP.key()));
Assert.assertEquals(HOST, flowFile.getAttribute(SyslogAttributes.SYSLOG_HOSTNAME.key()));
Assert.assertEquals(BODY, flowFile.getAttribute(SyslogAttributes.SYSLOG_BODY.key()));
Assert.assertEquals("true", flowFile.getAttribute(SyslogAttributes.SYSLOG_VALID.key()));
Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(SyslogAttributes.SYSLOG_PORT.key()));
Assert.assertEquals(protocol, flowFile.getAttribute(SyslogAttributes.SYSLOG_PROTOCOL.key()));
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SYSLOG_SENDER.key())));
}
/**
* Sends a given number of datagrams to the given port.
*/
public static final class SingleConnectionSocketSender implements Runnable {
final int port;
final int numMessages;
final long delay;
final String message;
public SingleConnectionSocketSender(int port, int numMessages, long delay, String message) {
this.port = port;
this.numMessages = numMessages;
this.delay = delay;
this.message = message;
}
@Override
public void run() {
byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
try (SocketChannel channel = SocketChannel.open()) {
channel.connect(new InetSocketAddress("localhost", port));
for (int i = 0; i < numMessages; i++) {
buffer.clear();
buffer.put(bytes);
buffer.flip();
while (buffer.hasRemaining()) {
channel.write(buffer);
}
Thread.sleep(delay);
}
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
}
}
/**
* Sends a given number of datagrams to the given port.
*/
public static final class MultiConnectionSocketSender implements Runnable {
final int port;
final int numMessages;
final long delay;
final String message;
public MultiConnectionSocketSender(int port, int numMessages, long delay, String message) {
this.port = port;
this.numMessages = numMessages;
this.delay = delay;
this.message = message;
}
@Override
public void run() {
byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
for (int i = 0; i < numMessages; i++) {
try (SocketChannel channel = SocketChannel.open()) {
channel.connect(new InetSocketAddress("localhost", port));
buffer.clear();
buffer.put(bytes);
buffer.flip();
while (buffer.hasRemaining()) {
channel.write(buffer);
}
Thread.sleep(delay);
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
}
}
}
}

View File

@ -16,269 +16,191 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.LineEnding;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.StringNettyEventSenderFactory;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.events.SyslogEvent;
import org.apache.nifi.syslog.parsers.SyslogParser;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestListenSyslog {
private static final String PRIORITY = "34";
private static final String TIMESTAMP = "Jan 31 23:59:59";
private static final String HOST = "localhost.localdomain";
private static final String BODY = String.class.getName();
private static final String VALID_MESSAGE = String.format("<%s>%s %s %s", PRIORITY, TIMESTAMP, HOST, BODY);
private static final String MIME_TYPE = "text/plain";
static final Logger LOGGER = LoggerFactory.getLogger(TestListenSyslog.class);
private static final boolean STOP_ON_FINISH_DISABLED = false;
private static final boolean STOP_ON_FINISH_ENABLED = true;
private static final boolean INITIALIZE_DISABLED = false;
private static final String LOCALHOST_ADDRESS = "127.0.0.1";
private static final Duration SENDER_TIMEOUT = Duration.ofSeconds(15);
private static final Charset CHARSET = StandardCharsets.US_ASCII;
static final String PRI = "34";
static final String SEV = "2";
static final String FAC = "4";
static final String TIME = "Oct 13 15:43:23";
static final String HOST = "localhost.home";
static final String BODY = "some message";
private TestRunner runner;
static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY;
static final String VALID_MESSAGE_TCP = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";
static final String INVALID_MESSAGE = "this is not valid\n";
private ListenSyslog processor;
@BeforeClass
public static void setupBeforeClass() throws Exception {
//These tests are unreliable on slow windows builds for some reason
Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
@Before
public void setRunner() {
processor = new ListenSyslog();
runner = TestRunners.newTestRunner(processor);
runner.setProperty(ListenSyslog.CHARSET, CHARSET.name());
}
@After
public void closeEventSender() {
processor.shutdownEventServer();
}
@Test
public void testBatching() throws IOException, InterruptedException {
final ListenSyslog proc = new ListenSyslog();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
runner.setProperty(ListenSyslog.PORT, "0");
runner.setProperty(ListenSyslog.MAX_BATCH_SIZE, "25");
runner.setProperty(ListenSyslog.MESSAGE_DELIMITER, "|");
runner.setProperty(ListenSyslog.PARSE_MESSAGES, "false");
public void testUdpSslContextServiceInvalid() throws InitializationException {
runner.setProperty(ListenSyslog.PROTOCOL, TransportProtocol.UDP.toString());
final int port = NetworkUtils.getAvailableUdpPort();
runner.setProperty(ListenSyslog.PORT, Integer.toString(port));
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
final RestrictedSSLContextService sslContextService = mock(RestrictedSSLContextService.class);
final String identifier = RestrictedSSLContextService.class.getName();
when(sslContextService.getIdentifier()).thenReturn(identifier);
runner.addControllerService(identifier, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, identifier);
// the processor has internal blocking queue with capacity 10 so we have to send
// less than that since we are sending all messages before the processors ever runs
final int numMessages = 5;
final int port = proc.getPort();
Assert.assertTrue(port > 0);
// write some UDP messages to the port in the background
final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE));
sender.setDaemon(true);
sender.start();
sender.join();
try {
proc.onTrigger(context, processSessionFactory);
runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
Assert.assertEquals("0", flowFile.getAttribute(SyslogAttributes.SYSLOG_PORT.key()));
Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue(), flowFile.getAttribute(SyslogAttributes.SYSLOG_PROTOCOL.key()));
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SYSLOG_SENDER.key())));
final String content = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
final String[] splits = content.split("\\|");
Assert.assertEquals(numMessages, splits.length);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("udp"));
} finally {
// unschedule to close connections
proc.onUnscheduled();
}
runner.assertNotValid();
}
@Test
public void testParsingError() throws IOException {
final FailParseProcessor proc = new FailParseProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
runner.setProperty(ListenSyslog.PORT, "0");
public void testRunTcp() throws Exception {
final int port = NetworkUtils.getAvailableTcpPort();
final TransportProtocol protocol = TransportProtocol.TCP;
runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.PORT, Integer.toString(port));
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
try {
final int port = proc.getPort();
final DatagramSender sender = new DatagramSender(port, 1, 1, INVALID_MESSAGE);
sender.run();
// should keep re-processing event1 from the error queue
proc.onTrigger(context, processSessionFactory);
runner.assertTransferCount(ListenSyslog.REL_INVALID, 1);
runner.assertTransferCount(ListenSyslog.REL_SUCCESS, 0);
} finally {
proc.onUnscheduled();
}
assertSendSuccess(protocol, port);
}
@Test
public void testErrorQueue() throws IOException {
final List<ListenSyslog.RawSyslogEvent> msgs = new ArrayList<>();
msgs.add(new ListenSyslog.RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01"));
msgs.add(new ListenSyslog.RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01"));
public void testRunUdp() throws Exception {
final int port = NetworkUtils.getAvailableUdpPort();
final TransportProtocol protocol = TransportProtocol.UDP;
runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.PORT, Integer.toString(port));
// Add message that will throw a FlowFileAccessException the first time that we attempt to read
// the contents but will succeed the second time.
final AtomicInteger getMessageAttempts = new AtomicInteger(0);
msgs.add(new ListenSyslog.RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01") {
@Override
public byte[] getData() {
final int attempts = getMessageAttempts.incrementAndGet();
if (attempts == 1) {
throw new FlowFileAccessException("Unit test failure");
} else {
return VALID_MESSAGE.getBytes();
}
}
});
final CannedMessageProcessor proc = new CannedMessageProcessor(msgs);
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenSyslog.MAX_BATCH_SIZE, "5");
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
runner.setProperty(ListenSyslog.PORT, "0");
runner.setProperty(ListenSyslog.PARSE_MESSAGES, "false");
runner.run();
assertEquals(1, proc.getErrorQueueSize());
runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0).assertContentEquals(VALID_MESSAGE + "\n" + VALID_MESSAGE);
// running again should pull from the error queue
runner.clearTransferState();
runner.run();
runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0).assertContentEquals(VALID_MESSAGE);
assertSendSuccess(protocol, port);
}
/**
* Sends a given number of datagrams to the given port.
*/
public static final class DatagramSender implements Runnable {
@Test
public void testRunUdpBatch() throws Exception {
final int port = NetworkUtils.getAvailableUdpPort();
final TransportProtocol protocol = TransportProtocol.UDP;
runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.PORT, Integer.toString(port));
final int port;
final int numMessages;
final long delay;
final String message;
final String[] messages = new String[]{VALID_MESSAGE, VALID_MESSAGE};
public DatagramSender(int port, int numMessages, long delay, String message) {
this.port = port;
this.numMessages = numMessages;
this.delay = delay;
this.message = message;
}
runner.setProperty(ListenSyslog.MAX_BATCH_SIZE, Integer.toString(messages.length));
runner.setProperty(ListenSyslog.PARSE_MESSAGES, Boolean.FALSE.toString());
@Override
public void run() {
byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
runner.run(1, STOP_ON_FINISH_DISABLED);
sendMessages(protocol, port, LineEnding.NONE, messages);
runner.run(1, STOP_ON_FINISH_ENABLED, INITIALIZE_DISABLED);
try (DatagramChannel channel = DatagramChannel.open()) {
channel.connect(new InetSocketAddress("localhost", port));
for (int i = 0; i < numMessages; i++) {
buffer.clear();
buffer.put(bytes);
buffer.flip();
final List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS);
assertEquals("Success FlowFiles not matched", 1, successFlowFiles.size());
while (buffer.hasRemaining()) {
channel.write(buffer);
}
Thread.sleep(delay);
}
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
}
final Long receivedCounter = runner.getCounterValue(ListenSyslog.RECEIVED_COUNTER);
assertEquals("Received Counter not matched", Long.valueOf(messages.length), receivedCounter);
final Long successCounter = runner.getCounterValue(ListenSyslog.SUCCESS_COUNTER);
assertEquals("Success Counter not matched", Long.valueOf(1), successCounter);
}
// A mock version of ListenSyslog that will queue the provided events
private static class FailParseProcessor extends ListenSyslog {
@Test
public void testRunUdpInvalid() throws Exception {
final int port = NetworkUtils.getAvailableUdpPort();
final TransportProtocol protocol = TransportProtocol.UDP;
runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.PORT, Integer.toString(port));
@Override
protected SyslogParser getParser() {
return new SyslogParser(StandardCharsets.UTF_8) {
@Override
public SyslogEvent parseEvent(byte[] bytes, String sender) {
throw new ProcessException("Unit test intentionally failing");
}
};
}
runner.run(1, STOP_ON_FINISH_DISABLED);
sendMessages(protocol, port, LineEnding.NONE, TIMESTAMP);
runner.run(1, STOP_ON_FINISH_ENABLED, INITIALIZE_DISABLED);
final List<MockFlowFile> invalidFlowFiles = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID);
assertEquals("Invalid FlowFiles not matched", 1, invalidFlowFiles.size());
final MockFlowFile flowFile = invalidFlowFiles.iterator().next();
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_SENDER.key(), LOCALHOST_ADDRESS);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_PROTOCOL.key(), protocol.toString());
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_PORT.key(), Integer.toString(port));
final String content = flowFile.getContent();
assertEquals("FlowFile content not matched", TIMESTAMP, content);
}
private static class CannedMessageProcessor extends ListenSyslog {
private void assertSendSuccess(final TransportProtocol protocol, final int port) throws Exception {
runner.run(1, STOP_ON_FINISH_DISABLED);
private final Iterator<RawSyslogEvent> eventItr;
sendMessages(protocol, port, LineEnding.UNIX, VALID_MESSAGE);
public CannedMessageProcessor(final List<RawSyslogEvent> events) {
this.eventItr = events.iterator();
}
runner.run(1, STOP_ON_FINISH_ENABLED, INITIALIZE_DISABLED);
final List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS);
assertEquals("Success FlowFiles not matched", 1, successFlowFiles.size());
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.remove(PORT);
properties.add(
new PropertyDescriptor.Builder()
.name(PORT.getName())
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(Validator.VALID)
.build()
);
return properties;
}
final MockFlowFile flowFile = successFlowFiles.iterator().next();
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), MIME_TYPE);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_SENDER.key(), LOCALHOST_ADDRESS);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_PROTOCOL.key(), protocol.toString());
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_PORT.key(), Integer.toString(port));
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_HOSTNAME.key(), HOST);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_BODY.key(), BODY);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_PRIORITY.key(), PRIORITY);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_TIMESTAMP.key(), TIMESTAMP);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_VALID.key(), Boolean.TRUE.toString());
flowFile.assertAttributeExists(SyslogAttributes.SYSLOG_FACILITY.key());
flowFile.assertAttributeExists(SyslogAttributes.SYSLOG_SEVERITY.key());
@Override
protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) {
if (eventItr.hasNext()) {
return eventItr.next();
final Long receivedCounter = runner.getCounterValue(ListenSyslog.RECEIVED_COUNTER);
assertEquals("Received Counter not matched", Long.valueOf(1), receivedCounter);
final Long successCounter = runner.getCounterValue(ListenSyslog.SUCCESS_COUNTER);
assertEquals("Success Counter not matched", Long.valueOf(1), successCounter);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertFalse("Provenance Events not found", events.isEmpty());
final ProvenanceEventRecord eventRecord = events.iterator().next();
assertEquals(ProvenanceEventType.RECEIVE, eventRecord.getEventType());
final String transitUri = String.format("%s://%s:%d", protocol.toString().toLowerCase(), LOCALHOST_ADDRESS, port);
assertEquals("Provenance Transit URI not matched", transitUri, eventRecord.getTransitUri());
}
private void sendMessages(final TransportProtocol protocol, final int port, final LineEnding lineEnding, final String... messages) throws Exception {
final StringNettyEventSenderFactory eventSenderFactory = new StringNettyEventSenderFactory(runner.getLogger(), LOCALHOST_ADDRESS, port, protocol, CHARSET, lineEnding);
eventSenderFactory.setTimeout(SENDER_TIMEOUT);
try (final EventSender<String> eventSender = eventSenderFactory.getEventSender()) {
for (final String message : messages) {
eventSender.sendEvent(message);
}
return super.getMessage(longPoll, pollErrorQueue, session);
}
}
}

View File

@ -16,460 +16,148 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.EventServerFactory;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class TestPutSyslog {
private static final String ADDRESS = "127.0.0.1";
private static final String LOCALHOST = "localhost";
private static final String MESSAGE_BODY = String.class.getName();
private static final String MESSAGE_PRIORITY = "1";
private static final String DEFAULT_PROTOCOL = "UDP";
private static final String TIMESTAMP = "Jan 1 00:00:00";
private static final String VERSION = "2";
private static final String SYSLOG_MESSAGE = String.format("<%s>%s %s %s", MESSAGE_PRIORITY, TIMESTAMP, LOCALHOST, MESSAGE_BODY);
private static final String VERSION_SYSLOG_MESSAGE = String.format("<%s>%s %s %s %s", MESSAGE_PRIORITY, VERSION, TIMESTAMP, LOCALHOST, MESSAGE_BODY);
private static final int MAX_FRAME_LENGTH = 1024;
private static final Charset CHARSET = StandardCharsets.UTF_8;
private static final String DELIMITER = "\n";
private static final int POLL_TIMEOUT_SECONDS = 5;
private MockCollectingSender sender;
private MockPutSyslog proc;
private TestRunner runner;
private TransportProtocol protocol = TransportProtocol.UDP;
private int port;
@Before
public void setup() throws IOException {
sender = new MockCollectingSender();
proc = new MockPutSyslog(sender);
runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutSyslog.HOSTNAME, "localhost");
runner.setProperty(PutSyslog.PORT, "12345");
}
@Test
public void testValidMessageStaticPropertiesUdp() {
final String pri = "34";
final String version = "1";
final String stamp = "2003-10-11T22:14:15.003Z";
final String host = "mymachine.example.com";
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
runner.setProperty(PutSyslog.MSG_VERSION, version);
runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
runner.setProperty(PutSyslog.MSG_BODY, body);
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
runner.run();
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
Assert.assertEquals(1, sender.messages.size());
Assert.assertEquals(expectedMessage, sender.messages.get(0));
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType());
Assert.assertEquals("UDP://localhost:12345", event.getTransitUri());
}
@Test
public void testValidMessageStaticPropertiesTcp() {
final String pri = "34";
final String version = "1";
final String stamp = "2003-10-11T22:14:15.003Z";
final String host = "mymachine.example.com";
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
runner.setProperty(PutSyslog.PROTOCOL, PutSyslog.TCP_VALUE);
runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
runner.setProperty(PutSyslog.MSG_VERSION, version);
runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
runner.setProperty(PutSyslog.MSG_BODY, body);
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
runner.run();
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
Assert.assertEquals(1, sender.messages.size());
Assert.assertEquals(expectedMessage, sender.messages.get(0).replace("\n", ""));
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType());
Assert.assertEquals("TCP://localhost:12345", event.getTransitUri());
}
@Test
public void testValidELPropertiesTcp() {
final String pri = "34";
final String version = "1";
final String stamp = "2003-10-11T22:14:15.003Z";
final String host = "mymachine.example.com";
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
runner.setProperty(PutSyslog.HOSTNAME, "${'hostname'}");
runner.setProperty(PutSyslog.PORT, "${port}");
runner.setProperty(PutSyslog.CHARSET, "${charset}");
runner.setProperty(PutSyslog.TIMEOUT, "${timeout}");
runner.setProperty(PutSyslog.MAX_SOCKET_SEND_BUFFER_SIZE, "${maxSocketSenderBufferSize}");
runner.setProperty(PutSyslog.IDLE_EXPIRATION, "${idleExpiration}");
runner.setProperty(PutSyslog.PROTOCOL, PutSyslog.TCP_VALUE);
runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
runner.setProperty(PutSyslog.MSG_VERSION, version);
runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
runner.setProperty(PutSyslog.MSG_BODY, body);
public void setRunner() {
port = NetworkUtils.getAvailableUdpPort();
runner = TestRunners.newTestRunner(PutSyslog.class);
runner.setProperty(PutSyslog.HOSTNAME, ADDRESS);
runner.setProperty(PutSyslog.PROTOCOL, protocol.toString());
runner.setProperty(PutSyslog.PORT, Integer.toString(port));
runner.setProperty(PutSyslog.MSG_BODY, MESSAGE_BODY);
runner.setProperty(PutSyslog.MSG_PRIORITY, MESSAGE_PRIORITY);
runner.setProperty(PutSyslog.MSG_HOSTNAME, LOCALHOST);
runner.setProperty(PutSyslog.MSG_TIMESTAMP, TIMESTAMP);
runner.assertValid();
runner.setVariable("hostname", "hostname");
runner.setVariable("port", "10443");
runner.setVariable("charset", "UTF-8");
runner.setVariable("timeout", "10 secs");
runner.setVariable("maxSocketSenderBufferSize", "10 mb");
runner.setVariable("idleExpiration", "10 secs");
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
runner.run();
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
Assert.assertEquals(1, sender.messages.size());
Assert.assertEquals(expectedMessage, sender.messages.get(0).replace("\n", ""));
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType());
Assert.assertEquals("TCP://hostname:10443", event.getTransitUri());
}
@Test
public void testValidMessageStaticPropertiesNoVersion() {
final String pri = "34";
final String stamp = "2003-10-11T22:14:15.003Z";
final String host = "mymachine.example.com";
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
final String expectedMessage = "<" + pri + ">" + stamp + " " + host + " " + body;
runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
runner.setProperty(PutSyslog.MSG_BODY, body);
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
public void testRunNoFlowFiles() {
runner.run();
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
Assert.assertEquals(1, sender.messages.size());
Assert.assertEquals(expectedMessage, sender.messages.get(0));
runner.assertQueueEmpty();
}
@Test
public void testValidMessageELProperties() {
final String pri = "34";
final String stamp = "2003-10-11T22:14:15.003Z";
final String host = "mymachine.example.com";
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
final String expectedMessage = "<" + pri + ">" + stamp + " " + host + " " + body;
runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("syslog.priority", pri);
attributes.put("syslog.timestamp", stamp);
attributes.put("syslog.hostname", host);
attributes.put("syslog.body", body);
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
Assert.assertEquals(1, sender.messages.size());
Assert.assertEquals(expectedMessage, sender.messages.get(0));
public void testRunSuccess() throws InterruptedException {
assertSyslogMessageSuccess(SYSLOG_MESSAGE, Collections.emptyMap());
}
@Test
public void testInvalidMessageELProperties() {
final String pri = "34";
final String stamp = "not-a-timestamp";
final String host = "mymachine.example.com";
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
public void testRunSuccessSyslogVersion() throws InterruptedException {
final String versionAttributeKey = "version";
runner.setProperty(PutSyslog.MSG_VERSION, String.format("${%s}", versionAttributeKey));
final Map<String, String> attributes = Collections.singletonMap(versionAttributeKey, VERSION);
runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("syslog.priority", pri);
attributes.put("syslog.timestamp", stamp);
attributes.put("syslog.hostname", host);
attributes.put("syslog.body", body);
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSyslog.REL_INVALID, 1);
Assert.assertEquals(0, sender.messages.size());
assertSyslogMessageSuccess(VERSION_SYSLOG_MESSAGE, attributes);
}
@Test
public void testIOExceptionOnSend() throws IOException {
final String pri = "34";
final String version = "1";
final String stamp = "2003-10-11T22:14:15.003Z";
final String host = "mymachine.example.com";
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
proc = new MockPutSyslog(new MockErrorSender());
runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutSyslog.HOSTNAME, "localhost");
runner.setProperty(PutSyslog.PORT, "12345");
runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
runner.setProperty(PutSyslog.MSG_VERSION, version);
runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
runner.setProperty(PutSyslog.MSG_BODY, body);
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
public void testRunInvalid() {
runner.setProperty(PutSyslog.MSG_PRIORITY, Integer.toString(Integer.MAX_VALUE));
runner.enqueue(new byte[]{});
runner.run();
runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 1);
Assert.assertEquals(0, sender.messages.size());
runner.assertAllFlowFilesTransferred(PutSyslog.REL_INVALID);
}
@Test
public void testIOExceptionCreatingConnection() throws IOException {
final String pri = "34";
final String version = "1";
final String stamp = "2003-10-11T22:14:15.003Z";
final String host = "mymachine.example.com";
final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
Processor proc = new MockCreationErrorPutSyslog(new MockErrorSender(), 1);
runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutSyslog.HOSTNAME, "localhost");
runner.setProperty(PutSyslog.PORT, "12345");
runner.setProperty(PutSyslog.BATCH_SIZE, "1");
runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
runner.setProperty(PutSyslog.MSG_VERSION, version);
runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
runner.setProperty(PutSyslog.MSG_BODY, body);
// the first run will throw IOException when calling send so the connection won't be re-qeued
// the second run will try to create a new connection but throw an exception which should be caught and route files to failure
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
runner.run(2);
runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 2);
Assert.assertEquals(0, sender.messages.size());
}
@Test
public void testLargeMessageFailure() {
final String pri = "34";
final String stamp = "2015-10-15T22:14:15.003Z";
final String host = "mymachine.example.com";
final StringBuilder bodyBuilder = new StringBuilder(4096);
for (int i=0; i < 4096; i++) {
bodyBuilder.append("a");
}
runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("syslog.priority", pri);
attributes.put("syslog.timestamp", stamp);
attributes.put("syslog.hostname", host);
attributes.put("syslog.body", bodyBuilder.toString());
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
public void testRunFailure() {
runner.setProperty(PutSyslog.PROTOCOL, PutSyslog.TCP_VALUE);
runner.setProperty(PutSyslog.PORT, Integer.toString(NetworkUtils.getAvailableTcpPort()));
runner.enqueue(new byte[]{});
runner.run();
// should have dynamically created a larger buffer
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
Assert.assertEquals(1, sender.messages.size());
runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE);
}
@Test
public void testNoIncomingData() {
runner.setProperty(PutSyslog.MSG_PRIORITY, "10");
runner.setProperty(PutSyslog.MSG_VERSION, "1");
runner.setProperty(PutSyslog.MSG_TIMESTAMP, "2003-10-11T22:14:15.003Z");
runner.setProperty(PutSyslog.MSG_HOSTNAME, "localhost");
runner.setProperty(PutSyslog.MSG_BODY, "test");
private void assertSyslogMessageSuccess(final String expectedSyslogMessage, final Map<String, String> attributes) throws InterruptedException {
final BlockingQueue<ByteArrayMessage> messages = new LinkedBlockingQueue<>();
final byte[] delimiter = DELIMITER.getBytes(CHARSET);
final EventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), ADDRESS, port, protocol, delimiter, MAX_FRAME_LENGTH, messages);
final EventServer eventServer = serverFactory.getEventServer();
// queue one file but run several times to test no incoming data
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
runner.run(5);
try {
runner.enqueue(expectedSyslogMessage, attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
}
final ByteArrayMessage message = messages.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
final String syslogMessage = new String(message.getMessage(), CHARSET);
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS);
@Test
public void testBatchingFlowFiles() {
runner.setProperty(PutSyslog.BATCH_SIZE, "10");
runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("syslog.priority", "10");
attributes.put("syslog.timestamp", "2015-10-11T22:14:15.003Z");
attributes.put("syslog.hostname", "my.host.name");
attributes.put("syslog.body", "blah blah blah");
for (int i=0; i < 15; i++) {
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
}
runner.run();
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 10);
Assert.assertEquals(10, sender.messages.size());
runner.run();
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 15);
Assert.assertEquals(15, sender.messages.size());
}
// Mock processor to return a MockCollectingSender
static class MockPutSyslog extends PutSyslog {
ChannelSender mockSender;
public MockPutSyslog(ChannelSender sender) {
this.mockSender = sender;
}
@Override
protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host,
int port, int maxSendBuffer, int timeout)
throws IOException {
return mockSender;
assertEquals(expectedSyslogMessage, syslogMessage);
assertProvenanceRecordTransitUriFound();
} finally {
eventServer.shutdown();
}
}
// Mock processor to test exception when creating new senders
static class MockCreationErrorPutSyslog extends PutSyslog {
private void assertProvenanceRecordTransitUriFound() {
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertFalse("Provenance Events not found", provenanceEvents.isEmpty());
final ProvenanceEventRecord provenanceEventRecord = provenanceEvents.iterator().next();
assertEquals(ProvenanceEventType.SEND, provenanceEventRecord.getEventType());
int numSendersCreated;
int numSendersAllowed;
ChannelSender mockSender;
public MockCreationErrorPutSyslog(ChannelSender sender, int numSendersAllowed) {
this.mockSender = sender;
this.numSendersAllowed = numSendersAllowed;
}
@Override
protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host,
int port, int maxSendBuffer, int timeout)
throws IOException {
if (numSendersCreated >= numSendersAllowed) {
throw new IOException("too many senders");
}
numSendersCreated++;
return mockSender;
}
final String transitUri = provenanceEventRecord.getTransitUri();
assertNotNull("Transit URI not found", transitUri);
assertTrue("Transit URI Protocol not found", transitUri.contains(DEFAULT_PROTOCOL));
assertTrue("Transit URI Hostname not found", transitUri.contains(ADDRESS));
assertTrue("Transit URI Port not found", transitUri.contains(Integer.toString(port)));
}
// Mock sender that saves any messages passed to send()
static class MockCollectingSender extends ChannelSender {
List<String> messages = new ArrayList<>();
public MockCollectingSender() throws IOException {
super("myhost", 0, 0, null);
}
@Override
public void open() throws IOException {
}
@Override
public void send(String message, Charset charset) throws IOException {
messages.add(message);
super.send(message, charset);
}
@Override
protected void write(byte[] buffer) throws IOException {
}
@Override
public boolean isConnected() {
return true;
}
@Override
public void close() {
}
}
// Mock sender that throws IOException on calls to write() or send()
static class MockErrorSender extends ChannelSender {
public MockErrorSender() throws IOException {
super(null, 0, 0, null);
}
@Override
public void open() throws IOException {
}
@Override
public void send(String message, Charset charset) throws IOException {
throw new IOException("error");
}
@Override
protected void write(byte[] data) throws IOException {
throw new IOException("error");
}
@Override
public boolean isConnected() {
return false;
}
@Override
public void close() {
}
}
}