NIFI-9453 Refactored ListenBeats using Netty

This closes #5669

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Nathan Gough 2022-01-10 23:29:25 -05:00 committed by exceptionfactory
parent 278967829a
commit 08153b8260
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
18 changed files with 399 additions and 900 deletions

View File

@ -24,44 +24,47 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey; import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor; import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource; import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processor.util.listen.event.EventFactory; import org.apache.nifi.processors.beats.netty.BeatsMessage;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; import org.apache.nifi.processors.beats.netty.BeatsMessageServerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder; import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.beats.event.BeatsEvent;
import org.apache.nifi.processors.beats.event.BeatsEventFactory;
import org.apache.nifi.processors.beats.frame.BeatsEncoder;
import org.apache.nifi.processors.beats.handler.BeatsSocketChannelHandlerFactory;
import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
import org.apache.nifi.processors.beats.response.BeatsResponse;
import org.apache.nifi.security.util.ClientAuth; import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "beats", "tcp", "logs"}) @Tags({"listen", "beats", "tcp", "logs"})
@ -75,7 +78,7 @@ import java.util.concurrent.BlockingQueue;
@WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is application/json") @WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is application/json")
}) })
@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"}) @SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
public class ListenBeats extends AbstractListenEventBatchingProcessor<BeatsEvent> { public class ListenBeats extends AbstractProcessor {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL_CONTEXT_SERVICE") .name("SSL_CONTEXT_SERVICE")
@ -96,13 +99,40 @@ public class ListenBeats extends AbstractListenEventBatchingProcessor<BeatsEvent
.defaultValue(ClientAuth.REQUIRED.name()) .defaultValue(ClientAuth.REQUIRED.name())
.build(); .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Messages received successfully will be sent out this relationship.")
.build();
protected List<PropertyDescriptor> descriptors;
protected Set<Relationship> relationships;
protected volatile int port;
protected volatile BlockingQueue<BeatsMessage> events;
protected volatile BlockingQueue<BeatsMessage> errorEvents;
protected volatile EventServer eventServer;
protected volatile byte[] messageDemarcatorBytes;
protected volatile EventBatcher<BeatsMessage> eventBatcher;
@Override @Override
protected List<PropertyDescriptor> getAdditionalProperties() { protected void init(final ProcessorInitializationContext context) {
return Arrays.asList( final List<PropertyDescriptor> descriptors = new ArrayList<>();
MAX_CONNECTIONS, descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
SSL_CONTEXT_SERVICE, descriptors.add(ListenerProperties.PORT);
CLIENT_AUTH descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
); descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
// Deprecated
descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
descriptors.add(ListenerProperties.CHARSET);
descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
descriptors.add(ListenerProperties.WORKER_THREADS);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(CLIENT_AUTH);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
} }
@Override @Override
@ -111,13 +141,12 @@ public class ListenBeats extends AbstractListenEventBatchingProcessor<BeatsEvent
final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null && sslContextService.isTrustStoreConfigured() == false) { if (sslContextService != null && !sslContextService.isTrustStoreConfigured()) {
results.add(new ValidationResult.Builder() results.add(new ValidationResult.Builder()
.explanation("The context service must have a truststore configured for the beats forwarder client to work correctly") .explanation("SSL Context Service requires a truststore for the Beats forwarder client to work correctly")
.valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build()); .valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build());
} }
// Validate CLIENT_AUTH
final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
if (sslContextService != null && StringUtils.isBlank(clientAuth)) { if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
results.add(new ValidationResult.Builder() results.add(new ValidationResult.Builder()
@ -128,87 +157,103 @@ public class ListenBeats extends AbstractListenEventBatchingProcessor<BeatsEvent
return results; return results;
} }
private volatile BeatsEncoder beatsEncoder;
@Override @Override
@OnScheduled public final Set<Relationship> getRelationships() {
public void onScheduled(ProcessContext context) throws IOException { return this.relationships;
super.onScheduled(context);
// wanted to ensure charset was already populated here
beatsEncoder = new BeatsEncoder();
} }
@Override @Override
protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<BeatsEvent> events) throws IOException { public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final EventFactory<BeatsEvent> eventFactory = new BeatsEventFactory(); return descriptors;
final ChannelHandlerFactory<BeatsEvent, AsyncChannelDispatcher> handlerFactory = new BeatsSocketChannelHandlerFactory<>(); }
final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger(); @OnScheduled
final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); public void onScheduled(final ProcessContext context) throws IOException {
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue()); final int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
final int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
errorEvents = new LinkedBlockingQueue<>();
final String msgDemarcator = getMessageDemarcator(context);
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
// initialize the buffer pool based on max number of connections and the buffer size final NettyEventServerFactory eventFactory = new BeatsMessageServerFactory(getLogger(), address, port, charset, events);
final ByteBufferSource byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
ClientAuth clientAuth = null;
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) { if (sslContextService != null) {
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue(); final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
sslContext = sslContextService.createContext(); ClientAuth clientAuth = ClientAuth.valueOf(clientAuthValue);
clientAuth = ClientAuth.valueOf(clientAuthValue); SSLContext sslContext = sslContextService.createContext();
eventFactory.setSslContext(sslContext);
eventFactory.setClientAuth(clientAuth);
} }
// if we decide to support SSL then get the context and pass it in here eventFactory.setSocketReceiveBuffer(bufferSize);
return new SocketChannelDispatcher<>(eventFactory, handlerFactory, byteBufferSource, events, eventFactory.setWorkerThreads(workerThreads);
getLogger(), maxConnections, sslContext, clientAuth, charSet); eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
}
@Override
protected String getBatchKey(BeatsEvent event) {
return event.getSender();
}
protected void respond(final BeatsEvent event, final BeatsResponse beatsResponse) {
final ChannelResponse response = new BeatsChannelResponse(beatsEncoder, beatsResponse);
final ChannelResponder responder = event.getResponder();
responder.addResponse(response);
try { try {
responder.respond(); eventServer = eventFactory.getEventServer();
} catch (IOException e) { } catch (EventException e) {
getLogger().error("Error sending response for transaction {} due to {}", getLogger().error("Failed to bind to [{}:{}]", address, port, e);
new Object[]{event.getSeqNumber(), e.getMessage()}, e);
} }
} }
protected void postProcess(final ProcessContext context, final ProcessSession session, final List<BeatsEvent> events) { @Override
// first commit the session so we guarantee we have all the events successfully public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
// written to FlowFiles and transferred to the success relationship EventBatcher<BeatsMessage> eventBatcher = getEventBatcher();
session.commitAsync(() -> {
// respond to each event to acknowledge successful receipt final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
for (final BeatsEvent event : events) { Map<String, FlowFileEventBatch<BeatsMessage>> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
respond(event, BeatsResponse.ok(event.getSeqNumber())); processEvents(session, batches);
}
@OnStopped
public void stopped() {
if (eventServer != null) {
eventServer.shutdown();
}
eventBatcher = null;
}
private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch<BeatsMessage>> batches) {
for (Map.Entry<String, FlowFileEventBatch<BeatsMessage>> entry : batches.entrySet()) {
FlowFile flowFile = entry.getValue().getFlowFile();
final List<BeatsMessage> events = entry.getValue().getEvents();
if (flowFile.getSize() == 0L || events.size() == 0) {
session.remove(flowFile);
getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", entry.getKey());
continue;
} }
});
final Map<String,String> attributes = getAttributes(entry.getValue());
flowFile = session.putAllAttributes(flowFile, attributes);
getLogger().debug("Transferring {} to success", flowFile);
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
// the sender and command will be the same for all events based on the batch key
final String transitUri = getTransitUri(entry.getValue());
session.getProvenanceReporter().receive(flowFile, transitUri);
}
session.commitAsync();
} }
@Override protected String getTransitUri(FlowFileEventBatch<BeatsMessage> batch) {
protected String getTransitUri(FlowFileEventBatch batch) { final List<BeatsMessage> events = batch.getEvents();
final String sender = batch.getEvents().get(0).getSender(); final String sender = events.get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append("beats").append("://").append(senderHost).append(":") return String.format("beats://%s:%d", senderHost, port);
.append(port).toString();
return transitUri;
} }
@Override protected Map<String, String> getAttributes(FlowFileEventBatch<BeatsMessage> batch) {
protected Map<String, String> getAttributes(FlowFileEventBatch batch) { final List<BeatsMessage> events = batch.getEvents();
final List<BeatsEvent> events = batch.getEvents();
// the sender and command will be the same for all events based on the batch key // the sender and command will be the same for all events based on the batch key
final String sender = events.get(0).getSender(); final String sender = events.get(0).getSender();
final int numAttributes = events.size() == 1 ? 5 : 4; final int numAttributes = events.size() == 1 ? 5 : 4;
@ -224,6 +269,24 @@ public class ListenBeats extends AbstractListenEventBatchingProcessor<BeatsEvent
return attributes; return attributes;
} }
private String getMessageDemarcator(final ProcessContext context) {
return context.getProperty(ListenerProperties.MESSAGE_DELIMITER)
.getValue()
.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
}
private EventBatcher<BeatsMessage> getEventBatcher() {
if (eventBatcher == null) {
eventBatcher = new EventBatcher<BeatsMessage>(getLogger(), events, errorEvents) {
@Override
protected String getBatchKey(BeatsMessage event) {
return event.getSender();
}
};
}
return eventBatcher;
}
public enum beatsAttributes implements FlowFileAttributeKey { public enum beatsAttributes implements FlowFileAttributeKey {
SENDER("beats.sender"), SENDER("beats.sender"),
PORT("beats.port"), PORT("beats.port"),

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.nifi.processors.beats.frame; package org.apache.nifi.processors.beats.frame;
import org.apache.nifi.logging.ComponentLog;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -25,7 +27,6 @@ import java.nio.charset.Charset;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.zip.InflaterInputStream; import java.util.zip.InflaterInputStream;
import org.apache.nifi.logging.ComponentLog;
/** /**
* Decodes a Beats frame by maintaining a state based on each byte that has been processed. This class * Decodes a Beats frame by maintaining a state based on each byte that has been processed. This class
@ -52,8 +53,6 @@ public class BeatsDecoder {
static final int COMPRESSED_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; // 32 bit unsigned + payload static final int COMPRESSED_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; // 32 bit unsigned + payload
static final int JSON_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 8; // 32 bit unsigned sequence number + 32 bit unsigned payload length static final int JSON_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 8; // 32 bit unsigned sequence number + 32 bit unsigned payload length
public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
/** /**
* @param charset the charset to decode bytes from the frame * @param charset the charset to decode bytes from the frame
*/ */
@ -103,11 +102,11 @@ public class BeatsDecoder {
// At one stage, the data sent to processPAYLOAD will be represente a complete frame, so we check before returning true // At one stage, the data sent to processPAYLOAD will be represente a complete frame, so we check before returning true
if (frameBuilder.frameType == FRAME_WINDOWSIZE && currState == BeatsState.COMPLETE) { if (frameBuilder.frameType == BeatsFrameType.WINDOWSIZE && currState == BeatsState.COMPLETE) {
return true; return true;
} else if (frameBuilder.frameType == FRAME_COMPRESSED && currState == BeatsState.COMPLETE) { } else if (frameBuilder.frameType == BeatsFrameType.COMPRESSED && currState == BeatsState.COMPLETE) {
return true; return true;
} else if (frameBuilder.frameType == FRAME_JSON && currState == BeatsState.COMPLETE) { } else if (frameBuilder.frameType == BeatsFrameType.JSON && currState == BeatsState.COMPLETE) {
return true; return true;
} else { } else {
break; break;
@ -138,7 +137,7 @@ public class BeatsDecoder {
} }
try { try {
// Once compressed frames are expanded, they must be devided into individual frames // Once compressed frames are expanded, they must be devided into individual frames
if (currState == BeatsState.COMPLETE && frameBuilder.frameType == FRAME_COMPRESSED) { if (currState == BeatsState.COMPLETE && frameBuilder.frameType == BeatsFrameType.COMPRESSED) {
logger.debug("Frame is compressed, will iterate to decode", new Object[]{}); logger.debug("Frame is compressed, will iterate to decode", new Object[]{});
// Zero currBytes, currState and frameBuilder prior to iteration over // Zero currBytes, currState and frameBuilder prior to iteration over
@ -178,7 +177,7 @@ public class BeatsDecoder {
// 'compressed' frame type: 1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v}" // 'compressed' frame type: 1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v}"
// //
// Therefore, instead of calling process method again, just iterate over each of // Therefore, instead of calling process method again, just iterate over each of
// the frames and split them so they can be processed by BeatsFrameHandler // the frames and split them so they can be processed
while (currentData.hasRemaining()) { while (currentData.hasRemaining()) {
@ -187,7 +186,7 @@ public class BeatsDecoder {
internalFrameBuilder.version = currentData.get(); internalFrameBuilder.version = currentData.get();
internalFrameBuilder.frameType = currentData.get(); internalFrameBuilder.frameType = currentData.get();
switch (internalFrameBuilder.frameType) { switch (internalFrameBuilder.frameType) {
case FRAME_JSON: case BeatsFrameType.JSON:
internalFrameBuilder.seqNumber = (int) (currentData.getInt() & 0x00000000ffffffffL); internalFrameBuilder.seqNumber = (int) (currentData.getInt() & 0x00000000ffffffffL);
currentData.mark(); currentData.mark();
@ -240,7 +239,7 @@ public class BeatsDecoder {
private void processPAYLOAD(final byte b) { private void processPAYLOAD(final byte b) {
currBytes.write(b); currBytes.write(b);
switch (decodedFrameType) { switch (decodedFrameType) {
case FRAME_WINDOWSIZE: //'W' case BeatsFrameType.WINDOWSIZE: //'W'
if (currBytes.size() < WINDOWSIZE_LENGTH ) { if (currBytes.size() < WINDOWSIZE_LENGTH ) {
logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()}); logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()});
break; break;
@ -257,7 +256,7 @@ public class BeatsDecoder {
logger.debug("Saw a packet I should not have seen. Packet contents were {}", new Object[] {currBytes.toString()}); logger.debug("Saw a packet I should not have seen. Packet contents were {}", new Object[] {currBytes.toString()});
break; break;
} }
case FRAME_COMPRESSED: //'C' case BeatsFrameType.COMPRESSED: //'C'
if (currBytes.size() < COMPRESSED_MIN_LENGTH) { if (currBytes.size() < COMPRESSED_MIN_LENGTH) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()}); logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()});
@ -300,7 +299,7 @@ public class BeatsDecoder {
} }
break; break;
} }
case FRAME_JSON: // '' case BeatsFrameType.JSON: // ''
// Because Beats can disable compression, sometimes, JSON data will be received outside a compressed // Because Beats can disable compression, sometimes, JSON data will be received outside a compressed
// stream (i.e. 0x43). Instead of processing it here, we defer its processing to went getFrames is // stream (i.e. 0x43). Instead of processing it here, we defer its processing to went getFrames is
// called // called

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
public final class BeatsFrameType {
public static final byte WINDOWSIZE = 0x57;
public static final byte DATA = 0x44;
public static final byte COMPRESSED = 0x43;
public static final byte ACK = 0x41;
public static final byte JSON = 0x4a;
}

View File

@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.beats.event; package org.apache.nifi.processors.beats.frame;
/** /**
* Metadata keys for event. * Metadata keys for Beats message.
*/ */
public interface BeatsMetadata { public interface BeatsMetadata {
String SEQNUMBER_KEY = "beats.sequencenumber"; String SEQNUMBER_KEY = "beats.sequencenumber";
String SENDER_KEY = "sender";
} }

View File

@ -1,87 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processor.util.listen.event.EventQueue;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.beats.event.BeatsMetadata;
import org.apache.nifi.processors.beats.frame.BeatsEncoder;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
/**
* Encapsulates the logic to handle a BeatsFrame once it has been read from the channel.
*/
public class BeatsFrameHandler<E extends Event<SocketChannel>> {
private final Charset charset;
private final EventFactory<E> eventFactory;
private final EventQueue<E> events;
private final SelectionKey key;
private final AsyncChannelDispatcher dispatcher;
private final BeatsEncoder encoder;
private final ComponentLog logger;
public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
public BeatsFrameHandler(final SelectionKey selectionKey,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final AsyncChannelDispatcher dispatcher,
final ComponentLog logger) {
this.key = selectionKey;
this.charset = charset;
this.eventFactory = eventFactory;
this.dispatcher = dispatcher;
this.logger = logger;
this.events = new EventQueue<>(events, logger);
this.encoder = new BeatsEncoder();
}
public void handle(final BeatsFrame frame, final ChannelResponder<SocketChannel> responder, final String sender)
throws IOException, InterruptedException {
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
metadata.put(BeatsMetadata.SEQNUMBER_KEY, String.valueOf(frame.getSeqNumber()));
String line = "";
/* If frameType is a JSON , parse the frame payload into a JsonElement so that all JSON elements but "message"
are inserted into the event metadata.
As per above, the "message" element gets added into the body of the event
*/
if (frame.getFrameType() == FRAME_JSON ) {
// queue the raw event blocking until space is available, reset the buffer
final E event = eventFactory.create(frame.getPayload(), metadata, responder);
events.offer(event);
}
}
}

View File

@ -1,94 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.socket.SSLSocketChannelHandler;
import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder;
import org.apache.nifi.processors.beats.frame.BeatsDecoder;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
import org.apache.nifi.processors.beats.frame.BeatsFrameException;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
/**
* A Beats compatible implementation of SSLSocketChannelHandler.
*/
public class BeatsSSLSocketChannelHandler<E extends Event<SocketChannel>> extends SSLSocketChannelHandler<E> {
private BeatsDecoder decoder;
private BeatsFrameHandler<E> frameHandler;
public BeatsSSLSocketChannelHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
this.decoder = new BeatsDecoder(charset, logger);
this.frameHandler = new BeatsFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Override
protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel,
final int bytesRead, final byte[] buffer) throws InterruptedException, IOException {
final InetAddress sender = socketChannel.socket().getInetAddress();
try {
// go through the buffer parsing the packet command
for (int i = 0; i < bytesRead; i++) {
byte currByte = buffer[i];
// if we found the end of a frame, handle the frame and mark the buffer
if (decoder.process(currByte)) {
final List<BeatsFrame> frames = decoder.getFrames();
// A list of events has been generated
for (BeatsFrame frame : frames) {
logger.debug("Received Beats frame with transaction {} and command {}",
new Object[]{frame.getSeqNumber(), frame.getSeqNumber()});
// Ignore the WINDOWS type frames as they contain no payload.
if (frame.getFrameType() != 0x57 ) {
final SSLSocketChannelResponder responder = new SSLSocketChannelResponder(socketChannel, sslSocketChannel);
frameHandler.handle(frame, responder, sender.toString());
}
}
}
}
logger.debug("Done processing buffer");
} catch (final BeatsFrameException rfe) {
logger.error("Error reading Beats frames due to {}", new Object[] {rfe.getMessage()} , rfe);
// if an invalid frame or bad data was sent then the decoder will be left in a
// corrupted state, so lets close the connection and cause the client to re-establish
dispatcher.completeConnection(key);
}
}
}

View File

@ -1,103 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.socket.StandardSocketChannelHandler;
import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
import org.apache.nifi.processors.beats.frame.BeatsDecoder;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
import org.apache.nifi.processors.beats.frame.BeatsFrameException;
/**
* Extends the StandardSocketChannelHandler to decode bytes into Beats frames.
*/
public class BeatsSocketChannelHandler<E extends Event<SocketChannel>> extends StandardSocketChannelHandler<E> {
private BeatsDecoder decoder;
private BeatsFrameHandler<E> frameHandler;
public BeatsSocketChannelHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
this.decoder = new BeatsDecoder(charset, logger);
this.frameHandler = new BeatsFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Override
protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer)
throws InterruptedException, IOException {
// get total bytes in buffer
final int total = socketBuffer.remaining();
final InetAddress sender = socketChannel.socket().getInetAddress();
try {
// go through the buffer parsing the packet command
for (int i = 0; i < total; i++) {
byte currByte = socketBuffer.get();
// if we found the end of a frame, handle the frame and mark the buffer
if (decoder.process(currByte)) {
final List<BeatsFrame> frames = decoder.getFrames();
for (BeatsFrame frame : frames) {
logger.debug("Received Beats frame with transaction {} and command {}",
new Object[]{frame.getSeqNumber(), frame.getSeqNumber()});
// Ignore the WINDOW SIZE type frames as they contain no payload.
if (frame.getFrameType() != 0x57) {
final SocketChannelResponder responder = new SocketChannelResponder(socketChannel);
frameHandler.handle(frame, responder, sender.toString());
}
}
socketBuffer.mark();
}
}
logger.debug("Done processing buffer");
} catch (final BeatsFrameException rfe) {
logger.error("Error reading Beats frames due to {}", new Object[] {rfe.getMessage()}, rfe);
// if an invalid frame or bad data was sent then the decoder will be left in a
// corrupted state, so lets close the connection and cause the client to re-establish
dispatcher.completeConnection(key);
}
}
// not used for anything in Beats since the decoder encapsulates the delimiter
@Override
public byte getDelimiter() {
return BeatsFrame.DELIMITER;
}
}

View File

@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
/**
* Default factory for creating Beats socket channel handlers.
*/
public class BeatsSocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> {
@Override
public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
return new BeatsSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
}
@Override
public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
return new BeatsSSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processors.beats.frame.BeatsDecoder;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
import org.apache.nifi.processors.beats.frame.BeatsFrameType;
import org.apache.nifi.processors.beats.frame.BeatsMetadata;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
/**
* Decode a Beats message's bytes into a BeatsMessage object
*/
public class BeatsFrameDecoder extends ByteToMessageDecoder {
private final Charset charset;
private final ComponentLog logger;
private final BeatsMessageFactory messageFactory;
public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) {
this.charset = charset;
this.logger = logger;
this.messageFactory = new BeatsMessageFactory();
}
@Override
protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
final int total = in.readableBytes();
final String senderSocket = ctx.channel().remoteAddress().toString();
final BeatsDecoder decoder = new BeatsDecoder(charset, logger);
for (int i = 0; i < total; i++) {
byte currByte = in.readByte();
// decode the bytes and once we find the end of a frame, handle the frame
if (decoder.process(currByte)) {
final List<BeatsFrame> frames = decoder.getFrames();
for (BeatsFrame frame : frames) {
logger.debug("Received Beats Frame Sender [{}] Transaction [{}] Frame Type [{}]",
senderSocket, frame.getSeqNumber(), frame.getFrameType());
// Ignore the WINDOW SIZE type frames as they contain no payload.
if (frame.getFrameType() != BeatsFrameType.WINDOWSIZE) {
handle(frame, senderSocket, out);
}
}
}
}
}
private void handle(final BeatsFrame frame, final String sender, final List<Object> out) {
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender);
metadata.put(BeatsMetadata.SEQNUMBER_KEY, String.valueOf(frame.getSeqNumber()));
if (frame.getFrameType() == BeatsFrameType.JSON) {
final BeatsMessage event = messageFactory.create(frame.getPayload(), metadata);
out.add(event);
}
}
}

View File

@ -14,27 +14,23 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.beats.event; package org.apache.nifi.processors.beats.netty;
import org.apache.nifi.processor.util.listen.event.StandardEvent; import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import java.nio.channels.SocketChannel;
/** /**
* A Beat event which adds the transaction number to the StandardEvent. * A Beats message which adds a sequence number to the ByteArrayMessage.
*/ */
public class BeatsEvent extends StandardEvent<SocketChannel> { public class BeatsMessage extends ByteArrayMessage {
private final int seqNumber; private final int seqNumber;
public BeatsEvent(final String sender, final byte[] data, final ChannelResponder<SocketChannel> responder, final int seqNumber) { public BeatsMessage(final String sender, final byte[] data, final int seqNumber) {
super(sender, data, responder); super(data, sender);
this.seqNumber = seqNumber; this.seqNumber = seqNumber;
} }
public int getSeqNumber() { public int getSeqNumber() {
return seqNumber; return seqNumber;
} }
} }

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.netty;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.beats.frame.BeatsEncoder;
import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
import org.apache.nifi.processors.beats.response.BeatsResponse;
import java.util.concurrent.BlockingQueue;
/**
* Decode data received into a BeatsMessage
*/
@ChannelHandler.Sharable
public class BeatsMessageChannelHandler extends SimpleChannelInboundHandler<BeatsMessage> {
private final ComponentLog componentLog;
private final BlockingQueue<BeatsMessage> events;
private final BeatsEncoder encoder;
public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events, ComponentLog componentLog) {
this.events = events;
this.componentLog = componentLog;
this.encoder = new BeatsEncoder();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) {
componentLog.debug("Beats Message Received Length [{}] Remote Address [{}] ", msg.getMessage().length, msg.getSender());
if (events.offer(msg)) {
componentLog.debug("Event Queued: Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
BeatsChannelResponse successResponse = new BeatsChannelResponse(encoder, BeatsResponse.ok(msg.getSeqNumber()));
ctx.writeAndFlush(Unpooled.wrappedBuffer(successResponse.toByteArray()));
} else {
componentLog.warn("Beats Queue Full: Failed Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
}
}
}

View File

@ -14,24 +14,22 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.beats.event; package org.apache.nifi.processors.beats.netty;
import org.apache.nifi.processor.util.listen.event.EventFactory; import org.apache.nifi.processor.util.listen.event.NetworkEventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder; import org.apache.nifi.processors.beats.frame.BeatsMetadata;
import java.util.Map; import java.util.Map;
/** /**
* An EventFactory implementation to create BeatEvents. * An EventFactory implementation to create BeatsMessages.
*/ */
public class BeatsEventFactory implements EventFactory<BeatsEvent> { public class BeatsMessageFactory implements NetworkEventFactory<BeatsMessage> {
@Override @Override
public BeatsEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) { public BeatsMessage create(final byte[] data, final Map<String, String> metadata) {
final String sender = metadata.get(EventFactory.SENDER_KEY); final int sequenceNumber = Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));
final int seqNumber = Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY)); final String sender = metadata.get(BeatsMetadata.SENDER_KEY);
return new BeatsMessage(sender, data, sequenceNumber);
return new BeatsEvent(sender, data, responder, seqNumber);
} }
} }

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.netty;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
import org.apache.nifi.logging.ComponentLog;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
/**
* Netty Event Server Factory implementation for RELP Messages
*/
public class BeatsMessageServerFactory extends NettyEventServerFactory {
/**
* RELP Message Server Factory to receive RELP messages
* @param log Component Log
* @param address Server Address
* @param port Server Port Number
* @param charset Charset to use when decoding RELP messages
* @param events Blocking Queue for events received
*/
public BeatsMessageServerFactory(final ComponentLog log,
final InetAddress address,
final int port,
final Charset charset,
final BlockingQueue<BeatsMessage> events) {
super(address, port, TransportProtocol.TCP);
final BeatsMessageChannelHandler beatsChannelHandler = new BeatsMessageChannelHandler(events, log);
final LogExceptionChannelHandler logExceptionChannelHandler = new LogExceptionChannelHandler(log);
setHandlerSupplier(() -> Arrays.asList(
new BeatsFrameDecoder(log, charset),
beatsChannelHandler,
logExceptionChannelHandler
));
}
}

View File

@ -59,5 +59,4 @@ public class BeatsResponse {
public static BeatsResponse ok(final int seqNumber) { public static BeatsResponse ok(final int seqNumber) {
return new BeatsResponse(seqNumber); return new BeatsResponse(seqNumber);
} }
} }

