NIFI-10737 Corrected ListenBeats buffer handling

- Added test class for ListenBeats
- Removed unnecessary dependencies
- Implemented BatchDecoder for reading Beats Protocol frames
- Refactored protocol and handler classes

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #6608.
This commit is contained in:
exceptionfactory 2022-10-31 22:30:32 -05:00 committed by Nathan Gough
parent e60cbd4bbb
commit 7c1a7da116
26 changed files with 1089 additions and 1033 deletions

View File

@ -34,31 +34,18 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-event-listen</artifactId>
<version>1.19.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-socket-ssl</artifactId>
<version>1.19.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-socket-utils</artifactId>
<version>1.19.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-socket-ssl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.19.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-packager</artifactId>
<version>1.19.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
@ -71,25 +58,4 @@
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<!-- This profile, activating when compiling on Java versions above 1.8, provides configuration changes to
allow NiFi to be compiled on those JDKs. -->
<id>jigsaw</id>
<activation>
<jdk>(1.8,)</jdk>
</activation>
<dependencies>
<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jaxb</groupId>
<artifactId>jaxb-runtime</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>

View File

@ -16,20 +16,18 @@
*/
package org.apache.nifi.processors.beats;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -38,14 +36,13 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processors.beats.netty.BeatsMessage;
import org.apache.nifi.processors.beats.netty.BeatsMessageServerFactory;
import org.apache.nifi.processors.beats.protocol.BatchMessage;
import org.apache.nifi.processors.beats.server.BeatsMessageServerFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
@ -55,11 +52,9 @@ import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -67,34 +62,30 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "beats", "tcp", "logs"})
@CapabilityDescription("Listens for messages sent by libbeat compatible clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash', writing its JSON formatted payload " +
"to the content of a FlowFile." +
"This processor replaces the now deprecated/removed ListenLumberjack")
@Tags({"beats", "logstash", "elasticsearch", "log"})
@CapabilityDescription("Receive messages encoded using the Elasticsearch Beats protocol and write decoded JSON")
@WritesAttributes({
@WritesAttribute(attribute = "beats.sender", description = "The sending host of the messages."),
@WritesAttribute(attribute = "beats.port", description = "The sending port the messages were received over."),
@WritesAttribute(attribute = "beats.sequencenumber", description = "The sequence number of the message. Only included if <Batch Size> is 1."),
@WritesAttribute(attribute = "beats.sender", description = "Internet Protocol address of the message sender"),
@WritesAttribute(attribute = "beats.port", description = "TCP port on which the Processor received messages"),
@WritesAttribute(attribute = "beats.sequencenumber", description = "The sequence number of the message included for batches containing single messages"),
@WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is application/json")
})
@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
public class ListenBeats extends AbstractProcessor {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL_CONTEXT_SERVICE")
.displayName("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
"messages will be received over a secure connection.")
// Nearly all Lumberjack v1 implementations require TLS to work. v2 implementations (i.e. beats) have TLS as optional
.description("SSL Context Service is required to enable TLS for socket connections")
.required(false)
.identifiesControllerService(RestrictedSSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("Client Auth")
.displayName("Client Auth")
.description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
.displayName("Client Authentication")
.description("Client authentication policy when TLS is enabled")
.required(false)
.dependsOn(SSL_CONTEXT_SERVICE)
.allowableValues(ClientAuth.values())
.defaultValue(ClientAuth.REQUIRED.name())
.build();
@ -104,73 +95,43 @@ public class ListenBeats extends AbstractProcessor {
.description("Messages received successfully will be sent out this relationship.")
.build();
protected List<PropertyDescriptor> descriptors;
protected Set<Relationship> relationships;
private static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
ListenerProperties.NETWORK_INTF_NAME,
ListenerProperties.PORT,
ListenerProperties.RECV_BUFFER_SIZE,
ListenerProperties.MAX_MESSAGE_QUEUE_SIZE,
ListenerProperties.MAX_SOCKET_BUFFER_SIZE,
ListenerProperties.CHARSET,
ListenerProperties.MAX_BATCH_SIZE,
ListenerProperties.MESSAGE_DELIMITER,
ListenerProperties.WORKER_THREADS,
SSL_CONTEXT_SERVICE,
CLIENT_AUTH
));
private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
protected volatile int port;
protected volatile BlockingQueue<BeatsMessage> events;
protected volatile BlockingQueue<BeatsMessage> errorEvents;
protected volatile BlockingQueue<BatchMessage> events;
protected volatile BlockingQueue<BatchMessage> errorEvents;
protected volatile EventServer eventServer;
protected volatile byte[] messageDemarcatorBytes;
protected volatile EventBatcher<BeatsMessage> eventBatcher;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
descriptors.add(ListenerProperties.PORT);
descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
// Deprecated
descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
descriptors.add(ListenerProperties.CHARSET);
descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
descriptors.add(ListenerProperties.WORKER_THREADS);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(CLIENT_AUTH);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null && !sslContextService.isTrustStoreConfigured()) {
results.add(new ValidationResult.Builder()
.explanation("SSL Context Service requires a truststore for the Beats forwarder client to work correctly")
.valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build());
}
final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
results.add(new ValidationResult.Builder()
.explanation("Client Auth must be provided when using TLS/SSL")
.valid(false).subject("Client Auth").build());
}
return results;
}
protected volatile EventBatcher<BatchMessage> eventBatcher;
@Override
public final Set<Relationship> getRelationships() {
return this.relationships;
return RELATIONSHIPS;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
return DESCRIPTORS;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
final int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
final int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
@ -180,7 +141,7 @@ public class ListenBeats extends AbstractProcessor {
final String msgDemarcator = getMessageDemarcator(context);
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
final NettyEventServerFactory eventFactory = new BeatsMessageServerFactory(getLogger(), address, port, charset, events);
final NettyEventServerFactory eventFactory = new BeatsMessageServerFactory(getLogger(), address, port, events);
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
@ -191,80 +152,75 @@ public class ListenBeats extends AbstractProcessor {
eventFactory.setClientAuth(clientAuth);
}
eventFactory.setSocketReceiveBuffer(bufferSize);
eventFactory.setSocketReceiveBuffer(socketBufferSize);
eventFactory.setWorkerThreads(workerThreads);
eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
eventFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
eventFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
try {
eventServer = eventFactory.getEventServer();
} catch (EventException e) {
} catch (final EventException e) {
getLogger().error("Failed to bind to [{}:{}]", address, port, e);
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
EventBatcher<BeatsMessage> eventBatcher = getEventBatcher();
EventBatcher<BatchMessage> eventBatcher = getEventBatcher();
final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
Map<String, FlowFileEventBatch<BeatsMessage>> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
Map<String, FlowFileEventBatch<BatchMessage>> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
processEvents(session, batches);
}
@OnStopped
public void stopped() {
if (eventServer != null) {
public void shutdown() {
if (eventServer == null) {
getLogger().warn("Event Server not configured");
} else {
eventServer.shutdown();
}
eventBatcher = null;
}
private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch<BeatsMessage>> batches) {
for (Map.Entry<String, FlowFileEventBatch<BeatsMessage>> entry : batches.entrySet()) {
private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch<BatchMessage>> batches) {
for (final Map.Entry<String, FlowFileEventBatch<BatchMessage>> entry : batches.entrySet()) {
FlowFile flowFile = entry.getValue().getFlowFile();
final List<BeatsMessage> events = entry.getValue().getEvents();
final List<BatchMessage> events = entry.getValue().getEvents();
if (flowFile.getSize() == 0L || events.size() == 0) {
session.remove(flowFile);
getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", entry.getKey());
continue;
}
final Map<String,String> attributes = getAttributes(entry.getValue());
flowFile = session.putAllAttributes(flowFile, attributes);
getLogger().debug("Transferring {} to success", flowFile);
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
// the sender and command will be the same for all events based on the batch key
final String transitUri = getTransitUri(entry.getValue());
session.getProvenanceReporter().receive(flowFile, transitUri);
}
session.commitAsync();
}
protected String getTransitUri(FlowFileEventBatch<BeatsMessage> batch) {
final List<BeatsMessage> events = batch.getEvents();
private String getTransitUri(final FlowFileEventBatch<BatchMessage> batch) {
final List<BatchMessage> events = batch.getEvents();
final String sender = events.get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
return String.format("beats://%s:%d", senderHost, port);
return String.format("beats://%s:%d", sender, port);
}
protected Map<String, String> getAttributes(FlowFileEventBatch<BeatsMessage> batch) {
final List<BeatsMessage> events = batch.getEvents();
// the sender and command will be the same for all events based on the batch key
private Map<String, String> getAttributes(final FlowFileEventBatch<BatchMessage> batch) {
final List<BatchMessage> events = batch.getEvents();
final String sender = events.get(0).getSender();
final int numAttributes = events.size() == 1 ? 5 : 4;
final Map<String, String> attributes = new HashMap<>(numAttributes);
attributes.put(beatsAttributes.SENDER.key(), sender);
attributes.put(beatsAttributes.PORT.key(), String.valueOf(port));
final Map<String, String> attributes = new LinkedHashMap<>();
attributes.put(BeatsAttributes.SENDER.key(), sender);
attributes.put(BeatsAttributes.PORT.key(), String.valueOf(port));
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
// if there was only one event then we can pass on the transaction
// NOTE: we could pass on all the transaction ids joined together
if (events.size() == 1) {
attributes.put(beatsAttributes.SEQNUMBER.key(), String.valueOf(events.get(0).getSeqNumber()));
attributes.put(BeatsAttributes.SEQUENCE_NUMBER.key(), String.valueOf(events.get(0).getSequenceNumber()));
}
return attributes;
}
@ -275,11 +231,11 @@ public class ListenBeats extends AbstractProcessor {
.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
}
private EventBatcher<BeatsMessage> getEventBatcher() {
private EventBatcher<BatchMessage> getEventBatcher() {
if (eventBatcher == null) {
eventBatcher = new EventBatcher<BeatsMessage>(getLogger(), events, errorEvents) {
eventBatcher = new EventBatcher<BatchMessage>(getLogger(), events, errorEvents) {
@Override
protected String getBatchKey(BeatsMessage event) {
protected String getBatchKey(final BatchMessage event) {
return event.getSender();
}
};
@ -287,14 +243,14 @@ public class ListenBeats extends AbstractProcessor {
return eventBatcher;
}
public enum beatsAttributes implements FlowFileAttributeKey {
private enum BeatsAttributes implements FlowFileAttributeKey {
SENDER("beats.sender"),
PORT("beats.port"),
SEQNUMBER("beats.sequencenumber");
SEQUENCE_NUMBER("beats.sequencenumber");
private final String key;
beatsAttributes(String key) {
BeatsAttributes(String key) {
this.key = key;
}

View File

@ -1,328 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
import org.apache.nifi.logging.ComponentLog;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.List;
import java.util.zip.InflaterInputStream;
/**
* Decodes a Beats frame by maintaining a state based on each byte that has been processed. This class
* should not be shared by multiple threads.
*/
public class BeatsDecoder {
final ComponentLog logger;
private BeatsFrame.Builder frameBuilder;
private BeatsState currState = BeatsState.VERSION;
private byte decodedFrameType;
private byte[] unprocessedData;
private final Charset charset;
private final ByteArrayOutputStream currBytes;
private long windowSize;
static final int MIN_FRAME_HEADER_LENGTH = 2; // Version + Type
static final int WINDOWSIZE_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; // 32bit unsigned window size
static final int COMPRESSED_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; // 32 bit unsigned + payload
static final int JSON_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 8; // 32 bit unsigned sequence number + 32 bit unsigned payload length
/**
* @param charset the charset to decode bytes from the frame
*/
public BeatsDecoder(final Charset charset, final ComponentLog logger) {
this(charset, new ByteArrayOutputStream(4096), logger);
}
/**
* @param charset the charset to decode bytes from the frame
* @param buffer a buffer to use while processing the bytes
*/
public BeatsDecoder(final Charset charset, final ByteArrayOutputStream buffer, final ComponentLog logger) {
this.logger = logger;
this.charset = charset;
this.currBytes = buffer;
this.frameBuilder = new BeatsFrame.Builder();
this.decodedFrameType = 0x00;
}
/**
* Resets this decoder back to its initial state.
*/
public void reset() {
frameBuilder = new BeatsFrame.Builder();
currState = BeatsState.VERSION;
decodedFrameType = 0x00;
currBytes.reset();
}
/**
* Process the next byte from the channel, updating the builder and state accordingly.
*
* @param currByte the next byte to process
* @preturn true if a frame is ready to be retrieved, false otherwise
*/
public boolean process(final byte currByte) throws BeatsFrameException {
try {
switch (currState) {
case VERSION: // Just enough data to process the version
processVERSION(currByte);
break;
case FRAMETYPE: // Also able to process the frametype
processFRAMETYPE(currByte);
break;
case PAYLOAD: // Initial bytes with version and Frame Type have already been received, start iteration over payload
processPAYLOAD(currByte);
// At one stage, the data sent to processPAYLOAD will be represente a complete frame, so we check before returning true
if (frameBuilder.frameType == BeatsFrameType.WINDOWSIZE && currState == BeatsState.COMPLETE) {
return true;
} else if (frameBuilder.frameType == BeatsFrameType.COMPRESSED && currState == BeatsState.COMPLETE) {
return true;
} else if (frameBuilder.frameType == BeatsFrameType.JSON && currState == BeatsState.COMPLETE) {
return true;
} else {
break;
}
case COMPLETE:
return true;
default:
break;
}
return false;
} catch (Exception e) {
throw new BeatsFrameException("Error decoding Beats frame: " + e.getMessage(), e);
}
}
/**
* Returns the decoded frame and resets the decoder for the next frame.
* This method should be called after checking isComplete().
*
* @return the BeatsFrame that was decoded
*/
public List<BeatsFrame> getFrames() throws BeatsFrameException {
List<BeatsFrame> frames = new LinkedList<>();
if (currState != BeatsState.COMPLETE) {
throw new BeatsFrameException("Must be at the trailer of a frame");
}
try {
// Once compressed frames are expanded, they must be devided into individual frames
if (currState == BeatsState.COMPLETE && frameBuilder.frameType == BeatsFrameType.COMPRESSED) {
logger.debug("Frame is compressed, will iterate to decode", new Object[]{});
// Zero currBytes, currState and frameBuilder prior to iteration over
// decompressed bytes
currBytes.reset();
frameBuilder.reset();
currState = BeatsState.VERSION;
// Run over decompressed data and split frames
frames = splitCompressedFrames(unprocessedData);
// In case of V or wired D and J frames we just ship them across the List
} else {
final BeatsFrame frame = frameBuilder.build();
currBytes.reset();
frameBuilder.reset();
currState = BeatsState.VERSION;
frames.add(frame);
}
return frames;
} catch (Exception e) {
throw new BeatsFrameException("Error decoding Beats frame: " + e.getMessage(), e);
}
}
private List<BeatsFrame> splitCompressedFrames(byte[] decompressedData) {
List<BeatsFrame> frames = new LinkedList<>();
BeatsFrame.Builder internalFrameBuilder = new BeatsFrame.Builder();
ByteBuffer currentData = ByteBuffer.wrap(decompressedData);
// Both Lumberjack v1 and Beats (LJ v2) has a weird approach to frames, where compressed frames embed D(ata) or J(SON) frames.
// inside a compressed input.
// Or as stated in the documentation:
//
// "As an example, you could have 3 data frames compressed into a single
// 'compressed' frame type: 1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v}"
//
// Therefore, instead of calling process method again, just iterate over each of
// the frames and split them so they can be processed
while (currentData.hasRemaining()) {
int payloadLength = 0;
internalFrameBuilder.version = currentData.get();
internalFrameBuilder.frameType = currentData.get();
switch (internalFrameBuilder.frameType) {
case BeatsFrameType.JSON:
internalFrameBuilder.seqNumber = (int) (currentData.getInt() & 0x00000000ffffffffL);
currentData.mark();
internalFrameBuilder.dataSize = currentData.getInt() & 0x00000000ffffffffL;
currentData.mark();
// Define how much data to chomp
payloadLength = Math.toIntExact(internalFrameBuilder.dataSize);
byte[] jsonBytes = new byte[payloadLength];
currentData.get(jsonBytes, 0, payloadLength);
currentData.mark();
// Add payload to frame
internalFrameBuilder.payload(jsonBytes);
break;
}
// data frame is created
BeatsFrame frame = internalFrameBuilder.build();
frames.add(frame);
internalFrameBuilder.reset();
}
return frames;
}
private void processVERSION(final byte b) {
byte version = b;
frameBuilder.version(version);
logger.debug("Version number is {}", new Object[]{version});
currBytes.write(b);
currState = BeatsState.FRAMETYPE;
}
private void processFRAMETYPE(final byte b) {
decodedFrameType = b;
frameBuilder.frameType(decodedFrameType);
logger.debug("Frame type is {}", new Object[]{decodedFrameType});
currBytes.write(b);
currState = BeatsState.PAYLOAD;
}
/** Process the outer PAYLOAD byte by byte. Once data is read state is set to COMPLETE so that the data payload
* can be processed fully using {@link #splitCompressedFrames(byte[])}
* */
private void processPAYLOAD(final byte b) {
currBytes.write(b);
switch (decodedFrameType) {
case BeatsFrameType.WINDOWSIZE: //'W'
if (currBytes.size() < WINDOWSIZE_LENGTH ) {
logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()});
break;
} else if (currBytes.size() == WINDOWSIZE_LENGTH) {
frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL;
logger.debug("Data size is {}", new Object[]{frameBuilder.dataSize});
// Sets payload to empty as frame contains no data
frameBuilder.payload(new byte[]{});
currBytes.reset();
currState = BeatsState.COMPLETE;
windowSize = frameBuilder.dataSize;
break;
} else { // Should never be here to be honest...
logger.debug("Saw a packet I should not have seen. Packet contents were {}", new Object[] {currBytes.toString()});
break;
}
case BeatsFrameType.COMPRESSED: //'C'
if (currBytes.size() < COMPRESSED_MIN_LENGTH) {
if (logger.isTraceEnabled()) {
logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()});
}
break;
} else if (currBytes.size() >= COMPRESSED_MIN_LENGTH) {
// If data contains more thant the minimum data size
frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL;
if (currBytes.size() - 6 == frameBuilder.dataSize) {
try {
byte[] buf = java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, currBytes.size());
InputStream in = new InflaterInputStream(new ByteArrayInputStream(buf));
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int len;
while ((len = in.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
in.close();
out.close();
unprocessedData = out.toByteArray();
// buf is no longer needed
buf = null;
logger.debug("Finished decompressing data");
// Decompression is complete, we should be able to proceed with resetting currBytes and curSrtate and iterating them
// as type 'D' frames
frameBuilder.dataSize(unprocessedData.length);
currState = BeatsState.COMPLETE;
} catch (IOException e) {
throw new BeatsFrameException("Error decompressing frame: " + e.getMessage(), e);
}
}
break;
// If currentByte.size is not lower than six and also not equal or great than 6...
} else { // Should never be here to be honest...
if (logger.isDebugEnabled()) {
logger.debug("Received a compressed frame with partial data or invalid content. The packet contents were {}", new Object[] {currBytes.toString()});
}
break;
}
case BeatsFrameType.JSON: // ''
// Because Beats can disable compression, sometimes, JSON data will be received outside a compressed
// stream (i.e. 0x43). Instead of processing it here, we defer its processing to went getFrames is
// called
if (currBytes.size() < JSON_MIN_LENGTH) {
if (logger.isTraceEnabled()) {
logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()});
}
break;
} else if (currBytes.size() == JSON_MIN_LENGTH) {
// Read the sequence number from bytes
frameBuilder.seqNumber = (int) (ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL);
// Read the JSON payload length
frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, 10)).getInt() & 0x00000000ffffffffL;
} else if (currBytes.size() > JSON_MIN_LENGTH) {
// Wait for payload to be fully read and then complete processing
if (currBytes.size() - 10 == frameBuilder.dataSize) {
// Transfer the current payload so it can be processed by {@link #splitCompressedFrames} method.
frameBuilder.payload = java.util.Arrays.copyOfRange(currBytes.toByteArray(), 10, currBytes.size());
currState = BeatsState.COMPLETE;
}
break;
}
}
}
}

View File

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* Encodes a BeatsFrame into raw bytes using the given charset.
*/
public class BeatsEncoder {
public byte[] encode(final BeatsFrame frame) {
final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
// Writes the version
buffer.write(frame.getVersion());
// Writes the frameType
buffer.write(frame.getFrameType());
// Writes the sequence number
try {
buffer.write(frame.getPayload());
} catch (IOException e) {
throw new BeatsFrameException("Error decoding Beats frame: " + e.getMessage(), e);
}
return buffer.toByteArray();
}
}

View File

@ -1,115 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
/**
* A frame received from a channel.
*/
public class BeatsFrame {
public static final byte DELIMITER = 10;
private final byte version;
private final byte frameType;
private final byte[] payload;
private final long dataSize;
private final long seqNumber;
private BeatsFrame(final Builder builder) {
this.version = builder.version;
this.frameType = builder.frameType;
this.payload = builder.payload;
this.dataSize = builder.dataSize;
this.seqNumber = builder.seqNumber;
if (version < 2 || payload.length < 0 ) {
throw new BeatsFrameException("Invalid Frame");
}
}
public long getSeqNumber() {
return seqNumber;
}
public byte getVersion() {
return version;
}
public byte getFrameType() {
return frameType;
}
public byte [] getPayload() {
return payload;
}
/**
* Builder for a BeatsFrame.
*/
public static class Builder {
byte version;
byte frameType;
byte [] payload;
long dataSize;
int seqNumber;
public Builder() {
reset();
}
public void reset() {
version = -1;
seqNumber = -1;
frameType = -1;
payload = null;
}
public Builder version(final byte version) {
this.version = version;
return this;
}
public Builder seqNumber(final int seqNumber) {
this.seqNumber = seqNumber;
return this;
}
public Builder frameType(final byte frameType) {
this.frameType = frameType;
return this;
}
public Builder dataSize(final long dataSize) {
this.dataSize = dataSize;
return this;
}
public Builder payload(final byte [] payload) {
this.payload = payload;
return this;
}
public BeatsFrame build() {
return new BeatsFrame(this);
}
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.beats.protocol.Batch;
import org.apache.nifi.processors.beats.protocol.BatchMessage;
import org.apache.nifi.processors.beats.protocol.MessageAck;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
/**
* Batch Channel Inbound Handler processes a batch of messages and sends an acknowledgement for the last sequence number
*/
@ChannelHandler.Sharable
public class BatchChannelInboundHandler extends SimpleChannelInboundHandler<Batch> {
private final ComponentLog log;
private final BlockingQueue<BatchMessage> messages;
/**
* Batch Channel Inbound Handler with required arguments
*
* @param log Processor Log
* @param messages Queue of messages
*/
public BatchChannelInboundHandler(final ComponentLog log, final BlockingQueue<BatchMessage> messages) {
this.log = Objects.requireNonNull(log, "Component Log required");
this.messages = Objects.requireNonNull(messages, "Message Queue required");
}
/**
* Channel Read processes a batch of messages and sends an acknowledgement for the last sequence number
*
* @param context Channel Handler Context
* @param batch Batch of messages
*/
@Override
protected void channelRead0(final ChannelHandlerContext context, final Batch batch) {
Integer lastSequenceNumber = null;
final Collection<BatchMessage> batchMessages = batch.getMessages();
int queued = 0;
for (final BatchMessage batchMessage : batchMessages) {
final int sequenceNumber = batchMessage.getSequenceNumber();
final String sender = batchMessage.getSender();
if (messages.offer(batchMessage)) {
log.debug("Message Sequence Number [{}] Sender [{}] queued", sequenceNumber, sender);
lastSequenceNumber = batchMessage.getSequenceNumber();
queued++;
} else {
log.warn("Message Sequence Number [{}] Sender [{}] queuing failed: Queued [{}] of [{}]", sequenceNumber, sender, queued, batchMessages.size());
break;
}
}
if (lastSequenceNumber == null) {
log.warn("Batch Messages [{}] queuing failed", batch.getMessages().size());
} else {
final MessageAck messageAck = new MessageAck(lastSequenceNumber);
context.writeAndFlush(messageAck);
}
}
}

View File

@ -0,0 +1,380 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.beats.protocol.Batch;
import org.apache.nifi.processors.beats.protocol.BatchMessage;
import org.apache.nifi.processors.beats.protocol.FrameType;
import org.apache.nifi.processors.beats.protocol.FrameTypeDecoder;
import org.apache.nifi.processors.beats.protocol.ProtocolCodeDecoder;
import org.apache.nifi.processors.beats.protocol.ProtocolException;
import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
import org.apache.nifi.processors.beats.protocol.ProtocolVersionDecoder;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Inflater;
import java.util.zip.InflaterOutputStream;
/**
* Byte Buffer to Batch Decoder parses bytes to batches of Beats messages
*/
public class BatchDecoder extends ByteToMessageDecoder {
private static final int INITIAL_WINDOW_SIZE = 1;
private static final int INITIAL_QUEUE_SIZE = 1;
private static final int CODE_READABLE_BYTES = 1;
private static final int INT_READABLE_BYTES = 4;
private static final ProtocolCodeDecoder<ProtocolVersion> VERSION_DECODER = new ProtocolVersionDecoder();
private static final ProtocolCodeDecoder<FrameType> FRAME_TYPE_DECODER = new FrameTypeDecoder();
private final ComponentLog log;
private final AtomicReference<ProtocolVersion> versionRef = new AtomicReference<>();
private final AtomicReference<FrameType> frameTypeRef = new AtomicReference<>();
private final AtomicInteger windowSize = new AtomicInteger(INITIAL_WINDOW_SIZE);
private final AtomicReference<Integer> sequenceNumberRef = new AtomicReference<>();
private final AtomicReference<Integer> payloadSizeRef = new AtomicReference<>();
private final AtomicReference<Integer> compressedSizeRef = new AtomicReference<>();
private Queue<BatchMessage> batchMessages = new ArrayBlockingQueue<>(INITIAL_QUEUE_SIZE);
/**
* Beats Batch Decoder with required arguments
*
* @param log Processor Log
*/
public BatchDecoder(final ComponentLog log) {
this.log = Objects.requireNonNull(log, "Component Log required");
}
/**
* Decode Batch of Beats Messages from Byte Buffer
*
* @param context Channel Handler Context
* @param buffer Byte Buffer
* @param objects List of Batch objects
*/
@Override
protected void decode(final ChannelHandlerContext context, final ByteBuf buffer, final List<Object> objects) {
final ProtocolVersion protocolVersion = readVersion(buffer);
if (ProtocolVersion.VERSION_2 == protocolVersion) {
final FrameType frameType = readFrameType(buffer);
decodeFrameType(frameType, context, buffer, objects);
} else if (ProtocolVersion.VERSION_1 == protocolVersion) {
throw new ProtocolException("Protocol Version [1] not supported");
}
}
private void decodeFrameType(final FrameType frameType, final ChannelHandlerContext context, final ByteBuf buffer, final List<Object> batches) {
if (frameType == null) {
log.trace("Frame Type not found");
} else if (FrameType.COMPRESSED == frameType) {
processCompressed(context, buffer, batches);
} else if (FrameType.WINDOW_SIZE == frameType) {
processWindowSize(context, buffer);
} else if (FrameType.JSON == frameType) {
processJson(context, buffer, batches);
} else {
final String message = String.format("Frame Type [%s] not supported", frameType);
throw new ProtocolException(message);
}
}
private void processWindowSize(final ChannelHandlerContext context, final ByteBuf buffer) {
final Integer readWindowSize = readUnsignedInteger(buffer);
if (readWindowSize == null) {
log.trace("State [Read Window Size] not enough readable bytes");
} else {
windowSize.getAndSet(readWindowSize);
batchMessages = new ArrayBlockingQueue<>(readWindowSize);
resetFrameTypeVersion();
final Channel channel = context.channel();
log.debug("Processed Window Size [{}] Local [{}] Remote [{}]", readWindowSize, channel.localAddress(), channel.remoteAddress());
}
}
private void processCompressed(final ChannelHandlerContext context, final ByteBuf buffer, final List<Object> batches) {
final Integer readCompressedSize = readCompressedSize(buffer);
if (readCompressedSize == null) {
log.trace("State [Read Compressed] not enough readable bytes");
} else {
final int readableBytes = buffer.readableBytes();
if (readableBytes >= readCompressedSize) {
final Channel channel = context.channel();
log.debug("Processing Compressed Size [{}] Local [{}] Remote [{}]", readCompressedSize, channel.localAddress(), channel.remoteAddress());
processCompressed(context, buffer, readCompressedSize, batches);
} else {
log.trace("State [Read Compressed] not enough readable bytes [{}] for compressed [{}]", readableBytes, readCompressedSize);
}
}
}
private void processCompressed(
final ChannelHandlerContext context,
final ByteBuf buffer,
final int compressedSize,
final List<Object> batches
) {
final ByteBuf inflated = context.alloc().buffer(compressedSize);
try {
readCompressedBuffer(buffer, inflated, compressedSize);
// Clear status prior to decoding inflated frames
resetSequenceVersionPayloadSize();
resetFrameTypeVersion();
while (inflated.isReadable()) {
decode(context, inflated, batches);
}
} finally {
compressedSizeRef.set(null);
inflated.release();
}
}
private void processJson(final ChannelHandlerContext context, final ByteBuf buffer, final List<Object> batches) {
final Channel channel = context.channel();
final Integer sequenceNumber = readSequenceNumber(buffer);
if (sequenceNumber == null) {
log.trace("State [Read JSON] Sequence Number not found Remote [{}]", channel.remoteAddress());
} else {
final Integer payloadSize = readPayloadSize(buffer);
if (payloadSize == null) {
log.trace("State [Read JSON] Payload Size not found Remote [{}]", channel.remoteAddress());
} else {
processJson(sequenceNumber, payloadSize, context, buffer, batches);
}
}
}
private void processJson(
final int sequenceNumber,
final int payloadSize,
final ChannelHandlerContext context,
final ByteBuf buffer,
final List<Object> batches
) {
final Channel channel = context.channel();
final BatchMessage batchMessage = readJsonMessage(context, sequenceNumber, payloadSize, buffer);
if (batchMessage == null) {
log.trace("State [Read JSON] Message not found Remote [{}]", channel.remoteAddress());
} else {
processBatchMessage(batchMessage, batches);
log.debug("Processed JSON Message Sequence Number [{}] Payload Size [{}] Local [{}] Remote [{}]", sequenceNumber, payloadSize, channel.localAddress(), channel.remoteAddress());
}
}
private BatchMessage readJsonMessage(
final ChannelHandlerContext context,
final int sequenceNumber,
final int payloadSize,
final ByteBuf buffer
) {
final BatchMessage batchMessage;
final int readableBytes = buffer.readableBytes();
if (readableBytes >= payloadSize) {
final byte[] payload = new byte[payloadSize];
buffer.readBytes(payload);
final Channel channel = context.channel();
final String sender = getRemoteHostAddress(channel);
batchMessage = new BatchMessage(sender, payload, sequenceNumber);
} else {
batchMessage = null;
log.trace("State [Read JSON] Sequence Number [{}] not enough readable bytes [{}] for payload [{}]", sequenceNumber, readableBytes, payloadSize);
}
return batchMessage;
}
private String getRemoteHostAddress(final Channel channel) {
final String remoteHostAddress;
final SocketAddress remoteAddress = channel.remoteAddress();
if (remoteAddress instanceof InetSocketAddress) {
final InetSocketAddress remoteSocketAddress = (InetSocketAddress) remoteAddress;
final InetAddress address = remoteSocketAddress.getAddress();
remoteHostAddress = address.getHostAddress();
} else {
remoteHostAddress = remoteAddress.toString();
}
return remoteHostAddress;
}
private void processBatchMessage(final BatchMessage batchMessage, final List<Object> batches) {
if (batchMessages.offer(batchMessage)) {
resetSequenceVersionPayloadSize();
resetFrameTypeVersion();
if (windowSize.get() == batchMessages.size()) {
final Collection<BatchMessage> messages = new ArrayList<>(batchMessages);
final Batch batch = new Batch(messages);
batches.add(batch);
resetWindowSize();
}
} else {
final String message = String.format("Received message exceeds Window Size [%d]", windowSize.get());
throw new ProtocolException(message);
}
}
private void readCompressedBuffer(final ByteBuf compressedBuffer, final ByteBuf inflated, final int compressedSize) {
final Inflater inflater = new Inflater();
try (
final ByteBufOutputStream outputStream = new ByteBufOutputStream(inflated);
final InflaterOutputStream inflaterOutputStream = new InflaterOutputStream(outputStream, inflater)
) {
compressedBuffer.readBytes(inflaterOutputStream, compressedSize);
} catch (final IOException e) {
final String message = String.format("Read Compressed Payload Size [%d] failed", compressedSize);
throw new ProtocolException(message, e);
} finally {
inflater.end();
}
}
private Integer readSequenceNumber(final ByteBuf buffer) {
if (sequenceNumberRef.get() == null) {
final Integer readSequenceNumber = readUnsignedInteger(buffer);
if (readSequenceNumber == null) {
log.trace("State [Read JSON] not enough readable bytes for Sequence Number");
} else {
sequenceNumberRef.set(readSequenceNumber);
}
}
return sequenceNumberRef.get();
}
private Integer readPayloadSize(final ByteBuf buffer) {
if (payloadSizeRef.get() == null) {
final Integer readPayloadSize = readUnsignedInteger(buffer);
if (readPayloadSize == null) {
log.trace("State [Read JSON] not enough readable bytes for Payload Size");
} else {
payloadSizeRef.set(readPayloadSize);
}
}
return payloadSizeRef.get();
}
private Integer readCompressedSize(final ByteBuf buffer) {
if (compressedSizeRef.get() == null) {
final Integer readCompressedSize = readUnsignedInteger(buffer);
if (readCompressedSize == null) {
log.trace("State [Read Compressed] not enough readable bytes for Compressed Size");
} else {
compressedSizeRef.set(readCompressedSize);
}
}
return compressedSizeRef.get();
}
private Integer readUnsignedInteger(final ByteBuf buffer) {
final Integer number;
final int readableBytes = buffer.readableBytes();
if (readableBytes >= INT_READABLE_BYTES) {
final long unsigned = buffer.readUnsignedInt();
number = Math.toIntExact(unsigned);
} else {
number = null;
}
return number;
}
private FrameType readFrameType(final ByteBuf buffer) {
if (frameTypeRef.get() == null) {
final int readableBytes = buffer.readableBytes();
if (readableBytes >= CODE_READABLE_BYTES) {
final byte frameTypeCode = buffer.readByte();
final FrameType frameType = FRAME_TYPE_DECODER.readProtocolCode(frameTypeCode);
frameTypeRef.set(frameType);
} else {
log.trace("State [Read Frame Type] not enough readable bytes [{}]", readableBytes);
}
}
return frameTypeRef.get();
}
private ProtocolVersion readVersion(final ByteBuf buffer) {
if (versionRef.get() == null) {
final int readableBytes = buffer.readableBytes();
if (readableBytes >= CODE_READABLE_BYTES) {
final byte versionCode = buffer.readByte();
final ProtocolVersion protocolVersion = VERSION_DECODER.readProtocolCode(versionCode);
versionRef.set(protocolVersion);
} else {
log.trace("State [Read Version] not enough readable bytes [{}]", readableBytes);
}
}
return versionRef.get();
}
private void resetSequenceVersionPayloadSize() {
sequenceNumberRef.set(null);
payloadSizeRef.set(null);
}
private void resetFrameTypeVersion() {
frameTypeRef.set(null);
versionRef.set(null);
}
private void resetWindowSize() {
windowSize.set(INITIAL_WINDOW_SIZE);
batchMessages.clear();
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.beats.protocol.FrameType;
import org.apache.nifi.processors.beats.protocol.MessageAck;
import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
import java.util.Objects;
/**
* Beats Message Acknowledgement Encoder writes Protocol Version 2 ACK packets with a specified sequence number
*/
@ChannelHandler.Sharable
public class MessageAckEncoder extends MessageToByteEncoder<MessageAck> {
private final ComponentLog log;
/**
* Message Acknowledgment Encoder with required arguments
*
* @param log Processor Log
*/
public MessageAckEncoder(final ComponentLog log) {
this.log = Objects.requireNonNull(log, "Component Log required");
}
/**
* Encode Message Acknowledgement to the buffer with Protocol Version 2 and ACK Frame Type
*
* @param context Channel Handler Context
* @param messageAck Message Acknowledgement containing Sequence Number
* @param buffer Byte Buffer
*/
@Override
protected void encode(final ChannelHandlerContext context, final MessageAck messageAck, final ByteBuf buffer) {
buffer.writeByte(ProtocolVersion.VERSION_2.getCode());
buffer.writeByte(FrameType.ACK.getCode());
final int sequenceNumber = messageAck.getSequenceNumber();
buffer.writeInt(sequenceNumber);
final Channel channel = context.channel();
log.debug("Encoded Message Ack Sequence Number [{}] Local [{}] Remote [{}]", sequenceNumber, channel.localAddress(), channel.remoteAddress());
}
}

View File

@ -1,81 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processors.beats.frame.BeatsDecoder;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
import org.apache.nifi.processors.beats.frame.BeatsFrameType;
import org.apache.nifi.processors.beats.frame.BeatsMetadata;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
/**
* Decode a Beats message's bytes into a BeatsMessage object
*/
public class BeatsFrameDecoder extends ByteToMessageDecoder {
private final Charset charset;
private final ComponentLog logger;
private final BeatsMessageFactory messageFactory;
public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) {
this.charset = charset;
this.logger = logger;
this.messageFactory = new BeatsMessageFactory();
}
@Override
protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
final int total = in.readableBytes();
final String senderSocket = ctx.channel().remoteAddress().toString();
final BeatsDecoder decoder = new BeatsDecoder(charset, logger);
for (int i = 0; i < total; i++) {
byte currByte = in.readByte();
// decode the bytes and once we find the end of a frame, handle the frame
if (decoder.process(currByte)) {
final List<BeatsFrame> frames = decoder.getFrames();
for (BeatsFrame frame : frames) {
logger.debug("Received Beats Frame Sender [{}] Transaction [{}] Frame Type [{}]",
senderSocket, frame.getSeqNumber(), frame.getFrameType());
// Ignore the WINDOW SIZE type frames as they contain no payload.
if (frame.getFrameType() != BeatsFrameType.WINDOWSIZE) {
handle(frame, senderSocket, out);
}
}
}
}
}
private void handle(final BeatsFrame frame, final String sender, final List<Object> out) {
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender);
metadata.put(BeatsMetadata.SEQNUMBER_KEY, String.valueOf(frame.getSeqNumber()));
if (frame.getFrameType() == BeatsFrameType.JSON) {
final BeatsMessage event = messageFactory.create(frame.getPayload(), metadata);
out.add(event);
}
}
}

View File

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.netty;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.beats.frame.BeatsEncoder;
import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
import org.apache.nifi.processors.beats.response.BeatsResponse;
import java.util.concurrent.BlockingQueue;
/**
* Decode data received into a BeatsMessage
*/
@ChannelHandler.Sharable
public class BeatsMessageChannelHandler extends SimpleChannelInboundHandler<BeatsMessage> {
private final ComponentLog componentLog;
private final BlockingQueue<BeatsMessage> events;
private final BeatsEncoder encoder;
public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events, ComponentLog componentLog) {
this.events = events;
this.componentLog = componentLog;
this.encoder = new BeatsEncoder();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) {
componentLog.debug("Beats Message Received Length [{}] Remote Address [{}] ", msg.getMessage().length, msg.getSender());
if (events.offer(msg)) {
componentLog.debug("Event Queued: Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
BeatsChannelResponse successResponse = new BeatsChannelResponse(encoder, BeatsResponse.ok(msg.getSeqNumber()));
ctx.writeAndFlush(Unpooled.wrappedBuffer(successResponse.toByteArray()));
} else {
componentLog.warn("Beats Queue Full: Failed Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.protocol;
import java.util.Collection;
import java.util.Objects;
/**
* Batch of Beats Messages
*/
public class Batch {
private final Collection<BatchMessage> messages;
public Batch(final Collection<BatchMessage> messages) {
this.messages = Objects.requireNonNull(messages, "Message required");
}
public Collection<BatchMessage> getMessages() {
return messages;
}
}

View File

@ -14,23 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.netty;
package org.apache.nifi.processors.beats.protocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
/**
* A Beats message which adds a sequence number to the ByteArrayMessage.
* Beats Batch Message containing JSON payload and sequence number
*/
public class BeatsMessage extends ByteArrayMessage {
public class BatchMessage extends ByteArrayMessage {
private final int seqNumber;
private final int sequenceNumber;
public BeatsMessage(final String sender, final byte[] data, final int seqNumber) {
super(data, sender);
this.seqNumber = seqNumber;
public BatchMessage(final String sender, final byte[] payload, final int sequenceNumber) {
super(payload, sender);
this.sequenceNumber = sequenceNumber;
}
public int getSeqNumber() {
return seqNumber;
public int getSequenceNumber() {
return sequenceNumber;
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.protocol;
/**
* Beats Protocol Frame Type
*/
public enum FrameType implements ProtocolCode {
ACK('A'),
COMPRESSED('C'),
DATA('D'),
JSON('J'),
WINDOW_SIZE('W');
private final int code;
FrameType(final char code) {
this.code = code;
}
@Override
public int getCode() {
return code;
}
}

View File

@ -14,22 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.netty;
package org.apache.nifi.processors.beats.protocol;
import org.apache.nifi.processor.util.listen.event.NetworkEventFactory;
import org.apache.nifi.processors.beats.frame.BeatsMetadata;
import java.util.Map;
import java.util.Arrays;
import java.util.Optional;
/**
* An EventFactory implementation to create BeatsMessages.
* Beats Frame Type Decoder
*/
public class BeatsMessageFactory implements NetworkEventFactory<BeatsMessage> {
public class FrameTypeDecoder implements ProtocolCodeDecoder<FrameType> {
@Override
public BeatsMessage create(final byte[] data, final Map<String, String> metadata) {
final int sequenceNumber = Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));
final String sender = metadata.get(BeatsMetadata.SENDER_KEY);
return new BeatsMessage(sender, data, sequenceNumber);
public FrameType readProtocolCode(final byte code) {
final Optional<FrameType> frameTypeFound = Arrays.stream(FrameType.values()).filter(
frameType -> frameType.getCode() == code
).findFirst();
return frameTypeFound.orElseThrow(() -> {
final String message = String.format("Frame Type Code [%d] not supported", code);
return new ProtocolException(message);
});
}
}

View File

@ -14,15 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
package org.apache.nifi.processors.beats.protocol;
/**
* The stages of parsing of a Beats conversation.
* Beats Message Acknowledgement
*/
public enum BeatsState {
public class MessageAck {
private final int sequenceNumber;
VERSION, // First stage is parsing the version
FRAMETYPE, // Second stage is to be able to read the frame type
PAYLOAD, // payload being populated
COMPLETE // complete packet handling
}
public MessageAck(final int sequenceNumber) {
this.sequenceNumber = sequenceNumber;
}
public int getSequenceNumber() {
return sequenceNumber;
}
}

View File

@ -14,13 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
package org.apache.nifi.processors.beats.protocol;
/**
* Metadata keys for Beats message.
* Beats Protocol Code interface abstraction
*/
public interface BeatsMetadata {
String SEQNUMBER_KEY = "beats.sequencenumber";
String SENDER_KEY = "sender";
public interface ProtocolCode {
/**
* Get Protocol Code as transmitted over a socket connection
*
* @return Protocol Code
*/
int getCode();
}

View File

@ -14,12 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
package org.apache.nifi.processors.beats.protocol;
public final class BeatsFrameType {
public static final byte WINDOWSIZE = 0x57;
public static final byte DATA = 0x44;
public static final byte COMPRESSED = 0x43;
public static final byte ACK = 0x41;
public static final byte JSON = 0x4a;
/**
* Decoder for Protocol Code byte values
*
* @param <T> Protocol Code Type
*/
public interface ProtocolCodeDecoder<T extends ProtocolCode> {
/**
* Read Protocol Code
*
* @param code Code byte value
* @return Protocol Code
*/
T readProtocolCode(byte code);
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.protocol;
/**
* Beats Protocol Exception
*/
public class ProtocolException extends RuntimeException {
/**
* Protocol Exception constructor with message containing protocol failure details
*
* @param message Protocol failure details
*/
public ProtocolException(final String message) {
super(message);
}
/**
* Protocol Exception constructor with message and cause of failure details
*
* @param message Protocol failure details
* @param cause Cause of failure
*/
public ProtocolException(final String message, final Throwable cause) {
super(message, cause);
}
}

View File

@ -14,19 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
package org.apache.nifi.processors.beats.protocol;
/**
* Represents an error encountered when decoding frames.
* Beats Protocol Version
*/
public class BeatsFrameException extends RuntimeException {
public enum ProtocolVersion implements ProtocolCode {
VERSION_1('1'),
public BeatsFrameException(String message) {
super(message);
VERSION_2('2');
private final int code;
ProtocolVersion(final char code) {
this.code = code;
}
public BeatsFrameException(String message, Throwable cause) {
super(message, cause);
@Override
public int getCode() {
return code;
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.protocol;
import java.util.Arrays;
import java.util.Optional;
/**
* Beats Protocol Version Decoder
*/
public class ProtocolVersionDecoder implements ProtocolCodeDecoder<ProtocolVersion> {
@Override
public ProtocolVersion readProtocolCode(final byte code) {
final Optional<ProtocolVersion> protocolVersionFound = Arrays.stream(ProtocolVersion.values()).filter(
protocolVersion -> protocolVersion.getCode() == code
).findFirst();
return protocolVersionFound.orElseThrow(() -> {
final String message = String.format("Version Code [%d] not supported", code);
return new ProtocolException(message);
});
}
}

View File

@ -1,42 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.response;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
import org.apache.nifi.processors.beats.frame.BeatsEncoder;
/**
* Creates a BeatsFrame for the provided response and returns the encoded frame.
*/
public class BeatsChannelResponse implements ChannelResponse {
private final BeatsEncoder encoder;
private final BeatsResponse response;
public BeatsChannelResponse(final BeatsEncoder encoder, final BeatsResponse response) {
this.encoder = encoder;
this.response = response;
}
@Override
public byte[] toByteArray() {
final BeatsFrame frame = response.toFrame();
return encoder.encode(frame);
}
}

View File

@ -1,62 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.response;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
import java.nio.ByteBuffer;
/**
'ack' frame type
SENT FROM READER ONLY
frame type value: ASCII 'A' aka byte value 0x41
Payload:
32bit unsigned sequence number.
*/
public class BeatsResponse {
private final int seqNumber;
final private byte version = 0x32; // v2
final private byte frameType = 0x41; // A or ACK
public BeatsResponse(final int seqNumber) {
this.seqNumber = seqNumber;
}
/**
* Creates a BeatsFrame where the data portion will contain this response.
*
*
* @return a BeatsFrame for for this response
*/
public BeatsFrame toFrame() {
return new BeatsFrame.Builder()
.version(version)
.frameType(frameType)
.payload(ByteBuffer.allocate(4).putInt(seqNumber).array())
.build();
}
public static BeatsResponse ok(final int seqNumber) {
return new BeatsResponse(seqNumber);
}
}

View File

@ -14,43 +14,46 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.netty;
package org.apache.nifi.processors.beats.server;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.beats.handler.BatchChannelInboundHandler;
import org.apache.nifi.processors.beats.handler.BatchDecoder;
import org.apache.nifi.processors.beats.handler.MessageAckEncoder;
import org.apache.nifi.processors.beats.protocol.BatchMessage;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
/**
* Netty Event Server Factory implementation for RELP Messages
* Beats Message Protocol extends of Netty Event Server Factory
*/
public class BeatsMessageServerFactory extends NettyEventServerFactory {
/**
* RELP Message Server Factory to receive RELP messages
* Beats Message Server Factory constructor with standard configuration arguments
*
* @param log Component Log
* @param address Server Address
* @param port Server Port Number
* @param charset Charset to use when decoding RELP messages
* @param events Blocking Queue for events received
*/
public BeatsMessageServerFactory(final ComponentLog log,
final InetAddress address,
final int port,
final Charset charset,
final BlockingQueue<BeatsMessage> events) {
final BlockingQueue<BatchMessage> events) {
super(address, port, TransportProtocol.TCP);
final BeatsMessageChannelHandler beatsChannelHandler = new BeatsMessageChannelHandler(events, log);
final MessageAckEncoder messageAckEncoder = new MessageAckEncoder(log);
final BatchChannelInboundHandler batchChannelInboundHandler = new BatchChannelInboundHandler(log, events);
final LogExceptionChannelHandler logExceptionChannelHandler = new LogExceptionChannelHandler(log);
setHandlerSupplier(() -> Arrays.asList(
new BeatsFrameDecoder(log, charset),
beatsChannelHandler,
messageAckEncoder,
new BatchDecoder(log),
batchChannelInboundHandler,
logExceptionChannelHandler
));
}

View File

@ -0,0 +1,244 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processors.beats.protocol.FrameType;
import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Optional;
import java.util.zip.DeflaterOutputStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
class ListenBeatsTest {
private static final String LOCALHOST = "127.0.0.1";
private static final String LOCALHOST_TRANSIT_URI = "beats://127.0.0.1:%d";
private static final int ACK_PACKET_LENGTH = 6;
private static final int FIRST_SEQUENCE_NUMBER = 1;
private static final int INTEGER_BUFFER_SIZE = 4;
private static final String JSON_PAYLOAD = "{\"@timestamp\":\"2022-10-31T12:30:45.678Z\",\"message\":\"Processing Started\"}";
private static final int WINDOWED_MESSAGES = 50;
TestRunner runner;
@BeforeEach
void setRunner() {
runner = TestRunners.newTestRunner(ListenBeats.class);
}
@Timeout(10)
@Test
void testRunSingleJsonMessage() throws Exception {
final int port = NetworkUtils.getAvailableTcpPort();
runner.setProperty(ListenerProperties.PORT, Integer.toString(port));
startServer();
try (
final Socket socket = new Socket(LOCALHOST, port);
final InputStream inputStream = socket.getInputStream();
final OutputStream outputStream = socket.getOutputStream()
) {
sendMessage(outputStream, FIRST_SEQUENCE_NUMBER);
assertAckPacketMatched(inputStream, FIRST_SEQUENCE_NUMBER);
}
assertFlowFilesSuccess(1);
assertReceiveEventFound(port);
}
@Timeout(10)
@Test
void testRunWindowSizeJsonMessages() throws Exception {
final int port = NetworkUtils.getAvailableTcpPort();
runner.setProperty(ListenerProperties.PORT, Integer.toString(port));
startServer();
try (
final Socket socket = new Socket(LOCALHOST, port);
final InputStream inputStream = socket.getInputStream();
final OutputStream outputStream = socket.getOutputStream()
) {
sendWindowSize(outputStream);
for (int sequenceNumber = FIRST_SEQUENCE_NUMBER; sequenceNumber <= WINDOWED_MESSAGES; sequenceNumber++) {
sendMessage(outputStream, sequenceNumber);
}
assertAckPacketMatched(inputStream, WINDOWED_MESSAGES);
}
assertFlowFilesSuccess(WINDOWED_MESSAGES);
assertReceiveEventFound(port);
}
@Timeout(10)
@Test
void testRunWindowSizeCompressedJsonMessages() throws Exception {
final int port = NetworkUtils.getAvailableTcpPort();
runner.setProperty(ListenerProperties.PORT, Integer.toString(port));
startServer();
try (
final Socket socket = new Socket(LOCALHOST, port);
final InputStream inputStream = socket.getInputStream();
final OutputStream outputStream = socket.getOutputStream()
) {
sendWindowSize(outputStream);
final ByteArrayOutputStream compressedOutputStream = new ByteArrayOutputStream();
final DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(compressedOutputStream);
for (int sequenceNumber = FIRST_SEQUENCE_NUMBER; sequenceNumber <= WINDOWED_MESSAGES; sequenceNumber++) {
sendMessage(deflaterOutputStream, sequenceNumber);
}
deflaterOutputStream.close();
final byte[] compressed = compressedOutputStream.toByteArray();
sendCompressed(outputStream, compressed);
assertAckPacketMatched(inputStream, WINDOWED_MESSAGES);
}
assertFlowFilesSuccess(WINDOWED_MESSAGES);
assertReceiveEventFound(port);
}
private void startServer() {
runner.run(1, false, true);
}
private void assertReceiveEventFound(final int port) {
final Optional<ProvenanceEventRecord> receiveRecord = runner.getProvenanceEvents().stream().filter(record ->
ProvenanceEventType.RECEIVE == record.getEventType()
).findFirst();
assertTrue(receiveRecord.isPresent());
final ProvenanceEventRecord record = receiveRecord.get();
final String expectedTransitUri = String.format(LOCALHOST_TRANSIT_URI, port);
assertEquals(expectedTransitUri, record.getTransitUri());
}
private void assertFlowFilesSuccess(final int expectedFlowFiles) {
runner.run(expectedFlowFiles, true, false);
runner.assertTransferCount(ListenBeats.REL_SUCCESS, expectedFlowFiles);
final Iterator<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListenBeats.REL_SUCCESS).iterator();
int i = 1;
while (flowFiles.hasNext()) {
final MockFlowFile flowFile = flowFiles.next();
final String content = flowFile.getContent();
assertEquals(JSON_PAYLOAD, content, String.format("FlowFile Content [%d] not matched", i));
i++;
}
}
private void sendWindowSize(final OutputStream outputStream) throws IOException {
outputStream.write(ProtocolVersion.VERSION_2.getCode());
outputStream.write(FrameType.WINDOW_SIZE.getCode());
final byte[] windowSize = getUnsignedInteger(WINDOWED_MESSAGES);
outputStream.write(windowSize);
outputStream.flush();
}
private void sendMessage(final OutputStream outputStream, final int sequenceNumber) throws IOException {
outputStream.write(ProtocolVersion.VERSION_2.getCode());
outputStream.write(FrameType.JSON.getCode());
final byte[] sequenceNumberEncoded = getUnsignedInteger(sequenceNumber);
outputStream.write(sequenceNumberEncoded);
final int payloadLength = JSON_PAYLOAD.length();
final byte[] payloadSize = getUnsignedInteger(payloadLength);
outputStream.write(payloadSize);
outputStream.write(JSON_PAYLOAD.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
}
private void sendCompressed(final OutputStream outputStream, final byte[] compressed) throws IOException {
outputStream.write(ProtocolVersion.VERSION_2.getCode());
outputStream.write(FrameType.COMPRESSED.getCode());
final int payloadLength = compressed.length;
final byte[] payloadSize = getUnsignedInteger(payloadLength);
outputStream.write(payloadSize);
outputStream.write(compressed);
outputStream.flush();
}
private void assertAckPacketMatched(final InputStream inputStream, final int expectedSequenceNumber) throws IOException {
final byte[] ackPacket = new byte[ACK_PACKET_LENGTH];
final int bytesRead = inputStream.read(ackPacket);
assertEquals(ACK_PACKET_LENGTH, bytesRead);
final ByteBuffer ackPacketBuffer = ByteBuffer.wrap(ackPacket);
final byte version = ackPacketBuffer.get();
assertEquals(ProtocolVersion.VERSION_2.getCode(), version);
final byte frameType = ackPacketBuffer.get();
assertEquals(FrameType.ACK.getCode(), frameType);
final int sequenceNumber = ackPacketBuffer.getInt();
assertEquals(expectedSequenceNumber, sequenceNumber);
}
private byte[] getUnsignedInteger(final int number) {
return ByteBuffer.allocate(INTEGER_BUFFER_SIZE).putInt(number).array();
}
}

View File

@ -1,49 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import javax.xml.bind.DatatypeConverter;
import java.nio.ByteBuffer;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
public class TestBeatsEncoder {
private BeatsEncoder encoder;
@BeforeEach
public void setup() {
this.encoder = new BeatsEncoder();
}
@Test
public void testEncode() {
BeatsFrame frame = new BeatsFrame.Builder()
.version((byte) 0x31)
.frameType((byte) 0x41)
.payload(ByteBuffer.allocate(4).putInt(123).array())
.build();
byte[] encoded = encoder.encode(frame);
assertArrayEquals(DatatypeConverter.parseHexBinary("31410000007B"), encoded);
}
}

View File

@ -1,39 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestBeatsFrame {
@Test
public void testInvalidVersion() {
assertThrows(BeatsFrameException.class, () -> new BeatsFrame.Builder().seqNumber(1234).dataSize(3).build());
}
@Test
public void testInvalidFrameType() {
assertThrows(BeatsFrameException.class, () -> new BeatsFrame.Builder().frameType((byte) 0x70).dataSize(5).build());
}
@Test
public void testBlankFrameType() {
assertThrows(BeatsFrameException.class, () -> new BeatsFrame.Builder().frameType(((byte) 0x00)).dataSize(5).build());
}
}