From 7c1a7da1169f66fce490def753f9a0a228a4f75b Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Mon, 31 Oct 2022 22:30:32 -0500 Subject: [PATCH] NIFI-10737 Corrected ListenBeats buffer handling - Added test class for ListenBeats - Removed unnecessary dependencies - Implemented BatchDecoder for reading Beats Protocol frames - Refactored protocol and handler classes Signed-off-by: Nathan Gough This closes #6608. --- .../nifi-beats-processors/pom.xml | 46 +-- .../nifi/processors/beats/ListenBeats.java | 180 ++++----- .../processors/beats/frame/BeatsDecoder.java | 328 --------------- .../processors/beats/frame/BeatsEncoder.java | 47 --- .../processors/beats/frame/BeatsFrame.java | 115 ------ .../handler/BatchChannelInboundHandler.java | 83 ++++ .../beats/handler/BatchDecoder.java | 380 ++++++++++++++++++ .../beats/handler/MessageAckEncoder.java | 65 +++ .../beats/netty/BeatsFrameDecoder.java | 81 ---- .../netty/BeatsMessageChannelHandler.java | 57 --- .../nifi/processors/beats/protocol/Batch.java | 35 ++ .../BatchMessage.java} | 18 +- .../processors/beats/protocol/FrameType.java | 43 ++ .../FrameTypeDecoder.java} | 25 +- .../MessageAck.java} | 20 +- .../ProtocolCode.java} | 15 +- .../ProtocolCodeDecoder.java} | 21 +- .../beats/protocol/ProtocolException.java | 41 ++ .../ProtocolVersion.java} | 23 +- .../protocol/ProtocolVersionDecoder.java | 38 ++ .../beats/response/BeatsChannelResponse.java | 42 -- .../beats/response/BeatsResponse.java | 62 --- .../BeatsMessageServerFactory.java | 25 +- .../processors/beats/ListenBeatsTest.java | 244 +++++++++++ .../beats/frame/TestBeatsEncoder.java | 49 --- .../beats/frame/TestBeatsFrame.java | 39 -- 26 files changed, 1089 insertions(+), 1033 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java delete mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsEncoder.java delete mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrame.java create mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.java create mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchDecoder.java create mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/MessageAckEncoder.java delete mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java delete mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java create mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/Batch.java rename nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/{netty/BeatsMessage.java => protocol/BatchMessage.java} (66%) create mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameType.java rename nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/{netty/BeatsMessageFactory.java => protocol/FrameTypeDecoder.java} (55%) rename nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/{frame/BeatsState.java => protocol/MessageAck.java} (71%) rename nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/{frame/BeatsMetadata.java => protocol/ProtocolCode.java} (75%) rename nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/{frame/BeatsFrameType.java => protocol/ProtocolCodeDecoder.java} (69%) create mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolException.java rename nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/{frame/BeatsFrameException.java => protocol/ProtocolVersion.java} (71%) create mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersionDecoder.java delete mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java delete mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java rename nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/{netty => server}/BeatsMessageServerFactory.java (69%) create mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/ListenBeatsTest.java delete mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java delete mode 100644 nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml index 91a030f4e1..1aeccb4b81 100644 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml @@ -34,31 +34,18 @@ org.apache.nifi nifi-event-listen 1.19.0-SNAPSHOT - - - org.apache.nifi - nifi-security-socket-ssl - 1.19.0-SNAPSHOT - - - com.google.code.gson - gson - - - org.apache.nifi - nifi-socket-utils - 1.19.0-SNAPSHOT + + + org.apache.nifi + nifi-security-socket-ssl + + org.apache.nifi nifi-utils 1.19.0-SNAPSHOT - - org.apache.nifi - nifi-flowfile-packager - 1.19.0-SNAPSHOT - org.apache.nifi nifi-ssl-context-service-api @@ -71,25 +58,4 @@ test - - - - - jigsaw - - (1.8,) - - - - jakarta.xml.bind - jakarta.xml.bind-api - - - org.glassfish.jaxb - jaxb-runtime - - - - diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java index 86cc2df339..4248c31dcf 100644 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java @@ -16,20 +16,18 @@ */ package org.apache.nifi.processors.beats; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; import org.apache.nifi.event.transport.EventException; import org.apache.nifi.event.transport.EventServer; +import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod; +import org.apache.nifi.event.transport.configuration.ShutdownTimeout; import org.apache.nifi.event.transport.netty.NettyEventServerFactory; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -38,14 +36,13 @@ import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.listen.EventBatcher; import org.apache.nifi.processor.util.listen.FlowFileEventBatch; import org.apache.nifi.processor.util.listen.ListenerProperties; -import org.apache.nifi.processors.beats.netty.BeatsMessage; -import org.apache.nifi.processors.beats.netty.BeatsMessageServerFactory; +import org.apache.nifi.processors.beats.protocol.BatchMessage; +import org.apache.nifi.processors.beats.server.BeatsMessageServerFactory; import org.apache.nifi.remote.io.socket.NetworkUtils; import org.apache.nifi.security.util.ClientAuth; import org.apache.nifi.ssl.RestrictedSSLContextService; @@ -55,11 +52,9 @@ import javax.net.ssl.SSLContext; import java.io.IOException; import java.net.InetAddress; import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collection; +import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -67,34 +62,30 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) -@Tags({"listen", "beats", "tcp", "logs"}) -@CapabilityDescription("Listens for messages sent by libbeat compatible clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash', writing its JSON formatted payload " + - "to the content of a FlowFile." + - "This processor replaces the now deprecated/removed ListenLumberjack") +@Tags({"beats", "logstash", "elasticsearch", "log"}) +@CapabilityDescription("Receive messages encoded using the Elasticsearch Beats protocol and write decoded JSON") @WritesAttributes({ - @WritesAttribute(attribute = "beats.sender", description = "The sending host of the messages."), - @WritesAttribute(attribute = "beats.port", description = "The sending port the messages were received over."), - @WritesAttribute(attribute = "beats.sequencenumber", description = "The sequence number of the message. Only included if is 1."), + @WritesAttribute(attribute = "beats.sender", description = "Internet Protocol address of the message sender"), + @WritesAttribute(attribute = "beats.port", description = "TCP port on which the Processor received messages"), + @WritesAttribute(attribute = "beats.sequencenumber", description = "The sequence number of the message included for batches containing single messages"), @WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is application/json") }) -@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"}) public class ListenBeats extends AbstractProcessor { public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL_CONTEXT_SERVICE") .displayName("SSL Context Service") - .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " + - "messages will be received over a secure connection.") - // Nearly all Lumberjack v1 implementations require TLS to work. v2 implementations (i.e. beats) have TLS as optional + .description("SSL Context Service is required to enable TLS for socket connections") .required(false) .identifiesControllerService(RestrictedSSLContextService.class) .build(); public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() .name("Client Auth") - .displayName("Client Auth") - .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.") + .displayName("Client Authentication") + .description("Client authentication policy when TLS is enabled") .required(false) + .dependsOn(SSL_CONTEXT_SERVICE) .allowableValues(ClientAuth.values()) .defaultValue(ClientAuth.REQUIRED.name()) .build(); @@ -104,73 +95,43 @@ public class ListenBeats extends AbstractProcessor { .description("Messages received successfully will be sent out this relationship.") .build(); - protected List descriptors; - protected Set relationships; + private static final List DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + ListenerProperties.NETWORK_INTF_NAME, + ListenerProperties.PORT, + ListenerProperties.RECV_BUFFER_SIZE, + ListenerProperties.MAX_MESSAGE_QUEUE_SIZE, + ListenerProperties.MAX_SOCKET_BUFFER_SIZE, + ListenerProperties.CHARSET, + ListenerProperties.MAX_BATCH_SIZE, + ListenerProperties.MESSAGE_DELIMITER, + ListenerProperties.WORKER_THREADS, + SSL_CONTEXT_SERVICE, + CLIENT_AUTH + )); + + private static final Set RELATIONSHIPS = Collections.singleton(REL_SUCCESS); + protected volatile int port; - protected volatile BlockingQueue events; - protected volatile BlockingQueue errorEvents; + protected volatile BlockingQueue events; + protected volatile BlockingQueue errorEvents; protected volatile EventServer eventServer; protected volatile byte[] messageDemarcatorBytes; - protected volatile EventBatcher eventBatcher; - - @Override - protected void init(final ProcessorInitializationContext context) { - final List descriptors = new ArrayList<>(); - descriptors.add(ListenerProperties.NETWORK_INTF_NAME); - descriptors.add(ListenerProperties.PORT); - descriptors.add(ListenerProperties.RECV_BUFFER_SIZE); - descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE); - // Deprecated - descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE); - descriptors.add(ListenerProperties.CHARSET); - descriptors.add(ListenerProperties.MAX_BATCH_SIZE); - descriptors.add(ListenerProperties.MESSAGE_DELIMITER); - descriptors.add(ListenerProperties.WORKER_THREADS); - descriptors.add(SSL_CONTEXT_SERVICE); - descriptors.add(CLIENT_AUTH); - this.descriptors = Collections.unmodifiableList(descriptors); - - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - this.relationships = Collections.unmodifiableSet(relationships); - } - - @Override - protected Collection customValidate(final ValidationContext validationContext) { - final List results = new ArrayList<>(); - - final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - - if (sslContextService != null && !sslContextService.isTrustStoreConfigured()) { - results.add(new ValidationResult.Builder() - .explanation("SSL Context Service requires a truststore for the Beats forwarder client to work correctly") - .valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build()); - } - - final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); - if (sslContextService != null && StringUtils.isBlank(clientAuth)) { - results.add(new ValidationResult.Builder() - .explanation("Client Auth must be provided when using TLS/SSL") - .valid(false).subject("Client Auth").build()); - } - - return results; - } + protected volatile EventBatcher eventBatcher; @Override public final Set getRelationships() { - return this.relationships; + return RELATIONSHIPS; } @Override public List getSupportedPropertyDescriptors() { - return descriptors; + return DESCRIPTORS; } @OnScheduled public void onScheduled(final ProcessContext context) throws IOException { final int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger(); - final int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + final int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface); final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue()); @@ -180,7 +141,7 @@ public class ListenBeats extends AbstractProcessor { final String msgDemarcator = getMessageDemarcator(context); messageDemarcatorBytes = msgDemarcator.getBytes(charset); - final NettyEventServerFactory eventFactory = new BeatsMessageServerFactory(getLogger(), address, port, charset, events); + final NettyEventServerFactory eventFactory = new BeatsMessageServerFactory(getLogger(), address, port, events); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); if (sslContextService != null) { @@ -191,80 +152,75 @@ public class ListenBeats extends AbstractProcessor { eventFactory.setClientAuth(clientAuth); } - eventFactory.setSocketReceiveBuffer(bufferSize); + eventFactory.setSocketReceiveBuffer(socketBufferSize); eventFactory.setWorkerThreads(workerThreads); eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier())); + eventFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration()); + eventFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration()); try { eventServer = eventFactory.getEventServer(); - } catch (EventException e) { + } catch (final EventException e) { getLogger().error("Failed to bind to [{}:{}]", address, port, e); } } @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - EventBatcher eventBatcher = getEventBatcher(); + EventBatcher eventBatcher = getEventBatcher(); final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger(); - Map> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes); + Map> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes); processEvents(session, batches); } @OnStopped - public void stopped() { - if (eventServer != null) { + public void shutdown() { + if (eventServer == null) { + getLogger().warn("Event Server not configured"); + } else { eventServer.shutdown(); } eventBatcher = null; } - private void processEvents(final ProcessSession session, final Map> batches) { - for (Map.Entry> entry : batches.entrySet()) { + private void processEvents(final ProcessSession session, final Map> batches) { + for (final Map.Entry> entry : batches.entrySet()) { FlowFile flowFile = entry.getValue().getFlowFile(); - final List events = entry.getValue().getEvents(); + final List events = entry.getValue().getEvents(); if (flowFile.getSize() == 0L || events.size() == 0) { session.remove(flowFile); - getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", entry.getKey()); continue; } final Map attributes = getAttributes(entry.getValue()); flowFile = session.putAllAttributes(flowFile, attributes); - getLogger().debug("Transferring {} to success", flowFile); session.transfer(flowFile, REL_SUCCESS); - session.adjustCounter("FlowFiles Transferred to Success", 1L, false); - // the sender and command will be the same for all events based on the batch key final String transitUri = getTransitUri(entry.getValue()); session.getProvenanceReporter().receive(flowFile, transitUri); - } - session.commitAsync(); } - protected String getTransitUri(FlowFileEventBatch batch) { - final List events = batch.getEvents(); + private String getTransitUri(final FlowFileEventBatch batch) { + final List events = batch.getEvents(); final String sender = events.get(0).getSender(); - final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; - return String.format("beats://%s:%d", senderHost, port); + return String.format("beats://%s:%d", sender, port); } - protected Map getAttributes(FlowFileEventBatch batch) { - final List events = batch.getEvents(); - // the sender and command will be the same for all events based on the batch key + private Map getAttributes(final FlowFileEventBatch batch) { + final List events = batch.getEvents(); + final String sender = events.get(0).getSender(); - final int numAttributes = events.size() == 1 ? 5 : 4; - final Map attributes = new HashMap<>(numAttributes); - attributes.put(beatsAttributes.SENDER.key(), sender); - attributes.put(beatsAttributes.PORT.key(), String.valueOf(port)); + final Map attributes = new LinkedHashMap<>(); + attributes.put(BeatsAttributes.SENDER.key(), sender); + attributes.put(BeatsAttributes.PORT.key(), String.valueOf(port)); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); - // if there was only one event then we can pass on the transaction - // NOTE: we could pass on all the transaction ids joined together + if (events.size() == 1) { - attributes.put(beatsAttributes.SEQNUMBER.key(), String.valueOf(events.get(0).getSeqNumber())); + attributes.put(BeatsAttributes.SEQUENCE_NUMBER.key(), String.valueOf(events.get(0).getSequenceNumber())); } return attributes; } @@ -275,11 +231,11 @@ public class ListenBeats extends AbstractProcessor { .replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); } - private EventBatcher getEventBatcher() { + private EventBatcher getEventBatcher() { if (eventBatcher == null) { - eventBatcher = new EventBatcher(getLogger(), events, errorEvents) { + eventBatcher = new EventBatcher(getLogger(), events, errorEvents) { @Override - protected String getBatchKey(BeatsMessage event) { + protected String getBatchKey(final BatchMessage event) { return event.getSender(); } }; @@ -287,14 +243,14 @@ public class ListenBeats extends AbstractProcessor { return eventBatcher; } - public enum beatsAttributes implements FlowFileAttributeKey { + private enum BeatsAttributes implements FlowFileAttributeKey { SENDER("beats.sender"), PORT("beats.port"), - SEQNUMBER("beats.sequencenumber"); + SEQUENCE_NUMBER("beats.sequencenumber"); private final String key; - beatsAttributes(String key) { + BeatsAttributes(String key) { this.key = key; } diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java deleted file mode 100644 index 2fa20ca856..0000000000 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.beats.frame; - -import org.apache.nifi.logging.ComponentLog; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.LinkedList; -import java.util.List; -import java.util.zip.InflaterInputStream; - -/** - * Decodes a Beats frame by maintaining a state based on each byte that has been processed. This class - * should not be shared by multiple threads. - */ -public class BeatsDecoder { - - - final ComponentLog logger; - - private BeatsFrame.Builder frameBuilder; - private BeatsState currState = BeatsState.VERSION; - private byte decodedFrameType; - - private byte[] unprocessedData; - - private final Charset charset; - private final ByteArrayOutputStream currBytes; - - private long windowSize; - - static final int MIN_FRAME_HEADER_LENGTH = 2; // Version + Type - static final int WINDOWSIZE_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; // 32bit unsigned window size - static final int COMPRESSED_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; // 32 bit unsigned + payload - static final int JSON_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 8; // 32 bit unsigned sequence number + 32 bit unsigned payload length - - /** - * @param charset the charset to decode bytes from the frame - */ - public BeatsDecoder(final Charset charset, final ComponentLog logger) { - this(charset, new ByteArrayOutputStream(4096), logger); - } - - /** - * @param charset the charset to decode bytes from the frame - * @param buffer a buffer to use while processing the bytes - */ - public BeatsDecoder(final Charset charset, final ByteArrayOutputStream buffer, final ComponentLog logger) { - this.logger = logger; - this.charset = charset; - this.currBytes = buffer; - this.frameBuilder = new BeatsFrame.Builder(); - this.decodedFrameType = 0x00; - } - - /** - * Resets this decoder back to its initial state. - */ - public void reset() { - frameBuilder = new BeatsFrame.Builder(); - currState = BeatsState.VERSION; - decodedFrameType = 0x00; - currBytes.reset(); - } - - /** - * Process the next byte from the channel, updating the builder and state accordingly. - * - * @param currByte the next byte to process - * @preturn true if a frame is ready to be retrieved, false otherwise - */ - public boolean process(final byte currByte) throws BeatsFrameException { - try { - switch (currState) { - case VERSION: // Just enough data to process the version - processVERSION(currByte); - break; - case FRAMETYPE: // Also able to process the frametype - processFRAMETYPE(currByte); - break; - case PAYLOAD: // Initial bytes with version and Frame Type have already been received, start iteration over payload - processPAYLOAD(currByte); - - // At one stage, the data sent to processPAYLOAD will be represente a complete frame, so we check before returning true - - if (frameBuilder.frameType == BeatsFrameType.WINDOWSIZE && currState == BeatsState.COMPLETE) { - return true; - } else if (frameBuilder.frameType == BeatsFrameType.COMPRESSED && currState == BeatsState.COMPLETE) { - return true; - } else if (frameBuilder.frameType == BeatsFrameType.JSON && currState == BeatsState.COMPLETE) { - return true; - } else { - break; - } - case COMPLETE: - return true; - default: - break; - } - return false; - } catch (Exception e) { - throw new BeatsFrameException("Error decoding Beats frame: " + e.getMessage(), e); - } - } - - - /** - * Returns the decoded frame and resets the decoder for the next frame. - * This method should be called after checking isComplete(). - * - * @return the BeatsFrame that was decoded - */ - public List getFrames() throws BeatsFrameException { - List frames = new LinkedList<>(); - - if (currState != BeatsState.COMPLETE) { - throw new BeatsFrameException("Must be at the trailer of a frame"); - } - try { - // Once compressed frames are expanded, they must be devided into individual frames - if (currState == BeatsState.COMPLETE && frameBuilder.frameType == BeatsFrameType.COMPRESSED) { - logger.debug("Frame is compressed, will iterate to decode", new Object[]{}); - - // Zero currBytes, currState and frameBuilder prior to iteration over - // decompressed bytes - currBytes.reset(); - frameBuilder.reset(); - currState = BeatsState.VERSION; - - // Run over decompressed data and split frames - frames = splitCompressedFrames(unprocessedData); - - // In case of V or wired D and J frames we just ship them across the List - } else { - final BeatsFrame frame = frameBuilder.build(); - currBytes.reset(); - frameBuilder.reset(); - currState = BeatsState.VERSION; - frames.add(frame); - } - return frames; - - } catch (Exception e) { - throw new BeatsFrameException("Error decoding Beats frame: " + e.getMessage(), e); - } - } - - private List splitCompressedFrames(byte[] decompressedData) { - List frames = new LinkedList<>(); - BeatsFrame.Builder internalFrameBuilder = new BeatsFrame.Builder(); - ByteBuffer currentData = ByteBuffer.wrap(decompressedData); - - // Both Lumberjack v1 and Beats (LJ v2) has a weird approach to frames, where compressed frames embed D(ata) or J(SON) frames. - // inside a compressed input. - // Or as stated in the documentation: - // - // "As an example, you could have 3 data frames compressed into a single - // 'compressed' frame type: 1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v}" - // - // Therefore, instead of calling process method again, just iterate over each of - // the frames and split them so they can be processed - - while (currentData.hasRemaining()) { - - int payloadLength = 0; - - internalFrameBuilder.version = currentData.get(); - internalFrameBuilder.frameType = currentData.get(); - switch (internalFrameBuilder.frameType) { - case BeatsFrameType.JSON: - - internalFrameBuilder.seqNumber = (int) (currentData.getInt() & 0x00000000ffffffffL); - currentData.mark(); - - internalFrameBuilder.dataSize = currentData.getInt() & 0x00000000ffffffffL; - currentData.mark(); - - // Define how much data to chomp - payloadLength = Math.toIntExact(internalFrameBuilder.dataSize); - byte[] jsonBytes = new byte[payloadLength]; - - currentData.get(jsonBytes, 0, payloadLength); - currentData.mark(); - - // Add payload to frame - internalFrameBuilder.payload(jsonBytes); - break; - } - - // data frame is created - BeatsFrame frame = internalFrameBuilder.build(); - frames.add(frame); - internalFrameBuilder.reset(); - } - - return frames; - } - - - private void processVERSION(final byte b) { - byte version = b; - frameBuilder.version(version); - logger.debug("Version number is {}", new Object[]{version}); - currBytes.write(b); - currState = BeatsState.FRAMETYPE; - } - - private void processFRAMETYPE(final byte b) { - decodedFrameType = b; - frameBuilder.frameType(decodedFrameType); - logger.debug("Frame type is {}", new Object[]{decodedFrameType}); - currBytes.write(b); - currState = BeatsState.PAYLOAD; - } - - - /** Process the outer PAYLOAD byte by byte. Once data is read state is set to COMPLETE so that the data payload - * can be processed fully using {@link #splitCompressedFrames(byte[])} - * */ - private void processPAYLOAD(final byte b) { - currBytes.write(b); - switch (decodedFrameType) { - case BeatsFrameType.WINDOWSIZE: //'W' - if (currBytes.size() < WINDOWSIZE_LENGTH ) { - logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()}); - break; - } else if (currBytes.size() == WINDOWSIZE_LENGTH) { - frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL; - logger.debug("Data size is {}", new Object[]{frameBuilder.dataSize}); - // Sets payload to empty as frame contains no data - frameBuilder.payload(new byte[]{}); - currBytes.reset(); - currState = BeatsState.COMPLETE; - windowSize = frameBuilder.dataSize; - break; - } else { // Should never be here to be honest... - logger.debug("Saw a packet I should not have seen. Packet contents were {}", new Object[] {currBytes.toString()}); - break; - } - case BeatsFrameType.COMPRESSED: //'C' - if (currBytes.size() < COMPRESSED_MIN_LENGTH) { - if (logger.isTraceEnabled()) { - logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()}); - } - break; - } else if (currBytes.size() >= COMPRESSED_MIN_LENGTH) { - // If data contains more thant the minimum data size - frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL; - if (currBytes.size() - 6 == frameBuilder.dataSize) { - try { - byte[] buf = java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, currBytes.size()); - InputStream in = new InflaterInputStream(new ByteArrayInputStream(buf)); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte[] buffer = new byte[1024]; - int len; - while ((len = in.read(buffer)) > 0) { - out.write(buffer, 0, len); - } - in.close(); - out.close(); - unprocessedData = out.toByteArray(); - // buf is no longer needed - buf = null; - logger.debug("Finished decompressing data"); - // Decompression is complete, we should be able to proceed with resetting currBytes and curSrtate and iterating them - // as type 'D' frames - frameBuilder.dataSize(unprocessedData.length); - currState = BeatsState.COMPLETE; - - } catch (IOException e) { - throw new BeatsFrameException("Error decompressing frame: " + e.getMessage(), e); - } - - } - break; - // If currentByte.size is not lower than six and also not equal or great than 6... - } else { // Should never be here to be honest... - if (logger.isDebugEnabled()) { - logger.debug("Received a compressed frame with partial data or invalid content. The packet contents were {}", new Object[] {currBytes.toString()}); - } - break; - } - case BeatsFrameType.JSON: // 'J́' - // Because Beats can disable compression, sometimes, JSON data will be received outside a compressed - // stream (i.e. 0x43). Instead of processing it here, we defer its processing to went getFrames is - // called - if (currBytes.size() < JSON_MIN_LENGTH) { - if (logger.isTraceEnabled()) { - logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()}); - } - break; - } else if (currBytes.size() == JSON_MIN_LENGTH) { - // Read the sequence number from bytes - frameBuilder.seqNumber = (int) (ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL); - // Read the JSON payload length - frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, 10)).getInt() & 0x00000000ffffffffL; - } else if (currBytes.size() > JSON_MIN_LENGTH) { - // Wait for payload to be fully read and then complete processing - if (currBytes.size() - 10 == frameBuilder.dataSize) { - // Transfer the current payload so it can be processed by {@link #splitCompressedFrames} method. - frameBuilder.payload = java.util.Arrays.copyOfRange(currBytes.toByteArray(), 10, currBytes.size()); - currState = BeatsState.COMPLETE; - } - break; - } - } - } - -} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsEncoder.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsEncoder.java deleted file mode 100644 index 8463d48ed2..0000000000 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsEncoder.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.beats.frame; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -/** - * Encodes a BeatsFrame into raw bytes using the given charset. - */ -public class BeatsEncoder { - - - public byte[] encode(final BeatsFrame frame) { - final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - - // Writes the version - buffer.write(frame.getVersion()); - - // Writes the frameType - buffer.write(frame.getFrameType()); - - // Writes the sequence number - try { - buffer.write(frame.getPayload()); - } catch (IOException e) { - throw new BeatsFrameException("Error decoding Beats frame: " + e.getMessage(), e); - } - - return buffer.toByteArray(); - } - -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrame.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrame.java deleted file mode 100644 index ccb3bba366..0000000000 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrame.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.beats.frame; - - -/** - * A frame received from a channel. - */ -public class BeatsFrame { - - public static final byte DELIMITER = 10; - - private final byte version; - private final byte frameType; - private final byte[] payload; - private final long dataSize; - private final long seqNumber; - - private BeatsFrame(final Builder builder) { - this.version = builder.version; - this.frameType = builder.frameType; - this.payload = builder.payload; - this.dataSize = builder.dataSize; - this.seqNumber = builder.seqNumber; - - if (version < 2 || payload.length < 0 ) { - throw new BeatsFrameException("Invalid Frame"); - } - } - - public long getSeqNumber() { - return seqNumber; - } - - public byte getVersion() { - return version; - } - - public byte getFrameType() { - return frameType; - } - - public byte [] getPayload() { - return payload; - } - - /** - * Builder for a BeatsFrame. - */ - public static class Builder { - - byte version; - byte frameType; - byte [] payload; - long dataSize; - int seqNumber; - - public Builder() { - reset(); - } - - public void reset() { - version = -1; - seqNumber = -1; - frameType = -1; - payload = null; - } - - public Builder version(final byte version) { - this.version = version; - return this; - } - - public Builder seqNumber(final int seqNumber) { - this.seqNumber = seqNumber; - return this; - } - - public Builder frameType(final byte frameType) { - this.frameType = frameType; - return this; - } - - public Builder dataSize(final long dataSize) { - this.dataSize = dataSize; - return this; - } - - public Builder payload(final byte [] payload) { - this.payload = payload; - return this; - } - - - public BeatsFrame build() { - return new BeatsFrame(this); - } - - } - -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.java new file mode 100644 index 0000000000..c620456431 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.handler; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.beats.protocol.Batch; +import org.apache.nifi.processors.beats.protocol.BatchMessage; +import org.apache.nifi.processors.beats.protocol.MessageAck; + +import java.util.Collection; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; + +/** + * Batch Channel Inbound Handler processes a batch of messages and sends an acknowledgement for the last sequence number + */ +@ChannelHandler.Sharable +public class BatchChannelInboundHandler extends SimpleChannelInboundHandler { + private final ComponentLog log; + + private final BlockingQueue messages; + + /** + * Batch Channel Inbound Handler with required arguments + * + * @param log Processor Log + * @param messages Queue of messages + */ + public BatchChannelInboundHandler(final ComponentLog log, final BlockingQueue messages) { + this.log = Objects.requireNonNull(log, "Component Log required"); + this.messages = Objects.requireNonNull(messages, "Message Queue required"); + } + + /** + * Channel Read processes a batch of messages and sends an acknowledgement for the last sequence number + * + * @param context Channel Handler Context + * @param batch Batch of messages + */ + @Override + protected void channelRead0(final ChannelHandlerContext context, final Batch batch) { + Integer lastSequenceNumber = null; + + final Collection batchMessages = batch.getMessages(); + int queued = 0; + for (final BatchMessage batchMessage : batchMessages) { + final int sequenceNumber = batchMessage.getSequenceNumber(); + final String sender = batchMessage.getSender(); + if (messages.offer(batchMessage)) { + log.debug("Message Sequence Number [{}] Sender [{}] queued", sequenceNumber, sender); + lastSequenceNumber = batchMessage.getSequenceNumber(); + queued++; + } else { + log.warn("Message Sequence Number [{}] Sender [{}] queuing failed: Queued [{}] of [{}]", sequenceNumber, sender, queued, batchMessages.size()); + break; + } + } + + if (lastSequenceNumber == null) { + log.warn("Batch Messages [{}] queuing failed", batch.getMessages().size()); + } else { + final MessageAck messageAck = new MessageAck(lastSequenceNumber); + context.writeAndFlush(messageAck); + } + } +} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchDecoder.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchDecoder.java new file mode 100644 index 0000000000..4aec512c22 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchDecoder.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.beats.protocol.Batch; +import org.apache.nifi.processors.beats.protocol.BatchMessage; +import org.apache.nifi.processors.beats.protocol.FrameType; +import org.apache.nifi.processors.beats.protocol.FrameTypeDecoder; +import org.apache.nifi.processors.beats.protocol.ProtocolCodeDecoder; +import org.apache.nifi.processors.beats.protocol.ProtocolException; +import org.apache.nifi.processors.beats.protocol.ProtocolVersion; +import org.apache.nifi.processors.beats.protocol.ProtocolVersionDecoder; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.zip.Inflater; +import java.util.zip.InflaterOutputStream; + +/** + * Byte Buffer to Batch Decoder parses bytes to batches of Beats messages + */ +public class BatchDecoder extends ByteToMessageDecoder { + private static final int INITIAL_WINDOW_SIZE = 1; + + private static final int INITIAL_QUEUE_SIZE = 1; + + private static final int CODE_READABLE_BYTES = 1; + + private static final int INT_READABLE_BYTES = 4; + + private static final ProtocolCodeDecoder VERSION_DECODER = new ProtocolVersionDecoder(); + + private static final ProtocolCodeDecoder FRAME_TYPE_DECODER = new FrameTypeDecoder(); + + private final ComponentLog log; + + private final AtomicReference versionRef = new AtomicReference<>(); + + private final AtomicReference frameTypeRef = new AtomicReference<>(); + + private final AtomicInteger windowSize = new AtomicInteger(INITIAL_WINDOW_SIZE); + + private final AtomicReference sequenceNumberRef = new AtomicReference<>(); + + private final AtomicReference payloadSizeRef = new AtomicReference<>(); + + private final AtomicReference compressedSizeRef = new AtomicReference<>(); + + private Queue batchMessages = new ArrayBlockingQueue<>(INITIAL_QUEUE_SIZE); + + /** + * Beats Batch Decoder with required arguments + * + * @param log Processor Log + */ + public BatchDecoder(final ComponentLog log) { + this.log = Objects.requireNonNull(log, "Component Log required"); + } + + /** + * Decode Batch of Beats Messages from Byte Buffer + * + * @param context Channel Handler Context + * @param buffer Byte Buffer + * @param objects List of Batch objects + */ + @Override + protected void decode(final ChannelHandlerContext context, final ByteBuf buffer, final List objects) { + final ProtocolVersion protocolVersion = readVersion(buffer); + if (ProtocolVersion.VERSION_2 == protocolVersion) { + final FrameType frameType = readFrameType(buffer); + decodeFrameType(frameType, context, buffer, objects); + } else if (ProtocolVersion.VERSION_1 == protocolVersion) { + throw new ProtocolException("Protocol Version [1] not supported"); + } + } + + private void decodeFrameType(final FrameType frameType, final ChannelHandlerContext context, final ByteBuf buffer, final List batches) { + if (frameType == null) { + log.trace("Frame Type not found"); + } else if (FrameType.COMPRESSED == frameType) { + processCompressed(context, buffer, batches); + } else if (FrameType.WINDOW_SIZE == frameType) { + processWindowSize(context, buffer); + } else if (FrameType.JSON == frameType) { + processJson(context, buffer, batches); + } else { + final String message = String.format("Frame Type [%s] not supported", frameType); + throw new ProtocolException(message); + } + } + + private void processWindowSize(final ChannelHandlerContext context, final ByteBuf buffer) { + final Integer readWindowSize = readUnsignedInteger(buffer); + if (readWindowSize == null) { + log.trace("State [Read Window Size] not enough readable bytes"); + } else { + windowSize.getAndSet(readWindowSize); + batchMessages = new ArrayBlockingQueue<>(readWindowSize); + + resetFrameTypeVersion(); + final Channel channel = context.channel(); + log.debug("Processed Window Size [{}] Local [{}] Remote [{}]", readWindowSize, channel.localAddress(), channel.remoteAddress()); + } + } + + private void processCompressed(final ChannelHandlerContext context, final ByteBuf buffer, final List batches) { + final Integer readCompressedSize = readCompressedSize(buffer); + if (readCompressedSize == null) { + log.trace("State [Read Compressed] not enough readable bytes"); + } else { + final int readableBytes = buffer.readableBytes(); + if (readableBytes >= readCompressedSize) { + final Channel channel = context.channel(); + log.debug("Processing Compressed Size [{}] Local [{}] Remote [{}]", readCompressedSize, channel.localAddress(), channel.remoteAddress()); + + processCompressed(context, buffer, readCompressedSize, batches); + } else { + log.trace("State [Read Compressed] not enough readable bytes [{}] for compressed [{}]", readableBytes, readCompressedSize); + } + } + } + + private void processCompressed( + final ChannelHandlerContext context, + final ByteBuf buffer, + final int compressedSize, + final List batches + ) { + final ByteBuf inflated = context.alloc().buffer(compressedSize); + try { + readCompressedBuffer(buffer, inflated, compressedSize); + + // Clear status prior to decoding inflated frames + resetSequenceVersionPayloadSize(); + resetFrameTypeVersion(); + + while (inflated.isReadable()) { + decode(context, inflated, batches); + } + } finally { + compressedSizeRef.set(null); + inflated.release(); + } + } + + private void processJson(final ChannelHandlerContext context, final ByteBuf buffer, final List batches) { + final Channel channel = context.channel(); + + final Integer sequenceNumber = readSequenceNumber(buffer); + if (sequenceNumber == null) { + log.trace("State [Read JSON] Sequence Number not found Remote [{}]", channel.remoteAddress()); + } else { + final Integer payloadSize = readPayloadSize(buffer); + if (payloadSize == null) { + log.trace("State [Read JSON] Payload Size not found Remote [{}]", channel.remoteAddress()); + } else { + processJson(sequenceNumber, payloadSize, context, buffer, batches); + } + } + } + + private void processJson( + final int sequenceNumber, + final int payloadSize, + final ChannelHandlerContext context, + final ByteBuf buffer, + final List batches + ) { + final Channel channel = context.channel(); + + final BatchMessage batchMessage = readJsonMessage(context, sequenceNumber, payloadSize, buffer); + if (batchMessage == null) { + log.trace("State [Read JSON] Message not found Remote [{}]", channel.remoteAddress()); + } else { + processBatchMessage(batchMessage, batches); + log.debug("Processed JSON Message Sequence Number [{}] Payload Size [{}] Local [{}] Remote [{}]", sequenceNumber, payloadSize, channel.localAddress(), channel.remoteAddress()); + } + } + + private BatchMessage readJsonMessage( + final ChannelHandlerContext context, + final int sequenceNumber, + final int payloadSize, + final ByteBuf buffer + ) { + final BatchMessage batchMessage; + + final int readableBytes = buffer.readableBytes(); + if (readableBytes >= payloadSize) { + final byte[] payload = new byte[payloadSize]; + buffer.readBytes(payload); + + final Channel channel = context.channel(); + final String sender = getRemoteHostAddress(channel); + batchMessage = new BatchMessage(sender, payload, sequenceNumber); + } else { + batchMessage = null; + log.trace("State [Read JSON] Sequence Number [{}] not enough readable bytes [{}] for payload [{}]", sequenceNumber, readableBytes, payloadSize); + } + + return batchMessage; + } + + private String getRemoteHostAddress(final Channel channel) { + final String remoteHostAddress; + + final SocketAddress remoteAddress = channel.remoteAddress(); + if (remoteAddress instanceof InetSocketAddress) { + final InetSocketAddress remoteSocketAddress = (InetSocketAddress) remoteAddress; + final InetAddress address = remoteSocketAddress.getAddress(); + remoteHostAddress = address.getHostAddress(); + } else { + remoteHostAddress = remoteAddress.toString(); + } + + return remoteHostAddress; + } + + private void processBatchMessage(final BatchMessage batchMessage, final List batches) { + if (batchMessages.offer(batchMessage)) { + resetSequenceVersionPayloadSize(); + resetFrameTypeVersion(); + + if (windowSize.get() == batchMessages.size()) { + final Collection messages = new ArrayList<>(batchMessages); + final Batch batch = new Batch(messages); + batches.add(batch); + + resetWindowSize(); + } + } else { + final String message = String.format("Received message exceeds Window Size [%d]", windowSize.get()); + throw new ProtocolException(message); + } + } + + private void readCompressedBuffer(final ByteBuf compressedBuffer, final ByteBuf inflated, final int compressedSize) { + final Inflater inflater = new Inflater(); + try ( + final ByteBufOutputStream outputStream = new ByteBufOutputStream(inflated); + final InflaterOutputStream inflaterOutputStream = new InflaterOutputStream(outputStream, inflater) + ) { + compressedBuffer.readBytes(inflaterOutputStream, compressedSize); + } catch (final IOException e) { + final String message = String.format("Read Compressed Payload Size [%d] failed", compressedSize); + throw new ProtocolException(message, e); + } finally { + inflater.end(); + } + } + + private Integer readSequenceNumber(final ByteBuf buffer) { + if (sequenceNumberRef.get() == null) { + final Integer readSequenceNumber = readUnsignedInteger(buffer); + if (readSequenceNumber == null) { + log.trace("State [Read JSON] not enough readable bytes for Sequence Number"); + } else { + sequenceNumberRef.set(readSequenceNumber); + } + } + + return sequenceNumberRef.get(); + } + + private Integer readPayloadSize(final ByteBuf buffer) { + if (payloadSizeRef.get() == null) { + final Integer readPayloadSize = readUnsignedInteger(buffer); + if (readPayloadSize == null) { + log.trace("State [Read JSON] not enough readable bytes for Payload Size"); + } else { + payloadSizeRef.set(readPayloadSize); + } + } + + return payloadSizeRef.get(); + } + + private Integer readCompressedSize(final ByteBuf buffer) { + if (compressedSizeRef.get() == null) { + final Integer readCompressedSize = readUnsignedInteger(buffer); + if (readCompressedSize == null) { + log.trace("State [Read Compressed] not enough readable bytes for Compressed Size"); + } else { + compressedSizeRef.set(readCompressedSize); + } + } + + return compressedSizeRef.get(); + } + + private Integer readUnsignedInteger(final ByteBuf buffer) { + final Integer number; + + final int readableBytes = buffer.readableBytes(); + if (readableBytes >= INT_READABLE_BYTES) { + final long unsigned = buffer.readUnsignedInt(); + number = Math.toIntExact(unsigned); + } else { + number = null; + } + + return number; + } + + private FrameType readFrameType(final ByteBuf buffer) { + if (frameTypeRef.get() == null) { + final int readableBytes = buffer.readableBytes(); + if (readableBytes >= CODE_READABLE_BYTES) { + final byte frameTypeCode = buffer.readByte(); + final FrameType frameType = FRAME_TYPE_DECODER.readProtocolCode(frameTypeCode); + frameTypeRef.set(frameType); + } else { + log.trace("State [Read Frame Type] not enough readable bytes [{}]", readableBytes); + } + } + + return frameTypeRef.get(); + } + + private ProtocolVersion readVersion(final ByteBuf buffer) { + if (versionRef.get() == null) { + final int readableBytes = buffer.readableBytes(); + if (readableBytes >= CODE_READABLE_BYTES) { + final byte versionCode = buffer.readByte(); + final ProtocolVersion protocolVersion = VERSION_DECODER.readProtocolCode(versionCode); + versionRef.set(protocolVersion); + } else { + log.trace("State [Read Version] not enough readable bytes [{}]", readableBytes); + } + } + + return versionRef.get(); + } + + private void resetSequenceVersionPayloadSize() { + sequenceNumberRef.set(null); + payloadSizeRef.set(null); + } + + private void resetFrameTypeVersion() { + frameTypeRef.set(null); + versionRef.set(null); + } + + private void resetWindowSize() { + windowSize.set(INITIAL_WINDOW_SIZE); + batchMessages.clear(); + } +} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/MessageAckEncoder.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/MessageAckEncoder.java new file mode 100644 index 0000000000..f455207791 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/MessageAckEncoder.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.beats.protocol.FrameType; +import org.apache.nifi.processors.beats.protocol.MessageAck; +import org.apache.nifi.processors.beats.protocol.ProtocolVersion; + +import java.util.Objects; + +/** + * Beats Message Acknowledgement Encoder writes Protocol Version 2 ACK packets with a specified sequence number + */ +@ChannelHandler.Sharable +public class MessageAckEncoder extends MessageToByteEncoder { + private final ComponentLog log; + + /** + * Message Acknowledgment Encoder with required arguments + * + * @param log Processor Log + */ + public MessageAckEncoder(final ComponentLog log) { + this.log = Objects.requireNonNull(log, "Component Log required"); + } + + /** + * Encode Message Acknowledgement to the buffer with Protocol Version 2 and ACK Frame Type + * + * @param context Channel Handler Context + * @param messageAck Message Acknowledgement containing Sequence Number + * @param buffer Byte Buffer + */ + @Override + protected void encode(final ChannelHandlerContext context, final MessageAck messageAck, final ByteBuf buffer) { + buffer.writeByte(ProtocolVersion.VERSION_2.getCode()); + buffer.writeByte(FrameType.ACK.getCode()); + + final int sequenceNumber = messageAck.getSequenceNumber(); + buffer.writeInt(sequenceNumber); + + final Channel channel = context.channel(); + log.debug("Encoded Message Ack Sequence Number [{}] Local [{}] Remote [{}]", sequenceNumber, channel.localAddress(), channel.remoteAddress()); + } +} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java deleted file mode 100644 index e870660302..0000000000 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.beats.netty; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; -import org.apache.nifi.processors.beats.frame.BeatsDecoder; -import org.apache.nifi.processors.beats.frame.BeatsFrame; -import org.apache.nifi.processors.beats.frame.BeatsFrameType; -import org.apache.nifi.processors.beats.frame.BeatsMetadata; - -import java.nio.charset.Charset; -import java.util.List; -import java.util.Map; - -/** - * Decode a Beats message's bytes into a BeatsMessage object - */ -public class BeatsFrameDecoder extends ByteToMessageDecoder { - - private final Charset charset; - private final ComponentLog logger; - private final BeatsMessageFactory messageFactory; - - public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) { - this.charset = charset; - this.logger = logger; - this.messageFactory = new BeatsMessageFactory(); - } - - @Override - protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List out) { - final int total = in.readableBytes(); - final String senderSocket = ctx.channel().remoteAddress().toString(); - final BeatsDecoder decoder = new BeatsDecoder(charset, logger); - - for (int i = 0; i < total; i++) { - byte currByte = in.readByte(); - - // decode the bytes and once we find the end of a frame, handle the frame - if (decoder.process(currByte)) { - final List frames = decoder.getFrames(); - for (BeatsFrame frame : frames) { - logger.debug("Received Beats Frame Sender [{}] Transaction [{}] Frame Type [{}]", - senderSocket, frame.getSeqNumber(), frame.getFrameType()); - // Ignore the WINDOW SIZE type frames as they contain no payload. - if (frame.getFrameType() != BeatsFrameType.WINDOWSIZE) { - handle(frame, senderSocket, out); - } - } - } - } - } - - private void handle(final BeatsFrame frame, final String sender, final List out) { - final Map metadata = EventFactoryUtil.createMapWithSender(sender); - metadata.put(BeatsMetadata.SEQNUMBER_KEY, String.valueOf(frame.getSeqNumber())); - - if (frame.getFrameType() == BeatsFrameType.JSON) { - final BeatsMessage event = messageFactory.create(frame.getPayload(), metadata); - out.add(event); - } - } -} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java deleted file mode 100644 index 0518a12545..0000000000 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.beats.netty; - -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processors.beats.frame.BeatsEncoder; -import org.apache.nifi.processors.beats.response.BeatsChannelResponse; -import org.apache.nifi.processors.beats.response.BeatsResponse; - -import java.util.concurrent.BlockingQueue; - -/** - * Decode data received into a BeatsMessage - */ -@ChannelHandler.Sharable -public class BeatsMessageChannelHandler extends SimpleChannelInboundHandler { - - private final ComponentLog componentLog; - private final BlockingQueue events; - private final BeatsEncoder encoder; - - public BeatsMessageChannelHandler(BlockingQueue events, ComponentLog componentLog) { - this.events = events; - this.componentLog = componentLog; - this.encoder = new BeatsEncoder(); - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) { - componentLog.debug("Beats Message Received Length [{}] Remote Address [{}] ", msg.getMessage().length, msg.getSender()); - if (events.offer(msg)) { - componentLog.debug("Event Queued: Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber()); - BeatsChannelResponse successResponse = new BeatsChannelResponse(encoder, BeatsResponse.ok(msg.getSeqNumber())); - ctx.writeAndFlush(Unpooled.wrappedBuffer(successResponse.toByteArray())); - } else { - componentLog.warn("Beats Queue Full: Failed Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber()); - } - } -} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/Batch.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/Batch.java new file mode 100644 index 0000000000..1f24405bfb --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/Batch.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.protocol; + +import java.util.Collection; +import java.util.Objects; + +/** + * Batch of Beats Messages + */ +public class Batch { + private final Collection messages; + + public Batch(final Collection messages) { + this.messages = Objects.requireNonNull(messages, "Message required"); + } + + public Collection getMessages() { + return messages; + } +} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessage.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/BatchMessage.java similarity index 66% rename from nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessage.java rename to nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/BatchMessage.java index 70f9d4d5a2..3b1c5bb9cc 100644 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessage.java +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/BatchMessage.java @@ -14,23 +14,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.beats.netty; +package org.apache.nifi.processors.beats.protocol; import org.apache.nifi.event.transport.message.ByteArrayMessage; /** - * A Beats message which adds a sequence number to the ByteArrayMessage. + * Beats Batch Message containing JSON payload and sequence number */ -public class BeatsMessage extends ByteArrayMessage { +public class BatchMessage extends ByteArrayMessage { - private final int seqNumber; + private final int sequenceNumber; - public BeatsMessage(final String sender, final byte[] data, final int seqNumber) { - super(data, sender); - this.seqNumber = seqNumber; + public BatchMessage(final String sender, final byte[] payload, final int sequenceNumber) { + super(payload, sender); + this.sequenceNumber = sequenceNumber; } - public int getSeqNumber() { - return seqNumber; + public int getSequenceNumber() { + return sequenceNumber; } } diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameType.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameType.java new file mode 100644 index 0000000000..640cd8c364 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.protocol; + +/** + * Beats Protocol Frame Type + */ +public enum FrameType implements ProtocolCode { + ACK('A'), + + COMPRESSED('C'), + + DATA('D'), + + JSON('J'), + + WINDOW_SIZE('W'); + + private final int code; + + FrameType(final char code) { + this.code = code; + } + + @Override + public int getCode() { + return code; + } +} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameTypeDecoder.java similarity index 55% rename from nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java rename to nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameTypeDecoder.java index 73be08f7cc..3f02d9ab3d 100644 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameTypeDecoder.java @@ -14,22 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.beats.netty; +package org.apache.nifi.processors.beats.protocol; -import org.apache.nifi.processor.util.listen.event.NetworkEventFactory; -import org.apache.nifi.processors.beats.frame.BeatsMetadata; - -import java.util.Map; +import java.util.Arrays; +import java.util.Optional; /** - * An EventFactory implementation to create BeatsMessages. + * Beats Frame Type Decoder */ -public class BeatsMessageFactory implements NetworkEventFactory { +public class FrameTypeDecoder implements ProtocolCodeDecoder { @Override - public BeatsMessage create(final byte[] data, final Map metadata) { - final int sequenceNumber = Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY)); - final String sender = metadata.get(BeatsMetadata.SENDER_KEY); - return new BeatsMessage(sender, data, sequenceNumber); + public FrameType readProtocolCode(final byte code) { + final Optional frameTypeFound = Arrays.stream(FrameType.values()).filter( + frameType -> frameType.getCode() == code + ).findFirst(); + + return frameTypeFound.orElseThrow(() -> { + final String message = String.format("Frame Type Code [%d] not supported", code); + return new ProtocolException(message); + }); } } diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsState.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/MessageAck.java similarity index 71% rename from nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsState.java rename to nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/MessageAck.java index b18cf8537b..2778655226 100644 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsState.java +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/MessageAck.java @@ -14,15 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.beats.frame; +package org.apache.nifi.processors.beats.protocol; /** - * The stages of parsing of a Beats conversation. + * Beats Message Acknowledgement */ -public enum BeatsState { +public class MessageAck { + private final int sequenceNumber; - VERSION, // First stage is parsing the version - FRAMETYPE, // Second stage is to be able to read the frame type - PAYLOAD, // payload being populated - COMPLETE // complete packet handling -} \ No newline at end of file + public MessageAck(final int sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + public int getSequenceNumber() { + return sequenceNumber; + } +} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsMetadata.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCode.java similarity index 75% rename from nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsMetadata.java rename to nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCode.java index 2dc5a74d89..7112f5ccc1 100644 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsMetadata.java +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCode.java @@ -14,13 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.beats.frame; +package org.apache.nifi.processors.beats.protocol; /** - * Metadata keys for Beats message. + * Beats Protocol Code interface abstraction */ -public interface BeatsMetadata { - - String SEQNUMBER_KEY = "beats.sequencenumber"; - String SENDER_KEY = "sender"; +public interface ProtocolCode { + /** + * Get Protocol Code as transmitted over a socket connection + * + * @return Protocol Code + */ + int getCode(); } diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameType.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCodeDecoder.java similarity index 69% rename from nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameType.java rename to nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCodeDecoder.java index 77a32728c4..a853dffdce 100644 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameType.java +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCodeDecoder.java @@ -14,12 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.beats.frame; +package org.apache.nifi.processors.beats.protocol; -public final class BeatsFrameType { - public static final byte WINDOWSIZE = 0x57; - public static final byte DATA = 0x44; - public static final byte COMPRESSED = 0x43; - public static final byte ACK = 0x41; - public static final byte JSON = 0x4a; +/** + * Decoder for Protocol Code byte values + * + * @param Protocol Code Type + */ +public interface ProtocolCodeDecoder { + /** + * Read Protocol Code + * + * @param code Code byte value + * @return Protocol Code + */ + T readProtocolCode(byte code); } diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolException.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolException.java new file mode 100644 index 0000000000..45d7c9cc63 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.protocol; + +/** + * Beats Protocol Exception + */ +public class ProtocolException extends RuntimeException { + /** + * Protocol Exception constructor with message containing protocol failure details + * + * @param message Protocol failure details + */ + public ProtocolException(final String message) { + super(message); + } + + /** + * Protocol Exception constructor with message and cause of failure details + * + * @param message Protocol failure details + * @param cause Cause of failure + */ + public ProtocolException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersion.java similarity index 71% rename from nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java rename to nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersion.java index baa34a2350..fc7972e725 100644 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersion.java @@ -14,19 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.beats.frame; +package org.apache.nifi.processors.beats.protocol; /** - * Represents an error encountered when decoding frames. + * Beats Protocol Version */ -public class BeatsFrameException extends RuntimeException { +public enum ProtocolVersion implements ProtocolCode { + VERSION_1('1'), - public BeatsFrameException(String message) { - super(message); + VERSION_2('2'); + + private final int code; + + ProtocolVersion(final char code) { + this.code = code; } - public BeatsFrameException(String message, Throwable cause) { - super(message, cause); + @Override + public int getCode() { + return code; } - -} \ No newline at end of file +} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersionDecoder.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersionDecoder.java new file mode 100644 index 0000000000..3f06d382e5 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersionDecoder.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.protocol; + +import java.util.Arrays; +import java.util.Optional; + +/** + * Beats Protocol Version Decoder + */ +public class ProtocolVersionDecoder implements ProtocolCodeDecoder { + + @Override + public ProtocolVersion readProtocolCode(final byte code) { + final Optional protocolVersionFound = Arrays.stream(ProtocolVersion.values()).filter( + protocolVersion -> protocolVersion.getCode() == code + ).findFirst(); + + return protocolVersionFound.orElseThrow(() -> { + final String message = String.format("Version Code [%d] not supported", code); + return new ProtocolException(message); + }); + } +} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java deleted file mode 100644 index 5890771ab0..0000000000 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.beats.response; - -import org.apache.nifi.processor.util.listen.response.ChannelResponse; -import org.apache.nifi.processors.beats.frame.BeatsFrame; -import org.apache.nifi.processors.beats.frame.BeatsEncoder; - -/** - * Creates a BeatsFrame for the provided response and returns the encoded frame. - */ -public class BeatsChannelResponse implements ChannelResponse { - - private final BeatsEncoder encoder; - private final BeatsResponse response; - - public BeatsChannelResponse(final BeatsEncoder encoder, final BeatsResponse response) { - this.encoder = encoder; - this.response = response; - } - - @Override - public byte[] toByteArray() { - final BeatsFrame frame = response.toFrame(); - return encoder.encode(frame); - } - -} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java deleted file mode 100644 index 7f12c9d7d1..0000000000 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.beats.response; - -import org.apache.nifi.processors.beats.frame.BeatsFrame; - -import java.nio.ByteBuffer; - -/** - 'ack' frame type - - SENT FROM READER ONLY - frame type value: ASCII 'A' aka byte value 0x41 - - Payload: - 32bit unsigned sequence number. - - */ -public class BeatsResponse { - private final int seqNumber; - final private byte version = 0x32; // v2 - final private byte frameType = 0x41; // A or ACK - - - - public BeatsResponse(final int seqNumber) { - this.seqNumber = seqNumber; - } - - /** - * Creates a BeatsFrame where the data portion will contain this response. - * - * - * @return a BeatsFrame for for this response - */ - public BeatsFrame toFrame() { - - return new BeatsFrame.Builder() - .version(version) - .frameType(frameType) - .payload(ByteBuffer.allocate(4).putInt(seqNumber).array()) - .build(); - } - - public static BeatsResponse ok(final int seqNumber) { - return new BeatsResponse(seqNumber); - } -} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageServerFactory.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/server/BeatsMessageServerFactory.java similarity index 69% rename from nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageServerFactory.java rename to nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/server/BeatsMessageServerFactory.java index 2e0414189d..35aa3c1547 100644 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageServerFactory.java +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/server/BeatsMessageServerFactory.java @@ -14,43 +14,46 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.beats.netty; +package org.apache.nifi.processors.beats.server; import org.apache.nifi.event.transport.configuration.TransportProtocol; import org.apache.nifi.event.transport.netty.NettyEventServerFactory; import org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.beats.handler.BatchChannelInboundHandler; +import org.apache.nifi.processors.beats.handler.BatchDecoder; +import org.apache.nifi.processors.beats.handler.MessageAckEncoder; +import org.apache.nifi.processors.beats.protocol.BatchMessage; import java.net.InetAddress; -import java.nio.charset.Charset; import java.util.Arrays; import java.util.concurrent.BlockingQueue; /** - * Netty Event Server Factory implementation for RELP Messages + * Beats Message Protocol extends of Netty Event Server Factory */ public class BeatsMessageServerFactory extends NettyEventServerFactory { - /** - * RELP Message Server Factory to receive RELP messages + * Beats Message Server Factory constructor with standard configuration arguments + * * @param log Component Log * @param address Server Address * @param port Server Port Number - * @param charset Charset to use when decoding RELP messages * @param events Blocking Queue for events received */ public BeatsMessageServerFactory(final ComponentLog log, final InetAddress address, final int port, - final Charset charset, - final BlockingQueue events) { + final BlockingQueue events) { super(address, port, TransportProtocol.TCP); - final BeatsMessageChannelHandler beatsChannelHandler = new BeatsMessageChannelHandler(events, log); + final MessageAckEncoder messageAckEncoder = new MessageAckEncoder(log); + final BatchChannelInboundHandler batchChannelInboundHandler = new BatchChannelInboundHandler(log, events); final LogExceptionChannelHandler logExceptionChannelHandler = new LogExceptionChannelHandler(log); setHandlerSupplier(() -> Arrays.asList( - new BeatsFrameDecoder(log, charset), - beatsChannelHandler, + messageAckEncoder, + new BatchDecoder(log), + batchChannelInboundHandler, logExceptionChannelHandler )); } diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/ListenBeatsTest.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/ListenBeatsTest.java new file mode 100644 index 0000000000..9b74a19e4f --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/ListenBeatsTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats; + +import org.apache.nifi.processor.util.listen.ListenerProperties; +import org.apache.nifi.processors.beats.protocol.FrameType; +import org.apache.nifi.processors.beats.protocol.ProtocolVersion; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.remote.io.socket.NetworkUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.Optional; +import java.util.zip.DeflaterOutputStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ListenBeatsTest { + + private static final String LOCALHOST = "127.0.0.1"; + + private static final String LOCALHOST_TRANSIT_URI = "beats://127.0.0.1:%d"; + + private static final int ACK_PACKET_LENGTH = 6; + + private static final int FIRST_SEQUENCE_NUMBER = 1; + + private static final int INTEGER_BUFFER_SIZE = 4; + + private static final String JSON_PAYLOAD = "{\"@timestamp\":\"2022-10-31T12:30:45.678Z\",\"message\":\"Processing Started\"}"; + + private static final int WINDOWED_MESSAGES = 50; + + TestRunner runner; + + @BeforeEach + void setRunner() { + runner = TestRunners.newTestRunner(ListenBeats.class); + } + + @Timeout(10) + @Test + void testRunSingleJsonMessage() throws Exception { + final int port = NetworkUtils.getAvailableTcpPort(); + runner.setProperty(ListenerProperties.PORT, Integer.toString(port)); + + startServer(); + + try ( + final Socket socket = new Socket(LOCALHOST, port); + final InputStream inputStream = socket.getInputStream(); + final OutputStream outputStream = socket.getOutputStream() + ) { + sendMessage(outputStream, FIRST_SEQUENCE_NUMBER); + assertAckPacketMatched(inputStream, FIRST_SEQUENCE_NUMBER); + } + + assertFlowFilesSuccess(1); + assertReceiveEventFound(port); + } + + @Timeout(10) + @Test + void testRunWindowSizeJsonMessages() throws Exception { + final int port = NetworkUtils.getAvailableTcpPort(); + runner.setProperty(ListenerProperties.PORT, Integer.toString(port)); + + startServer(); + + try ( + final Socket socket = new Socket(LOCALHOST, port); + final InputStream inputStream = socket.getInputStream(); + final OutputStream outputStream = socket.getOutputStream() + ) { + sendWindowSize(outputStream); + + for (int sequenceNumber = FIRST_SEQUENCE_NUMBER; sequenceNumber <= WINDOWED_MESSAGES; sequenceNumber++) { + sendMessage(outputStream, sequenceNumber); + } + + assertAckPacketMatched(inputStream, WINDOWED_MESSAGES); + } + + assertFlowFilesSuccess(WINDOWED_MESSAGES); + assertReceiveEventFound(port); + } + + @Timeout(10) + @Test + void testRunWindowSizeCompressedJsonMessages() throws Exception { + final int port = NetworkUtils.getAvailableTcpPort(); + runner.setProperty(ListenerProperties.PORT, Integer.toString(port)); + + startServer(); + + try ( + final Socket socket = new Socket(LOCALHOST, port); + final InputStream inputStream = socket.getInputStream(); + final OutputStream outputStream = socket.getOutputStream() + ) { + sendWindowSize(outputStream); + + final ByteArrayOutputStream compressedOutputStream = new ByteArrayOutputStream(); + final DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(compressedOutputStream); + + for (int sequenceNumber = FIRST_SEQUENCE_NUMBER; sequenceNumber <= WINDOWED_MESSAGES; sequenceNumber++) { + sendMessage(deflaterOutputStream, sequenceNumber); + } + + deflaterOutputStream.close(); + final byte[] compressed = compressedOutputStream.toByteArray(); + sendCompressed(outputStream, compressed); + + assertAckPacketMatched(inputStream, WINDOWED_MESSAGES); + } + + assertFlowFilesSuccess(WINDOWED_MESSAGES); + assertReceiveEventFound(port); + } + + private void startServer() { + runner.run(1, false, true); + } + + private void assertReceiveEventFound(final int port) { + final Optional receiveRecord = runner.getProvenanceEvents().stream().filter(record -> + ProvenanceEventType.RECEIVE == record.getEventType() + ).findFirst(); + + assertTrue(receiveRecord.isPresent()); + final ProvenanceEventRecord record = receiveRecord.get(); + + final String expectedTransitUri = String.format(LOCALHOST_TRANSIT_URI, port); + assertEquals(expectedTransitUri, record.getTransitUri()); + } + + private void assertFlowFilesSuccess(final int expectedFlowFiles) { + runner.run(expectedFlowFiles, true, false); + runner.assertTransferCount(ListenBeats.REL_SUCCESS, expectedFlowFiles); + + final Iterator flowFiles = runner.getFlowFilesForRelationship(ListenBeats.REL_SUCCESS).iterator(); + int i = 1; + while (flowFiles.hasNext()) { + final MockFlowFile flowFile = flowFiles.next(); + final String content = flowFile.getContent(); + assertEquals(JSON_PAYLOAD, content, String.format("FlowFile Content [%d] not matched", i)); + i++; + } + } + + private void sendWindowSize(final OutputStream outputStream) throws IOException { + outputStream.write(ProtocolVersion.VERSION_2.getCode()); + + outputStream.write(FrameType.WINDOW_SIZE.getCode()); + + final byte[] windowSize = getUnsignedInteger(WINDOWED_MESSAGES); + outputStream.write(windowSize); + + outputStream.flush(); + } + + private void sendMessage(final OutputStream outputStream, final int sequenceNumber) throws IOException { + outputStream.write(ProtocolVersion.VERSION_2.getCode()); + + outputStream.write(FrameType.JSON.getCode()); + + final byte[] sequenceNumberEncoded = getUnsignedInteger(sequenceNumber); + outputStream.write(sequenceNumberEncoded); + + final int payloadLength = JSON_PAYLOAD.length(); + + final byte[] payloadSize = getUnsignedInteger(payloadLength); + outputStream.write(payloadSize); + + outputStream.write(JSON_PAYLOAD.getBytes(StandardCharsets.UTF_8)); + + outputStream.flush(); + } + + private void sendCompressed(final OutputStream outputStream, final byte[] compressed) throws IOException { + outputStream.write(ProtocolVersion.VERSION_2.getCode()); + + outputStream.write(FrameType.COMPRESSED.getCode()); + + final int payloadLength = compressed.length; + + final byte[] payloadSize = getUnsignedInteger(payloadLength); + outputStream.write(payloadSize); + + outputStream.write(compressed); + + outputStream.flush(); + } + + private void assertAckPacketMatched(final InputStream inputStream, final int expectedSequenceNumber) throws IOException { + final byte[] ackPacket = new byte[ACK_PACKET_LENGTH]; + final int bytesRead = inputStream.read(ackPacket); + + assertEquals(ACK_PACKET_LENGTH, bytesRead); + + final ByteBuffer ackPacketBuffer = ByteBuffer.wrap(ackPacket); + + final byte version = ackPacketBuffer.get(); + assertEquals(ProtocolVersion.VERSION_2.getCode(), version); + + final byte frameType = ackPacketBuffer.get(); + assertEquals(FrameType.ACK.getCode(), frameType); + + final int sequenceNumber = ackPacketBuffer.getInt(); + assertEquals(expectedSequenceNumber, sequenceNumber); + } + + private byte[] getUnsignedInteger(final int number) { + return ByteBuffer.allocate(INTEGER_BUFFER_SIZE).putInt(number).array(); + } +} diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java deleted file mode 100644 index b1a8b0da57..0000000000 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.beats.frame; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import javax.xml.bind.DatatypeConverter; -import java.nio.ByteBuffer; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; - - -public class TestBeatsEncoder { - private BeatsEncoder encoder; - - - @BeforeEach - public void setup() { - this.encoder = new BeatsEncoder(); - } - - @Test - public void testEncode() { - BeatsFrame frame = new BeatsFrame.Builder() - .version((byte) 0x31) - .frameType((byte) 0x41) - .payload(ByteBuffer.allocate(4).putInt(123).array()) - .build(); - - byte[] encoded = encoder.encode(frame); - - assertArrayEquals(DatatypeConverter.parseHexBinary("31410000007B"), encoded); - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java deleted file mode 100644 index 1d5b28ac51..0000000000 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.beats.frame; - -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertThrows; - -public class TestBeatsFrame { - - @Test - public void testInvalidVersion() { - assertThrows(BeatsFrameException.class, () -> new BeatsFrame.Builder().seqNumber(1234).dataSize(3).build()); - } - - @Test - public void testInvalidFrameType() { - assertThrows(BeatsFrameException.class, () -> new BeatsFrame.Builder().frameType((byte) 0x70).dataSize(5).build()); - } - - @Test - public void testBlankFrameType() { - assertThrows(BeatsFrameException.class, () -> new BeatsFrame.Builder().frameType(((byte) 0x00)).dataSize(5).build()); - } -} \ No newline at end of file