View File

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class TestBeatsEventFactory {
@Test
public void testCreateLumberJackEvent() {
final String sender = "testsender1";
final byte[] data = "this is a test line".getBytes();
final int seqNumber = 1;
final String fields = "{\"file\":\"test\"}";
final Map<String,String> metadata = new HashMap<>();
metadata.put(EventFactory.SENDER_KEY, sender);
metadata.put(BeatsMetadata.SEQNUMBER_KEY, String.valueOf(seqNumber));
final ChannelResponder responder = new SocketChannelResponder(null);
final EventFactory<BeatsEvent> factory = new BeatsEventFactory();
final BeatsEvent event = factory.create(data, metadata, responder);
Assert.assertEquals(sender, event.getSender());
Assert.assertEquals(seqNumber, event.getSeqNumber());
Assert.assertEquals(data, event.getData());
}
}

View File

@ -1,157 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.beats.event.BeatsEvent;
import org.apache.nifi.processors.beats.event.BeatsEventFactory;
import org.apache.nifi.processors.beats.frame.BeatsEncoder;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestBeatsFrameHandler {
private Charset charset;
private EventFactory<BeatsEvent> eventFactory;
private BlockingQueue<BeatsEvent> events;
private SelectionKey key;
private AsyncChannelDispatcher dispatcher;
private BeatsEncoder encoder;
private ComponentLog logger;
private BeatsFrameHandler<BeatsEvent> frameHandler;
@Before
public void setup() {
this.charset = StandardCharsets.UTF_8;
this.eventFactory = new BeatsEventFactory();
this.events = new LinkedBlockingQueue<>();
this.key = Mockito.mock(SelectionKey.class);
this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class);
this.logger = Mockito.mock(ComponentLog.class);
this.frameHandler = new BeatsFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Test
public void testWindow() throws IOException, InterruptedException {
final BeatsFrame openFrame = new BeatsFrame.Builder()
.version((byte) 0x31)
.frameType((byte) 0x57)
.seqNumber(-1)
.payload(Integer.toString(1).getBytes())
.build();
final String sender = "sender1";
final CapturingChannelResponder responder = new CapturingChannelResponder();
// call the handler and verify respond() was called once with once response
frameHandler.handle(openFrame, responder, sender);
// No response expected
Assert.assertEquals(0, responder.responded);
}
@Test
public void testJson() throws IOException, InterruptedException {
final byte jsonPayload[] = new byte[]{
// Payload eq { "message": "test-content", "field": "value"}
0x7b, 0x22, 0x6d, 0x65,
0x73, 0x73, 0x61, 0x67,
0x65, 0x22, 0x3a, 0x20,
0x22, 0x74, 0x65, 0x73,
0x74, 0x2d, 0x63, 0x6f,
0x6e, 0x74, 0x65, 0x6e,
0x74, 0x22, 0x2c, 0x20,
0x22, 0x66, 0x69, 0x65,
0x6c, 0x64, 0x22, 0x3a,
0x20, 0x22, 0x76, 0x61,
0x6c, 0x75, 0x65, 0x22,
0x7d
};
final BeatsFrame jsonFrame = new BeatsFrame.Builder()
.version((byte) 0x32)
.frameType((byte) 0x4a)
.seqNumber(1)
.dataSize(45)
.payload(jsonPayload)
.build();
final String sender = "sender1";
final CapturingChannelResponder responder = new CapturingChannelResponder();
// call the handler and verify respond() was called once with once response
frameHandler.handle(jsonFrame, responder, sender);
// No response expected
Assert.assertEquals(0, responder.responded);
// But events should contain one event
Assert.assertEquals(1, events.size());
final BeatsEvent event = events.poll();
Assert.assertEquals("{\"message\": \"test-content\", \"field\": \"value\"}", new String(event.getData(), charset));
}
private static class CapturingChannelResponder implements ChannelResponder<SocketChannel> {
int responded;
List<ChannelResponse> responses = new ArrayList<>();
@Override
public SocketChannel getChannel() {
return Mockito.mock(SocketChannel.class);
}
@Override
public List<ChannelResponse> getResponses() {
return responses;
}
@Override
public void addResponse(ChannelResponse response) {
responses.add(response);
}
@Override
public void respond() throws IOException {
responded++;
}
}
}

