NIFI-3238 - Introduce ListenBeats processor and deprecates ListenLumberjack processor

NIFI-3238 - Rewords processor description

NIFI-3238: Removed unused imports

This closes #1418
This commit is contained in:
Andre F de Miranda 2017-01-05 03:14:59 +11:00 committed by Matt Burgess
parent c3dbbb845f
commit a32e1509b6
42 changed files with 2308 additions and 2 deletions

View File

@ -333,6 +333,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-lumberjack-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-beats-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-email-nar</artifactId>

View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-beats-bundle</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-beats-nar</artifactId>
<version>1.2.0-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-beats-processors</artifactId>
<version>1.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,233 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.
The binary distribution of this product bundles 'Bouncy Castle JDK 1.5'
under an MIT style license.
Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -0,0 +1,40 @@
nifi-beats-nar
Copyright 2014-2017 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
===========================================
Apache Software License v2
===========================================
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Compress
The following NOTICE information applies:
Apache Commons Compress
Copyright 2002-2014 The Apache Software Foundation
The files in the package org.apache.commons.compress.archivers.sevenz
were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
which has been placed in the public domain:
"LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html)
(ASLv2) Apache Commons IO
The following NOTICE information applies:
Apache Commons IO
Copyright 2002-2016 The Apache Software Foundation
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2015 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Apache Commons Net
The following NOTICE information applies:
Apache Commons Net
Copyright 2001-2013 The Apache Software Foundation

View File