View File

@ -1,227 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import javax.xml.bind.DatatypeConverter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.beats.event.BeatsMetadata;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestBeatsSocketChannelHandler {
private EventFactory<TestEvent> eventFactory;
private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory;
private ByteBufferSource byteBufferSource;
private BlockingQueue<TestEvent> events;
private ComponentLog logger = Mockito.mock(ComponentLog.class);
private int maxConnections;
private SSLContext sslContext;
private Charset charset;
private ChannelDispatcher dispatcher;
@Before
public void setup() {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new BeatsSocketChannelHandlerFactory<>();
byteBufferSource = new ByteBufferPool(1, 4096);
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
maxConnections = 1;
sslContext = null;
charset = StandardCharsets.UTF_8;
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBufferSource, events, logger,
maxConnections, sslContext, charset);
}
@Test
public void testWiredJsonHandling() throws IOException, InterruptedException {
final String singleJsonFrame = "324a000000010000002d7b226d657373616765223a2022746573742d636f6e74656e7422" +
"2c20226669656c64223a202276616c7565227d";
final List<String> messages = new ArrayList<>();
messages.add(singleJsonFrame);
run(messages);
// Check for the 1 frames (from the hex string above) are back...
Assert.assertEquals(1, events.size());
TestEvent event;
while((event = events.poll()) != null) {
Map<String, String> metadata = event.metadata;
Assert.assertTrue(metadata.containsKey(BeatsMetadata.SEQNUMBER_KEY));
final String seqNum = metadata.get(BeatsMetadata.SEQNUMBER_KEY);
final String line = new String(event.getData());
Assert.assertTrue(seqNum.equals("1"));
Assert.assertEquals("{\"message\": \"test-content\", \"field\": \"value\"}", line);
}
}
@Test
public void testCompressedJsonHandling() throws IOException, InterruptedException {
final String multipleJsonFrame = "3243000000E27801CC91414BC3401085477FCA3B6F" +
"93EEB6A5B8A71E3CF5ECC98BECC6491AC86643665290903FAE17A982540F8237E7F" +
"80D3C78EF734722BA21A2B71987C41A9E8306F819FA32303CBADCC020725078D46D" +
"C791836231D0EB7FDB0F933EE9354A2C129A4B44F8B8AF94197D4817CE7CCF67189" +
"CB2E80F74E651DADCC36357D8C2623138689B5834A4011E6E6DF7ABB55DAD770F76" +
"E3B7777EBB299CB58F30903C8D15C3A33CE5C465A8A74ACA2E3792A7B1E25259B4E" +
"87203835CD7C20ABF5FDC91886E89E8F58F237CEEF2EF1A5967BEFBFBD54F8C3162" +
"790F0000FFFF6CB6A08D";
final List<String> messages = new ArrayList<>();
messages.add(multipleJsonFrame);
run(messages);
// Check for the 2 frames (from the hex string above) are back...
Assert.assertEquals(2, events.size());
boolean found1 = false;
boolean found2 = false;
TestEvent event;
while((event = events.poll()) != null) {
Map<String, String> metadata = event.metadata;
Assert.assertTrue(metadata.containsKey(BeatsMetadata.SEQNUMBER_KEY));
final String seqNum = metadata.get(BeatsMetadata.SEQNUMBER_KEY);
final String line = new String(event.getData());
if (seqNum.equals("1") && line.contains("\"message\":\"aaaaaa\"")) {
found1 = true;
}
if (seqNum.equals("2") && line.contains("\"message\":\"bbbb\"")) {
found2 = true;
}
}
Assert.assertTrue(found1 && found2);
}
protected void run(List<String> messages) throws IOException, InterruptedException {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
// starts the dispatcher listening on port 0 so it selects a random port
dispatcher.open(null, 0, 4096);
// starts a thread to run the dispatcher which will accept/read connections
Thread dispatcherThread = new Thread(dispatcher);
dispatcherThread.start();
// create a client connection to the port the dispatcher is listening on
final int realPort = dispatcher.getPort();
try (SocketChannel channel = SocketChannel.open()) {
channel.connect(new InetSocketAddress("localhost", realPort));
Thread.sleep(100);
// send the provided messages
for (int i=0; i < messages.size(); i++) {
buffer.clear();
buffer.put(DatatypeConverter.parseHexBinary(messages.get(i)));
buffer.flip();
while (buffer.hasRemaining()) {
channel.write(buffer);
}
Thread.sleep(1);
}
}
// wait up to 10 seconds to verify the responses
long timeout = 10000;
long startTime = System.currentTimeMillis();
while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) {
Thread.sleep(100);
}
} finally {
// stop the dispatcher thread and ensure we shut down handler threads
dispatcher.close();
}
}
// Test event to produce from the data
private static class TestEvent implements Event<SocketChannel> {
private byte[] data;
private Map<String, String> metadata;
public TestEvent(byte[] data, Map<String, String> metadata) {
this.data = data;
this.metadata = metadata;
}
@Override
public String getSender() {
return metadata.get(EventFactory.SENDER_KEY);
}
@Override
public byte[] getData() {
return data;
}
@Override
public ChannelResponder<SocketChannel> getResponder() {
return null;
}
}
// Factory to create test events and send responses for testing
private static class TestEventHolderFactory implements EventFactory<TestEvent> {
@Override
public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
return new TestEvent(data, metadata);
}
}
}

View File

@ -25,7 +25,6 @@ import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol; import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory; import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.ListenerProperties; import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processors.standard.relp.event.RELPMessage; import org.apache.nifi.processors.standard.relp.event.RELPMessage;
import org.apache.nifi.processors.standard.relp.frame.RELPEncoder; import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
@ -196,7 +195,7 @@ public class TestListenRELP {
MockListenRELP mockListenRELP = new MockListenRELP(mockEvents); MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
runner = TestRunners.newTestRunner(mockListenRELP); runner = TestRunners.newTestRunner(mockListenRELP);
runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(NetworkUtils.availablePort())); runner.setProperty(ListenerProperties.PORT, Integer.toString(NetworkUtils.availablePort()));
runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10"); runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
runner.run(); runner.run();
@ -206,7 +205,7 @@ public class TestListenRELP {
private void run(final List<RELPFrame> frames, final int flowFiles, final SSLContext sslContext) throws Exception { private void run(final List<RELPFrame> frames, final int flowFiles, final SSLContext sslContext) throws Exception {
final int port = NetworkUtils.availablePort(); final int port = NetworkUtils.availablePort();
runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(port)); runner.setProperty(ListenerProperties.PORT, Integer.toString(port));
// Run Processor and start Dispatcher without shutting down // Run Processor and start Dispatcher without shutting down
runner.run(1, false, true); runner.run(1, false, true);
final byte[] relpMessages = getRELPMessages(frames); final byte[] relpMessages = getRELPMessages(frames);