@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-beats-bundle</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-beats-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-socket-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-packager</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.beats.event.BeatsEvent;
import org.apache.nifi.processors.beats.event.BeatsEventFactory;
import org.apache.nifi.processors.beats.frame.BeatsEncoder;
import org.apache.nifi.processors.beats.handler.BeatsSocketChannelHandlerFactory;
import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
import org.apache.nifi.processors.beats.response.BeatsResponse;
import org.apache.nifi.ssl.SSLContextService;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "beats", "tcp", "logs"})
@CapabilityDescription("Listens for messages sent by libbeat compatible clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash', writing its JSON formatted payload " +
"to the content of a FlowFile." +
"This processor replaces the now deprecated ListenLumberjack")
@WritesAttributes({
@WritesAttribute(attribute = "beats.sender", description = "The sending host of the messages."),
@WritesAttribute(attribute = "beats.port", description = "The sending port the messages were received over."),
@WritesAttribute(attribute = "beats.sequencenumber", description = "The sequence number of the message. Only included if <Batch Size> is 1."),
@WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is application/json")
})
@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
public class ListenBeats extends AbstractListenEventBatchingProcessor<BeatsEvent> {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL_CONTEXT_SERVICE")
.displayName("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
"messages will be received over a secure connection.")
// Nearly all Lumberjack v1 implementations require TLS to work. v2 implementations (i.e. beats) have TLS as optional
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(
MAX_CONNECTIONS,
SSL_CONTEXT_SERVICE
);
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null && sslContextService.isTrustStoreConfigured() == false) {
results.add(new ValidationResult.Builder()
.explanation("The context service must have a truststore configured for the beats forwarder client to work correctly")
.valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build());
}
return results;
}
private volatile BeatsEncoder beatsEncoder;
@Override
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
super.onScheduled(context);
// wanted to ensure charset was already populated here
beatsEncoder = new BeatsEncoder();
}
@Override
protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<BeatsEvent> events) throws IOException {
final EventFactory<BeatsEvent> eventFactory = new BeatsEventFactory();
final ChannelHandlerFactory<BeatsEvent, AsyncChannelDispatcher> handlerFactory = new BeatsSocketChannelHandlerFactory<>();
final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and the buffer size
final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
}
// if we decide to support SSL then get the context and pass it in here
return new SocketChannelDispatcher<>(eventFactory, handlerFactory, bufferPool, events,
getLogger(), maxConnections, sslContext, charSet);
}
@Override
protected String getBatchKey(BeatsEvent event) {
return event.getSender();
}
protected void respond(final BeatsEvent event, final BeatsResponse beatsResponse) {
final ChannelResponse response = new BeatsChannelResponse(beatsEncoder, beatsResponse);
final ChannelResponder responder = event.getResponder();
responder.addResponse(response);
try {
responder.respond();
} catch (IOException e) {
getLogger().error("Error sending response for transaction {} due to {}",
new Object[]{event.getSeqNumber(), e.getMessage()}, e);
}
}
protected void postProcess(final ProcessContext context, final ProcessSession session, final List<BeatsEvent> events) {
// first commit the session so we guarantee we have all the events successfully
// written to FlowFiles and transferred to the success relationship
session.commit();
// respond to each event to acknowledge successful receipt
for (final BeatsEvent event : events) {
respond(event, BeatsResponse.ok(event.getSeqNumber()));
}
}
@Override
protected String getTransitUri(FlowFileEventBatch batch) {
final String sender = batch.getEvents().get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append("beats").append("://").append(senderHost).append(":")
.append(port).toString();
return transitUri;
}
@Override
protected Map<String, String> getAttributes(FlowFileEventBatch batch) {
final List<BeatsEvent> events = batch.getEvents();
// the sender and command will be the same for all events based on the batch key
final String sender = events.get(0).getSender();
final int numAttributes = events.size() == 1 ? 5 : 4;
final Map<String, String> attributes = new HashMap<>(numAttributes);
attributes.put(beatsAttributes.SENDER.key(), sender);
attributes.put(beatsAttributes.PORT.key(), String.valueOf(port));
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
// if there was only one event then we can pass on the transaction
// NOTE: we could pass on all the transaction ids joined together
if (events.size() == 1) {
attributes.put(beatsAttributes.SEQNUMBER.key(), String.valueOf(events.get(0).getSeqNumber()));
}
return attributes;
}
public enum beatsAttributes implements FlowFileAttributeKey {
SENDER("beats.sender"),
PORT("beats.port"),
SEQNUMBER("beats.sequencenumber");
private final String key;
beatsAttributes(String key) {
this.key = key;
}
@Override
public String key() {
return key;
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.event;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import java.nio.channels.SocketChannel;
/**
* A Beat event which adds the transaction number to the StandardEvent.
*/
public class BeatsEvent extends StandardEvent<SocketChannel> {
private final int seqNumber;
public BeatsEvent(final String sender, final byte[] data, final ChannelResponder<SocketChannel> responder, final int seqNumber) {
super(sender, data, responder);
this.seqNumber = seqNumber;
}
public int getSeqNumber() {
return seqNumber;
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import java.util.Map;
/**
* An EventFactory implementation to create BeatEvents.
*/
public class BeatsEventFactory implements EventFactory<BeatsEvent> {
@Override
public BeatsEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
final String sender = metadata.get(EventFactory.SENDER_KEY);
final int seqNumber = Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));
return new BeatsEvent(sender, data, responder, seqNumber);
}
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.event;
/**
* Metadata keys for event.
*/
public interface BeatsMetadata {
String SEQNUMBER_KEY = "beats.sequencenumber";
}

View File

@ -0,0 +1,330 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.List;
import java.util.zip.InflaterInputStream;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
/**
* Decodes a Beats frame by maintaining a state based on each byte that has been processed. This class
* should not be shared by multiple threads.
*/
public class BeatsDecoder {
final ComponentLog logger;
private BeatsFrame.Builder frameBuilder;
private BeatsState currState = BeatsState.VERSION;
private byte decodedFrameType;
private byte[] unprocessedData;
private final Charset charset;
private final ByteArrayOutputStream currBytes;
private long windowSize;
static final int MIN_FRAME_HEADER_LENGTH = 2; // Version + Type
static final int WINDOWSIZE_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; // 32bit unsigned window size
static final int COMPRESSED_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; // 32 bit unsigned + payload
static final int JSON_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 8; // 32 bit unsigned sequence number + 32 bit unsigned payload length
public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
/**
* @param charset the charset to decode bytes from the frame
*/
public BeatsDecoder(final Charset charset, final ComponentLog logger) {
this(charset, new ByteArrayOutputStream(4096), logger);
}
/**
* @param charset the charset to decode bytes from the frame
* @param buffer a buffer to use while processing the bytes
*/
public BeatsDecoder(final Charset charset, final ByteArrayOutputStream buffer, final ComponentLog logger) {
this.logger = logger;
this.charset = charset;
this.currBytes = buffer;
this.frameBuilder = new BeatsFrame.Builder();
this.decodedFrameType = 0x00;
}
/**
* Resets this decoder back to its initial state.
*/
public void reset() {
frameBuilder = new BeatsFrame.Builder();
currState = BeatsState.VERSION;
decodedFrameType = 0x00;
currBytes.reset();
}
/**
* Process the next byte from the channel, updating the builder and state accordingly.
*
* @param currByte the next byte to process
* @preturn true if a frame is ready to be retrieved, false otherwise
*/
public boolean process(final byte currByte) throws BeatsFrameException {
try {
switch (currState) {
case VERSION: // Just enough data to process the version
processVERSION(currByte);
break;
case FRAMETYPE: // Also able to process the frametype
processFRAMETYPE(currByte);
break;
case PAYLOAD: // Initial bytes with version and Frame Type have already been received, start iteration over payload
processPAYLOAD(currByte);
// At one stage, the data sent to processPAYLOAD will be represente a complete frame, so we check before returning true
if (frameBuilder.frameType == FRAME_WINDOWSIZE && currState == BeatsState.COMPLETE) {
return true;
} else if (frameBuilder.frameType == FRAME_COMPRESSED && currState == BeatsState.COMPLETE) {
return true;
} else if (frameBuilder.frameType == FRAME_JSON && currState == BeatsState.COMPLETE) {
return true;
} else {
break;
}
case COMPLETE:
return true;
default:
break;
}
return false;
} catch (Exception e) {
throw new BeatsFrameException("Error decoding Beats frame: " + e.getMessage(), e);
}
}
/**
* Returns the decoded frame and resets the decoder for the next frame.
* This method should be called after checking isComplete().
*
* @return the BeatsFrame that was decoded
*/
public List<BeatsFrame> getFrames() throws BeatsFrameException {
List<BeatsFrame> frames = new LinkedList<>();
if (currState != BeatsState.COMPLETE) {
throw new BeatsFrameException("Must be at the trailer of a frame");
}
try {
// Once compressed frames are expanded, they must be devided into individual frames
if (currState == BeatsState.COMPLETE && frameBuilder.frameType == FRAME_COMPRESSED) {
logger.debug("Frame is compressed, will iterate to decode", new Object[]{});
// Zero currBytes, currState and frameBuilder prior to iteration over
// decompressed bytes
currBytes.reset();
frameBuilder.reset();
currState = BeatsState.VERSION;
// Run over decompressed data and split frames
frames = splitCompressedFrames(unprocessedData);
// In case of V or wired D and J frames we just ship them across the List
} else {
final BeatsFrame frame = frameBuilder.build();
currBytes.reset();
frameBuilder.reset();
currState = BeatsState.VERSION;
frames.add(frame);
}
return frames;
} catch (Exception e) {
throw new BeatsFrameException("Error decoding Beats frame: " + e.getMessage(), e);
}
}
private List<BeatsFrame> splitCompressedFrames(byte[] decompressedData) {
List<BeatsFrame> frames = new LinkedList<>();
BeatsFrame.Builder internalFrameBuilder = new BeatsFrame.Builder();
ByteBuffer currentData = ByteBuffer.wrap(decompressedData);
// Both Lumberjack v1 and Beats (LJ v2) has a weird approach to frames, where compressed frames embed D(ata) or J(SON) frames.
// inside a compressed input.
// Or as stated in the documentation:
//
// "As an example, you could have 3 data frames compressed into a single
// 'compressed' frame type: 1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v}"
//
// Therefore, instead of calling process method again, just iterate over each of
// the frames and split them so they can be processed by BeatsFrameHandler
while (currentData.hasRemaining()) {
int payloadLength = 0;
internalFrameBuilder.version = currentData.get();
internalFrameBuilder.frameType = currentData.get();
switch (internalFrameBuilder.frameType) {
case FRAME_JSON:
internalFrameBuilder.seqNumber = (int) (currentData.getInt() & 0x00000000ffffffffL);
currentData.mark();
internalFrameBuilder.dataSize = currentData.getInt() & 0x00000000ffffffffL;
currentData.mark();
// Define how much data to chomp
payloadLength = Math.toIntExact(internalFrameBuilder.dataSize);
byte[] jsonBytes = new byte[payloadLength];
currentData.get(jsonBytes, 0, payloadLength);
currentData.mark();
// Add payload to frame
internalFrameBuilder.payload(jsonBytes);
break;
}
// data frame is created
BeatsFrame frame = internalFrameBuilder.build();
frames.add(frame);
internalFrameBuilder.reset();
}
return frames;
}
private void processVERSION(final byte b) {
byte version = b;
frameBuilder.version(version);
logger.debug("Version number is {}", new Object[]{version});
currBytes.write(b);
currState = BeatsState.FRAMETYPE;
}
private void processFRAMETYPE(final byte b) {
decodedFrameType = b;
frameBuilder.frameType(decodedFrameType);
logger.debug("Frame type is {}", new Object[]{decodedFrameType});
currBytes.write(b);
currState = BeatsState.PAYLOAD;
}
/** Process the outer PAYLOAD byte by byte. Once data is read state is set to COMPLETE so that the data payload
* can be processed fully using {@link #splitCompressedFrames(byte[])}
* */
private void processPAYLOAD(final byte b) {
currBytes.write(b);
switch (decodedFrameType) {
case FRAME_WINDOWSIZE: //'W'
if (currBytes.size() < WINDOWSIZE_LENGTH ) {
logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()});
break;
} else if (currBytes.size() == WINDOWSIZE_LENGTH) {
frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL;
logger.debug("Data size is {}", new Object[]{frameBuilder.dataSize});
// Sets payload to empty as frame contains no data
frameBuilder.payload(new byte[]{});
currBytes.reset();
currState = BeatsState.COMPLETE;
windowSize = frameBuilder.dataSize;
break;
} else { // Should never be here to be honest...
logger.debug("Saw a packet I should not have seen. Packet contents were {}", new Object[] {currBytes.toString()});
break;
}
case FRAME_COMPRESSED: //'C'
if (currBytes.size() < COMPRESSED_MIN_LENGTH) {
if (logger.isTraceEnabled()) {
logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()});
}
break;
} else if (currBytes.size() >= COMPRESSED_MIN_LENGTH) {
// If data contains more thant the minimum data size
frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL;
if (currBytes.size() - 6 == frameBuilder.dataSize) {
try {
byte[] buf = java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, currBytes.size());
InputStream in = new InflaterInputStream(new ByteArrayInputStream(buf));
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int len;
while ((len = in.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
in.close();
out.close();
unprocessedData = out.toByteArray();
// buf is no longer needed
buf = null;
logger.debug("Finished decompressing data");
// Decompression is complete, we should be able to proceed with resetting currBytes and curSrtate and iterating them
// as type 'D' frames
frameBuilder.dataSize(unprocessedData.length);
currState = BeatsState.COMPLETE;
} catch (IOException e) {
throw new BeatsFrameException("Error decompressing frame: " + e.getMessage(), e);
}
}
break;
// If currentByte.size is not lower than six and also not equal or great than 6...
} else { // Should never be here to be honest...
if (logger.isDebugEnabled()) {
logger.debug("Received a compressed frame with partial data or invalid content. The packet contents were {}", new Object[] {currBytes.toString()});
}
break;
}
case FRAME_JSON: // ''
// Because Beats can disable compression, sometimes, JSON data will be received outside a compressed
// stream (i.e. 0x43). Instead of processing it here, we defer its processing to went getFrames is
// called
if (currBytes.size() < JSON_MIN_LENGTH) {
if (logger.isTraceEnabled()) {
logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()});
}
break;
} else if (currBytes.size() == JSON_MIN_LENGTH) {
// Read the sequence number from bytes
frameBuilder.seqNumber = (int) (ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL);
// Read the JSON payload length
frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, 10)).getInt() & 0x00000000ffffffffL;
} else if (currBytes.size() > JSON_MIN_LENGTH) {
// Wait for payload to be fully read and then complete processing
if (currBytes.size() - 10 == frameBuilder.dataSize) {
// Transfer the current payload so it can be processed by {@link #splitCompressedFrames} method.
frameBuilder.payload = java.util.Arrays.copyOfRange(currBytes.toByteArray(), 10, currBytes.size());
currState = BeatsState.COMPLETE;
}
break;
}
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* Encodes a BeatsFrame into raw bytes using the given charset.
*/
public class BeatsEncoder {
public byte[] encode(final BeatsFrame frame) {
final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
// Writes the version
buffer.write(frame.getVersion());
// Writes the frameType
buffer.write(frame.getFrameType());
// Writes the sequence number
try {
buffer.write(frame.getPayload());
} catch (IOException e) {
throw new BeatsFrameException("Error decoding Beats frame: " + e.getMessage(), e);
}
return buffer.toByteArray();
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
/**
* A frame received from a channel.
*/
public class BeatsFrame {
public static final byte DELIMITER = 10;
private final byte version;
private final byte frameType;
private final byte[] payload;
private final long dataSize;
private final long seqNumber;
private BeatsFrame(final Builder builder) {
this.version = builder.version;
this.frameType = builder.frameType;
this.payload = builder.payload;
this.dataSize = builder.dataSize;
this.seqNumber = builder.seqNumber;
if (version < 2 || payload.length < 0 ) {
throw new BeatsFrameException("Invalid Frame");
}
}
public long getSeqNumber() {
return seqNumber;
}
public byte getVersion() {
return version;
}
public byte getFrameType() {
return frameType;
}
public byte [] getPayload() {
return payload;
}
/**
* Builder for a BeatsFrame.
*/
public static class Builder {
byte version;
byte frameType;
byte [] payload;
long dataSize;
int seqNumber;
public Builder() {
reset();
}
public void reset() {
version = -1;
seqNumber = -1;
frameType = -1;
payload = null;
}
public Builder version(final byte version) {
this.version = version;
return this;
}
public Builder seqNumber(final int seqNumber) {
this.seqNumber = seqNumber;
return this;
}
public Builder frameType(final byte frameType) {
this.frameType = frameType;
return this;
}
public Builder dataSize(final long dataSize) {
this.dataSize = dataSize;
return this;
}
public Builder payload(final byte [] payload) {
this.payload = payload;
return this;
}
public BeatsFrame build() {
return new BeatsFrame(this);
}
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
/**
* Represents an error encountered when decoding frames.
*/
public class BeatsFrameException extends RuntimeException {
public BeatsFrameException(String message) {
super(message);
}
public BeatsFrameException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
/**
* The stages of parsing of a Beats conversation.
*/
public enum BeatsState {
VERSION, // First stage is parsing the version
FRAMETYPE, // Second stage is to be able to read the frame type
PAYLOAD, // payload being populated
COMPLETE // complete packet handling
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processor.util.listen.event.EventQueue;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.beats.event.BeatsMetadata;
import org.apache.nifi.processors.beats.frame.BeatsEncoder;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
/**
* Encapsulates the logic to handle a BeatsFrame once it has been read from the channel.
*/
public class BeatsFrameHandler<E extends Event<SocketChannel>> {
private final Charset charset;
private final EventFactory<E> eventFactory;
private final EventQueue<E> events;
private final SelectionKey key;
private final AsyncChannelDispatcher dispatcher;
private final BeatsEncoder encoder;
private final ComponentLog logger;
public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
public BeatsFrameHandler(final SelectionKey selectionKey,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final AsyncChannelDispatcher dispatcher,
final ComponentLog logger) {
this.key = selectionKey;
this.charset = charset;
this.eventFactory = eventFactory;
this.dispatcher = dispatcher;
this.logger = logger;
this.events = new EventQueue<>(events, logger);
this.encoder = new BeatsEncoder();
}
public void handle(final BeatsFrame frame, final ChannelResponder<SocketChannel> responder, final String sender)
throws IOException, InterruptedException {
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
metadata.put(BeatsMetadata.SEQNUMBER_KEY, String.valueOf(frame.getSeqNumber()));
String line = "";
/* If frameType is a JSON , parse the frame payload into a JsonElement so that all JSON elements but "message"
are inserted into the event metadata.
As per above, the "message" element gets added into the body of the event
*/
if (frame.getFrameType() == FRAME_JSON ) {
// queue the raw event blocking until space is available, reset the buffer
final E event = eventFactory.create(frame.getPayload(), metadata, responder);
events.offer(event);
}
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.socket.SSLSocketChannelHandler;
import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder;
import org.apache.nifi.processors.beats.frame.BeatsDecoder;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
import org.apache.nifi.processors.beats.frame.BeatsFrameException;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
/**
* A Beats compatible implementation of SSLSocketChannelHandler.
*/
public class BeatsSSLSocketChannelHandler<E extends Event<SocketChannel>> extends SSLSocketChannelHandler<E> {
private BeatsDecoder decoder;
private BeatsFrameHandler<E> frameHandler;
public BeatsSSLSocketChannelHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
this.decoder = new BeatsDecoder(charset, logger);
this.frameHandler = new BeatsFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Override
protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel,
final int bytesRead, final byte[] buffer) throws InterruptedException, IOException {
final InetAddress sender = socketChannel.socket().getInetAddress();
try {
// go through the buffer parsing the packet command
for (int i = 0; i < bytesRead; i++) {
byte currByte = buffer[i];
// if we found the end of a frame, handle the frame and mark the buffer
if (decoder.process(currByte)) {
final List<BeatsFrame> frames = decoder.getFrames();
// A list of events has been generated
for (BeatsFrame frame : frames) {
logger.debug("Received Beats frame with transaction {} and command {}",
new Object[]{frame.getSeqNumber(), frame.getSeqNumber()});
// Ignore the WINDOWS type frames as they contain no payload.
if (frame.getFrameType() != 0x57 ) {
final SSLSocketChannelResponder responder = new SSLSocketChannelResponder(socketChannel, sslSocketChannel);
frameHandler.handle(frame, responder, sender.toString());
}
}
}
}
logger.debug("Done processing buffer");
} catch (final BeatsFrameException rfe) {
logger.error("Error reading Beats frames due to {}", new Object[] {rfe.getMessage()} , rfe);
// if an invalid frame or bad data was sent then the decoder will be left in a
// corrupted state, so lets close the connection and cause the client to re-establish
dispatcher.completeConnection(key);
}
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.socket.StandardSocketChannelHandler;
import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
import org.apache.nifi.processors.beats.frame.BeatsDecoder;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
import org.apache.nifi.processors.beats.frame.BeatsFrameException;
/**
* Extends the StandardSocketChannelHandler to decode bytes into Beats frames.
*/
public class BeatsSocketChannelHandler<E extends Event<SocketChannel>> extends StandardSocketChannelHandler<E> {
private BeatsDecoder decoder;
private BeatsFrameHandler<E> frameHandler;
public BeatsSocketChannelHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
this.decoder = new BeatsDecoder(charset, logger);
this.frameHandler = new BeatsFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Override
protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer)
throws InterruptedException, IOException {
// get total bytes in buffer
final int total = socketBuffer.remaining();
final InetAddress sender = socketChannel.socket().getInetAddress();
try {
// go through the buffer parsing the packet command
for (int i = 0; i < total; i++) {
byte currByte = socketBuffer.get();
// if we found the end of a frame, handle the frame and mark the buffer
if (decoder.process(currByte)) {
final List<BeatsFrame> frames = decoder.getFrames();
for (BeatsFrame frame : frames) {
logger.debug("Received Beats frame with transaction {} and command {}",
new Object[]{frame.getSeqNumber(), frame.getSeqNumber()});
// Ignore the WINDOW SIZE type frames as they contain no payload.
if (frame.getFrameType() != 0x57) {
final SocketChannelResponder responder = new SocketChannelResponder(socketChannel);
frameHandler.handle(frame, responder, sender.toString());
}
}
socketBuffer.mark();
}
}
logger.debug("Done processing buffer");
} catch (final BeatsFrameException rfe) {
logger.error("Error reading Beats frames due to {}", new Object[] {rfe.getMessage()}, rfe);
// if an invalid frame or bad data was sent then the decoder will be left in a
// corrupted state, so lets close the connection and cause the client to re-establish
dispatcher.completeConnection(key);
}
}
// not used for anything in Beats since the decoder encapsulates the delimiter
@Override
public byte getDelimiter() {
return BeatsFrame.DELIMITER;
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
/**
* Default factory for creating Beats socket channel handlers.
*/
public class BeatsSocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> {
@Override
public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
return new BeatsSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
}
@Override
public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
return new BeatsSSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.response;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
import org.apache.nifi.processors.beats.frame.BeatsEncoder;
/**
* Creates a BeatsFrame for the provided response and returns the encoded frame.
*/
public class BeatsChannelResponse implements ChannelResponse {
private final BeatsEncoder encoder;
private final BeatsResponse response;
public BeatsChannelResponse(final BeatsEncoder encoder, final BeatsResponse response) {
this.encoder = encoder;
this.response = response;
}
@Override
public byte[] toByteArray() {
final BeatsFrame frame = response.toFrame();
return encoder.encode(frame);
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.response;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
import java.nio.ByteBuffer;
/**
'ack' frame type
SENT FROM READER ONLY
frame type value: ASCII 'A' aka byte value 0x41
Payload:
32bit unsigned sequence number.
*/
public class BeatsResponse {
private final int seqNumber;
final private byte version = 0x32; // v2
final private byte frameType = 0x41; // A or ACK
public BeatsResponse(final int seqNumber) {
this.seqNumber = seqNumber;
}
/**
* Creates a BeatsFrame where the data portion will contain this response.
*
*
* @return a BeatsFrame for for this response
*/
public BeatsFrame toFrame() {
return new BeatsFrame.Builder()
.version(version)
.frameType(frameType)
.payload(ByteBuffer.allocate(4).putInt(seqNumber).array())
.build();
}
public static BeatsResponse ok(final int seqNumber) {
return new BeatsResponse(seqNumber);
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.beats.ListenBeats

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class TestBeatsEventFactory {
@Test
public void testCreateLumberJackEvent() {
final String sender = "testsender1";
final byte[] data = "this is a test line".getBytes();
final int seqNumber = 1;
final String fields = "{\"file\":\"test\"}";
final Map<String,String> metadata = new HashMap<>();
metadata.put(EventFactory.SENDER_KEY, sender);
metadata.put(BeatsMetadata.SEQNUMBER_KEY, String.valueOf(seqNumber));
final ChannelResponder responder = new SocketChannelResponder(null);
final EventFactory<BeatsEvent> factory = new BeatsEventFactory();
final BeatsEvent event = factory.create(data, metadata, responder);
Assert.assertEquals(sender, event.getSender());
Assert.assertEquals(seqNumber, event.getSeqNumber());
Assert.assertEquals(data, event.getData());
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
import java.nio.ByteBuffer;
import javax.xml.bind.DatatypeConverter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestBeatsEncoder {
private BeatsEncoder encoder;
@Before
public void setup() {
this.encoder = new BeatsEncoder();
}
@Test
public void testEncode() {
BeatsFrame frame = new BeatsFrame.Builder()
.version((byte) 0x31)
.frameType((byte) 0x41)
.payload(ByteBuffer.allocate(4).putInt(123).array())
.build();
byte[] encoded = encoder.encode(frame);
Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("31410000007B"), encoded);
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.frame;
import org.junit.Test;
public class TestBeatsFrame {
@Test(expected = BeatsFrameException.class)
public void testInvalidVersion() {
new BeatsFrame.Builder().seqNumber(1234).dataSize(3).build();
}
@Test(expected = BeatsFrameException.class)
public void testInvalidFrameType() {
new BeatsFrame.Builder().frameType((byte) 0x70).dataSize(5).build();
}
@Test(expected = BeatsFrameException.class)
public void testBlankFrameType() {
new BeatsFrame.Builder().frameType(((byte) 0x00)).dataSize(5).build();
}
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.beats.event.BeatsEvent;
import org.apache.nifi.processors.beats.event.BeatsEventFactory;
import org.apache.nifi.processors.beats.frame.BeatsEncoder;
import org.apache.nifi.processors.beats.frame.BeatsFrame;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestBeatsFrameHandler {
private Charset charset;
private EventFactory<BeatsEvent> eventFactory;
private BlockingQueue<BeatsEvent> events;
private SelectionKey key;
private AsyncChannelDispatcher dispatcher;
private BeatsEncoder encoder;
private ComponentLog logger;
private BeatsFrameHandler<BeatsEvent> frameHandler;
@Before
public void setup() {
this.charset = StandardCharsets.UTF_8;
this.eventFactory = new BeatsEventFactory();
this.events = new LinkedBlockingQueue<>();
this.key = Mockito.mock(SelectionKey.class);
this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class);
this.logger = Mockito.mock(ComponentLog.class);
this.frameHandler = new BeatsFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Test
public void testWindow() throws IOException, InterruptedException {
final BeatsFrame openFrame = new BeatsFrame.Builder()
.version((byte) 0x31)
.frameType((byte) 0x57)
.seqNumber(-1)
.payload(Integer.toString(1).getBytes())
.build();
final String sender = "sender1";
final CapturingChannelResponder responder = new CapturingChannelResponder();
// call the handler and verify respond() was called once with once response
frameHandler.handle(openFrame, responder, sender);
// No response expected
Assert.assertEquals(0, responder.responded);
}
@Test
public void testJson() throws IOException, InterruptedException {
final byte jsonPayload[] = new byte[]{
// Payload eq { "message": "test-content", "field": "value"}
0x7b, 0x22, 0x6d, 0x65,
0x73, 0x73, 0x61, 0x67,
0x65, 0x22, 0x3a, 0x20,
0x22, 0x74, 0x65, 0x73,
0x74, 0x2d, 0x63, 0x6f,
0x6e, 0x74, 0x65, 0x6e,
0x74, 0x22, 0x2c, 0x20,
0x22, 0x66, 0x69, 0x65,
0x6c, 0x64, 0x22, 0x3a,
0x20, 0x22, 0x76, 0x61,
0x6c, 0x75, 0x65, 0x22,
0x7d
};
final BeatsFrame jsonFrame = new BeatsFrame.Builder()
.version((byte) 0x32)
.frameType((byte) 0x4a)
.seqNumber(1)
.dataSize(45)
.payload(jsonPayload)
.build();
final String sender = "sender1";
final CapturingChannelResponder responder = new CapturingChannelResponder();
// call the handler and verify respond() was called once with once response
frameHandler.handle(jsonFrame, responder, sender);
// No response expected
Assert.assertEquals(0, responder.responded);
// But events should contain one event
Assert.assertEquals(1, events.size());
final BeatsEvent event = events.poll();
Assert.assertEquals("{\"message\": \"test-content\", \"field\": \"value\"}", new String(event.getData(), charset));
}
private static class CapturingChannelResponder implements ChannelResponder<SocketChannel> {
int responded;
List<ChannelResponse> responses = new ArrayList<>();
@Override
public SocketChannel getChannel() {
return Mockito.mock(SocketChannel.class);
}
@Override
public List<ChannelResponse> getResponses() {
return responses;
}
@Override
public void addResponse(ChannelResponse response) {
responses.add(response);
}
@Override
public void respond() throws IOException {
responded++;
}
}
}

View File

@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.beats.handler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import javax.xml.bind.DatatypeConverter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.beats.event.BeatsMetadata;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestBeatsSocketChannelHandler {
private EventFactory<TestEvent> eventFactory;
private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory;
private BlockingQueue<ByteBuffer> byteBuffers;
private BlockingQueue<TestEvent> events;
private ComponentLog logger = Mockito.mock(ComponentLog.class);
private int maxConnections;
private SSLContext sslContext;
private Charset charset;
private ChannelDispatcher dispatcher;
@Before
public void setup() {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new BeatsSocketChannelHandlerFactory<>();
byteBuffers = new LinkedBlockingQueue<>();
byteBuffers.add(ByteBuffer.allocate(4096));
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
maxConnections = 1;
sslContext = null;
charset = StandardCharsets.UTF_8;
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger,
maxConnections, sslContext, charset);
}
@Test
public void testWiredJsonHandling() throws IOException, InterruptedException {
final String singleJsonFrame = "324a000000010000002d7b226d657373616765223a2022746573742d636f6e74656e7422" +
"2c20226669656c64223a202276616c7565227d";
final List<String> messages = new ArrayList<>();
messages.add(singleJsonFrame);
run(messages);
// Check for the 1 frames (from the hex string above) are back...
Assert.assertEquals(1, events.size());
TestEvent event;
while((event = events.poll()) != null) {
Map<String, String> metadata = event.metadata;
Assert.assertTrue(metadata.containsKey(BeatsMetadata.SEQNUMBER_KEY));
final String seqNum = metadata.get(BeatsMetadata.SEQNUMBER_KEY);
final String line = new String(event.getData());
Assert.assertTrue(seqNum.equals("1"));
Assert.assertEquals("{\"message\": \"test-content\", \"field\": \"value\"}", line);
}
}
@Test
public void testCompressedJsonHandling() throws IOException, InterruptedException {
final String multipleJsonFrame = "3243000000E27801CC91414BC3401085477FCA3B6F" +
"93EEB6A5B8A71E3CF5ECC98BECC6491AC86643665290903FAE17A982540F8237E7F" +
"80D3C78EF734722BA21A2B71987C41A9E8306F819FA32303CBADCC020725078D46D" +
"C791836231D0EB7FDB0F933EE9354A2C129A4B44F8B8AF94197D4817CE7CCF67189" +
"CB2E80F74E651DADCC36357D8C2623138689B5834A4011E6E6DF7ABB55DAD770F76" +
"E3B7777EBB299CB58F30903C8D15C3A33CE5C465A8A74ACA2E3792A7B1E25259B4E" +
"87203835CD7C20ABF5FDC91886E89E8F58F237CEEF2EF1A5967BEFBFBD54F8C3162" +
"790F0000FFFF6CB6A08D";
final List<String> messages = new ArrayList<>();
messages.add(multipleJsonFrame);
run(messages);
// Check for the 2 frames (from the hex string above) are back...
Assert.assertEquals(2, events.size());
boolean found1 = false;
boolean found2 = false;
TestEvent event;
while((event = events.poll()) != null) {
Map<String, String> metadata = event.metadata;
Assert.assertTrue(metadata.containsKey(BeatsMetadata.SEQNUMBER_KEY));
final String seqNum = metadata.get(BeatsMetadata.SEQNUMBER_KEY);
final String line = new String(event.getData());
if (seqNum.equals("1") && line.contains("\"message\":\"aaaaaa\"")) {
found1 = true;
}
if (seqNum.equals("2") && line.contains("\"message\":\"bbbb\"")) {
found2 = true;
}
}
Assert.assertTrue(found1 && found2);
}
protected void run(List<String> messages) throws IOException, InterruptedException {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
// starts the dispatcher listening on port 0 so it selects a random port
dispatcher.open(null, 0, 4096);
// starts a thread to run the dispatcher which will accept/read connections
Thread dispatcherThread = new Thread(dispatcher);
dispatcherThread.start();
// create a client connection to the port the dispatcher is listening on
final int realPort = dispatcher.getPort();
try (SocketChannel channel = SocketChannel.open()) {
channel.connect(new InetSocketAddress("localhost", realPort));
Thread.sleep(100);
// send the provided messages
for (int i=0; i < messages.size(); i++) {
buffer.clear();
buffer.put(DatatypeConverter.parseHexBinary(messages.get(i)));
buffer.flip();
while (buffer.hasRemaining()) {
channel.write(buffer);
}
Thread.sleep(1);
}
}
// wait up to 10 seconds to verify the responses
long timeout = 10000;
long startTime = System.currentTimeMillis();
while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) {
Thread.sleep(100);
}
} finally {
// stop the dispatcher thread and ensure we shut down handler threads
dispatcher.close();
}
}
// Test event to produce from the data
private static class TestEvent implements Event<SocketChannel> {
private byte[] data;
private Map<String, String> metadata;
public TestEvent(byte[] data, Map<String, String> metadata) {
this.data = data;
this.metadata = metadata;
}
@Override
public String getSender() {
return metadata.get(EventFactory.SENDER_KEY);
}
@Override
public byte[] getData() {
return data;
}
@Override
public ChannelResponder<SocketChannel> getResponder() {
return null;
}
}
// Factory to create test events and send responses for testing
private static class TestEventHolderFactory implements EventFactory<TestEvent> {
@Override
public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
return new TestEvent(data, metadata);
}
}
}

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-beats-bundle</artifactId>
<version>1.2.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-beats-processors</module>
<module>nifi-beats-nar</module>
</modules>
</project>

View File

@ -62,12 +62,13 @@ import org.apache.nifi.ssl.SSLContextService;
import com.google.gson.Gson;
@Deprecated
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "lumberjack", "tcp", "logs"})
@CapabilityDescription("Listens for Lumberjack messages being sent to a given port over TCP. Each message will be " +
@CapabilityDescription("This processor is deprecated and may be removed in the near future. Listens for Lumberjack messages being sent to a given port over TCP. Each message will be " +
"acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain data " +
"portion of one or more Lumberjack frames. In the case where the Lumberjack frames contain syslog messages, the " +
"output of this processor can be sent to a ParseSyslog processor for further processing.")
"output of this processor can be sent to a ParseSyslog processor for further processing. ")
@WritesAttributes({
@WritesAttribute(attribute = "lumberjack.sender", description = "The sending host of the messages."),
@WritesAttribute(attribute = "lumberjack.port", description = "The sending port the messages were received over."),
@ -76,6 +77,9 @@ import com.google.gson.Gson;
@WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is text/plain")
})
@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
/**
* @deprecated As of release 1.2.0, replaced by {@link org.apache.nifi.processors.beats.ListenBeats}
* */
public class ListenLumberjack extends AbstractListenEventBatchingProcessor<LumberjackEvent> {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()

View File

@ -24,6 +24,7 @@ import java.nio.channels.SocketChannel;
/**
* A Lumberjack event which adds the transaction number and command to the StandardEvent.
*/
@Deprecated
public class LumberjackEvent extends StandardEvent<SocketChannel> {
private final long seqNumber;

View File

@ -24,6 +24,7 @@ import java.util.Map;
/**
* An EventFactory implementation to create LumberjackEvents.
*/
@Deprecated
public class LumberjackEventFactory implements EventFactory<LumberjackEvent> {
@Override

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.lumberjack.event;
/**
* Metadata keys for Lumberjack.
*/
@Deprecated
public interface LumberjackMetadata {
String SEQNUMBER_KEY = "lumberjack.sequencenumber";

View File

@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
* Decodes a Lumberjack frame by maintaining a state based on each byte that has been processed. This class
* should not be shared by multiple threads.
*/
@Deprecated
public class LumberjackDecoder {
static final Logger logger = LoggerFactory.getLogger(LumberjackDecoder.class);

View File

@ -22,6 +22,7 @@ import java.io.IOException;
/**
* Encodes a LumberjackFrame into raw bytes using the given charset.
*/
@Deprecated
public class LumberjackEncoder {

View File

@ -20,6 +20,7 @@ package org.apache.nifi.processors.lumberjack.frame;
/**
* A Lumberjack frame received from a channel.
*/
@Deprecated
public class LumberjackFrame {
public static final byte DELIMITER = 10;

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.lumberjack.frame;
/**
* Represents an error encountered when decoding Lumberjack frames.
*/
@Deprecated
public class LumberjackFrameException extends RuntimeException {
public LumberjackFrameException(String message) {

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.lumberjack.frame;
/**
* The parts of a Lumberjack frame.
*/
@Deprecated
public enum LumberjackState {
VERSION,

View File

@ -42,6 +42,7 @@ import com.google.gson.Gson;
/**
* Encapsulates the logic to handle a LumberjackFrame once it has been read from the channel.
*/
@Deprecated
public class LumberjackFrameHandler<E extends Event<SocketChannel>> {
private final Charset charset;

View File

@ -38,6 +38,7 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
/**
* A Lumberjack implementation of SSLSocketChannelHandler.
*/
@Deprecated
public class LumberjackSSLSocketChannelHandler<E extends Event<SocketChannel>> extends SSLSocketChannelHandler<E> {
private LumberjackDecoder decoder;

View File

@ -38,6 +38,7 @@ import org.apache.nifi.processors.lumberjack.frame.LumberjackFrameException;
/**
* Extends the StandardSocketChannelHandler to decode bytes into Lumberjack frames.
*/
@Deprecated
public class LumberjackSocketChannelHandler<E extends Event<SocketChannel>> extends StandardSocketChannelHandler<E> {
private LumberjackDecoder decoder;

View File

@ -31,6 +31,7 @@ import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
/**
* Default factory for creating Lumberjack socket channel handlers.
*/
@Deprecated
public class LumberjackSocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> {
@Override

View File

@ -58,6 +58,7 @@
<module>nifi-splunk-bundle</module>
<module>nifi-jms-bundle</module>
<module>nifi-lumberjack-bundle</module>
<module>nifi-beats-bundle</module>
<module>nifi-cassandra-bundle</module>
<module>nifi-spring-bundle</module>
<module>nifi-hive-bundle</module>

View File

@ -1221,6 +1221,12 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-lumberjack-nar</artifactId>
<version>1.2.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-beats-nar</artifactId>
<version>1.2.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>