mirror of https://github.com/apache/nifi.git
NIFI-13499 Removed ListenBeats and nifi-beats-bundle
This closes #9040 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
bd80506e89
commit
db6bac21bf
|
@ -745,11 +745,6 @@
|
|||
<artifactId>nifi-azure-services-api</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-beats-processors</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-box-processors</artifactId>
|
||||
|
|
|
@ -1,41 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-beats-bundle</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-beats-nar</artifactId>
|
||||
<packaging>nar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-beats-processors</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services-api-nar</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -1,233 +0,0 @@
|
|||
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
APACHE NIFI SUBCOMPONENTS:
|
||||
|
||||
The Apache NiFi project contains subcomponents with separate copyright
|
||||
notices and license terms. Your use of the source code for the these
|
||||
subcomponents is subject to the terms and conditions of the following
|
||||
licenses.
|
||||
|
||||
The binary distribution of this product bundles 'Bouncy Castle JDK 1.5'
|
||||
under an MIT style license.
|
||||
|
||||
Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org)
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
|
|
@ -1,63 +0,0 @@
|
|||
nifi-beats-nar
|
||||
Copyright 2014-2024 The Apache Software Foundation
|
||||
|
||||
This product includes software developed at
|
||||
The Apache Software Foundation (http://www.apache.org/).
|
||||
|
||||
===========================================
|
||||
Apache Software License v2
|
||||
===========================================
|
||||
|
||||
The following binary components are provided under the Apache Software License v2
|
||||
|
||||
(ASLv2) Apache Commons Compress
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Compress
|
||||
Copyright 2002-2017 The Apache Software Foundation
|
||||
|
||||
The files in the package org.apache.commons.compress.archivers.sevenz
|
||||
were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
|
||||
which has been placed in the public domain:
|
||||
|
||||
"LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html)
|
||||
|
||||
(ASLv2) Apache Commons IO
|
||||
The following NOTICE information applies:
|
||||
Apache Commons IO
|
||||
Copyright 2002-2016 The Apache Software Foundation
|
||||
|
||||
(ASLv2) Apache Commons Lang
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Lang
|
||||
Copyright 2001-2015 The Apache Software Foundation
|
||||
|
||||
This product includes software from the Spring Framework,
|
||||
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
|
||||
|
||||
(ASLv2) Apache Commons Net
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Net
|
||||
Copyright 2001-2013 The Apache Software Foundation
|
||||
|
||||
(ASLv2) Jackson JSON processor
|
||||
The following NOTICE information applies:
|
||||
# Jackson JSON processor
|
||||
|
||||
Jackson is a high-performance, Free/Open Source JSON processing library.
|
||||
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
|
||||
been in development since 2007.
|
||||
It is currently developed by a community of developers, as well as supported
|
||||
commercially by FasterXML.com.
|
||||
|
||||
## Licensing
|
||||
|
||||
Jackson core and extension components may licensed under different licenses.
|
||||
To find the details that apply to this artifact see the accompanying LICENSE file.
|
||||
For more information, including possible other licensing options, contact
|
||||
FasterXML.com (http://fasterxml.com).
|
||||
|
||||
## Credits
|
||||
|
||||
A list of contributors may be found from CREDITS file, which is included
|
||||
in some artifacts (usually source distributions); but is always available
|
||||
from the source code management (SCM) system project uses.
|
|
@ -1,65 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-beats-bundle</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-beats-processors</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-event-listen</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-buffer</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-codec</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-transport</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-security-utils-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-event-transport</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -1,265 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.event.transport.EventException;
|
||||
import org.apache.nifi.event.transport.EventServer;
|
||||
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
|
||||
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
|
||||
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.listen.EventBatcher;
|
||||
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
|
||||
import org.apache.nifi.processor.util.listen.ListenerProperties;
|
||||
import org.apache.nifi.processors.beats.protocol.BatchMessage;
|
||||
import org.apache.nifi.processors.beats.server.BeatsMessageServerFactory;
|
||||
import org.apache.nifi.remote.io.socket.NetworkUtils;
|
||||
import org.apache.nifi.security.util.ClientAuth;
|
||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
|
||||
@Tags({"beats", "logstash", "elasticsearch", "log"})
|
||||
@CapabilityDescription("Receive messages encoded using the Elasticsearch Beats protocol and write decoded JSON")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = "beats.sender", description = "Internet Protocol address of the message sender"),
|
||||
@WritesAttribute(attribute = "beats.port", description = "TCP port on which the Processor received messages"),
|
||||
@WritesAttribute(attribute = "beats.sequencenumber", description = "The sequence number of the message included for batches containing single messages"),
|
||||
@WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is application/json")
|
||||
})
|
||||
public class ListenBeats extends AbstractProcessor {
|
||||
|
||||
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("SSL_CONTEXT_SERVICE")
|
||||
.displayName("SSL Context Service")
|
||||
.description("SSL Context Service is required to enable TLS for socket connections")
|
||||
.required(false)
|
||||
.identifiesControllerService(RestrictedSSLContextService.class)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
|
||||
.name("Client Auth")
|
||||
.displayName("Client Authentication")
|
||||
.description("Client authentication policy when TLS is enabled")
|
||||
.required(false)
|
||||
.dependsOn(SSL_CONTEXT_SERVICE)
|
||||
.allowableValues(ClientAuth.values())
|
||||
.defaultValue(ClientAuth.REQUIRED.name())
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("Messages received successfully will be sent out this relationship.")
|
||||
.build();
|
||||
|
||||
private static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
|
||||
ListenerProperties.NETWORK_INTF_NAME,
|
||||
ListenerProperties.PORT,
|
||||
ListenerProperties.RECV_BUFFER_SIZE,
|
||||
ListenerProperties.MAX_MESSAGE_QUEUE_SIZE,
|
||||
ListenerProperties.MAX_SOCKET_BUFFER_SIZE,
|
||||
ListenerProperties.CHARSET,
|
||||
ListenerProperties.MAX_BATCH_SIZE,
|
||||
ListenerProperties.MESSAGE_DELIMITER,
|
||||
ListenerProperties.WORKER_THREADS,
|
||||
SSL_CONTEXT_SERVICE,
|
||||
CLIENT_AUTH
|
||||
));
|
||||
|
||||
private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
|
||||
|
||||
protected volatile BlockingQueue<BatchMessage> events;
|
||||
protected volatile BlockingQueue<BatchMessage> errorEvents;
|
||||
protected volatile EventServer eventServer;
|
||||
protected volatile byte[] messageDemarcatorBytes;
|
||||
protected volatile EventBatcher<BatchMessage> eventBatcher;
|
||||
|
||||
@Override
|
||||
public final Set<Relationship> getRelationships() {
|
||||
return RELATIONSHIPS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return DESCRIPTORS;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) throws IOException {
|
||||
final int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
|
||||
final int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
||||
final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
|
||||
final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
|
||||
final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
|
||||
final int port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
|
||||
events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
|
||||
errorEvents = new LinkedBlockingQueue<>();
|
||||
final String msgDemarcator = getMessageDemarcator(context);
|
||||
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
|
||||
|
||||
final NettyEventServerFactory eventFactory = new BeatsMessageServerFactory(getLogger(), address, port, events);
|
||||
|
||||
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
if (sslContextService != null) {
|
||||
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
|
||||
ClientAuth clientAuth = ClientAuth.valueOf(clientAuthValue);
|
||||
SSLContext sslContext = sslContextService.createContext();
|
||||
eventFactory.setSslContext(sslContext);
|
||||
eventFactory.setClientAuth(clientAuth);
|
||||
}
|
||||
|
||||
eventFactory.setSocketReceiveBuffer(socketBufferSize);
|
||||
eventFactory.setWorkerThreads(workerThreads);
|
||||
eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
|
||||
eventFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
|
||||
eventFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
|
||||
|
||||
try {
|
||||
eventServer = eventFactory.getEventServer();
|
||||
} catch (final EventException e) {
|
||||
getLogger().error("Failed to bind to [{}:{}]", address, port, e);
|
||||
}
|
||||
}
|
||||
|
||||
public int getListeningPort() {
|
||||
return eventServer.getListeningPort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
EventBatcher<BatchMessage> eventBatcher = getEventBatcher();
|
||||
|
||||
final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
|
||||
Map<String, FlowFileEventBatch<BatchMessage>> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
|
||||
processEvents(session, batches);
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public void shutdown() {
|
||||
if (eventServer == null) {
|
||||
getLogger().warn("Event Server not configured");
|
||||
} else {
|
||||
eventServer.shutdown();
|
||||
}
|
||||
eventBatcher = null;
|
||||
}
|
||||
|
||||
private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch<BatchMessage>> batches) {
|
||||
for (final Map.Entry<String, FlowFileEventBatch<BatchMessage>> entry : batches.entrySet()) {
|
||||
FlowFile flowFile = entry.getValue().getFlowFile();
|
||||
final List<BatchMessage> events = entry.getValue().getEvents();
|
||||
|
||||
if (flowFile.getSize() == 0L || events.size() == 0) {
|
||||
session.remove(flowFile);
|
||||
continue;
|
||||
}
|
||||
|
||||
final Map<String, String> attributes = getAttributes(entry.getValue());
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
|
||||
final String transitUri = getTransitUri(entry.getValue());
|
||||
session.getProvenanceReporter().receive(flowFile, transitUri);
|
||||
}
|
||||
}
|
||||
|
||||
private String getTransitUri(final FlowFileEventBatch<BatchMessage> batch) {
|
||||
final List<BatchMessage> events = batch.getEvents();
|
||||
final String sender = events.get(0).getSender();
|
||||
return String.format("beats://%s:%d", sender, getListeningPort());
|
||||
}
|
||||
|
||||
private Map<String, String> getAttributes(final FlowFileEventBatch<BatchMessage> batch) {
|
||||
final List<BatchMessage> events = batch.getEvents();
|
||||
|
||||
final String sender = events.get(0).getSender();
|
||||
final Map<String, String> attributes = new LinkedHashMap<>();
|
||||
attributes.put(BeatsAttributes.SENDER.key(), sender);
|
||||
attributes.put(BeatsAttributes.PORT.key(), String.valueOf(getListeningPort()));
|
||||
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
|
||||
|
||||
if (events.size() == 1) {
|
||||
attributes.put(BeatsAttributes.SEQUENCE_NUMBER.key(), String.valueOf(events.get(0).getSequenceNumber()));
|
||||
}
|
||||
return attributes;
|
||||
}
|
||||
|
||||
private String getMessageDemarcator(final ProcessContext context) {
|
||||
return context.getProperty(ListenerProperties.MESSAGE_DELIMITER)
|
||||
.getValue()
|
||||
.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
|
||||
}
|
||||
|
||||
private EventBatcher<BatchMessage> getEventBatcher() {
|
||||
if (eventBatcher == null) {
|
||||
eventBatcher = new EventBatcher<BatchMessage>(getLogger(), events, errorEvents) {
|
||||
@Override
|
||||
protected String getBatchKey(final BatchMessage event) {
|
||||
return event.getSender();
|
||||
}
|
||||
};
|
||||
}
|
||||
return eventBatcher;
|
||||
}
|
||||
|
||||
private enum BeatsAttributes implements FlowFileAttributeKey {
|
||||
SENDER("beats.sender"),
|
||||
PORT("beats.port"),
|
||||
SEQUENCE_NUMBER("beats.sequencenumber");
|
||||
|
||||
private final String key;
|
||||
|
||||
BeatsAttributes(String key) {
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String key() {
|
||||
return key;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,83 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.handler;
|
||||
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processors.beats.protocol.Batch;
|
||||
import org.apache.nifi.processors.beats.protocol.BatchMessage;
|
||||
import org.apache.nifi.processors.beats.protocol.MessageAck;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
/**
|
||||
* Batch Channel Inbound Handler processes a batch of messages and sends an acknowledgement for the last sequence number
|
||||
*/
|
||||
@ChannelHandler.Sharable
|
||||
public class BatchChannelInboundHandler extends SimpleChannelInboundHandler<Batch> {
|
||||
private final ComponentLog log;
|
||||
|
||||
private final BlockingQueue<BatchMessage> messages;
|
||||
|
||||
/**
|
||||
* Batch Channel Inbound Handler with required arguments
|
||||
*
|
||||
* @param log Processor Log
|
||||
* @param messages Queue of messages
|
||||
*/
|
||||
public BatchChannelInboundHandler(final ComponentLog log, final BlockingQueue<BatchMessage> messages) {
|
||||
this.log = Objects.requireNonNull(log, "Component Log required");
|
||||
this.messages = Objects.requireNonNull(messages, "Message Queue required");
|
||||
}
|
||||
|
||||
/**
|
||||
* Channel Read processes a batch of messages and sends an acknowledgement for the last sequence number
|
||||
*
|
||||
* @param context Channel Handler Context
|
||||
* @param batch Batch of messages
|
||||
*/
|
||||
@Override
|
||||
protected void channelRead0(final ChannelHandlerContext context, final Batch batch) {
|
||||
Integer lastSequenceNumber = null;
|
||||
|
||||
final Collection<BatchMessage> batchMessages = batch.getMessages();
|
||||
int queued = 0;
|
||||
for (final BatchMessage batchMessage : batchMessages) {
|
||||
final int sequenceNumber = batchMessage.getSequenceNumber();
|
||||
final String sender = batchMessage.getSender();
|
||||
if (messages.offer(batchMessage)) {
|
||||
log.debug("Message Sequence Number [{}] Sender [{}] queued", sequenceNumber, sender);
|
||||
lastSequenceNumber = batchMessage.getSequenceNumber();
|
||||
queued++;
|
||||
} else {
|
||||
log.warn("Message Sequence Number [{}] Sender [{}] queuing failed: Queued [{}] of [{}]", sequenceNumber, sender, queued, batchMessages.size());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (lastSequenceNumber == null) {
|
||||
log.warn("Batch Messages [{}] queuing failed", batch.getMessages().size());
|
||||
} else {
|
||||
final MessageAck messageAck = new MessageAck(lastSequenceNumber);
|
||||
context.writeAndFlush(messageAck);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,380 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.handler;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufOutputStream;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processors.beats.protocol.Batch;
|
||||
import org.apache.nifi.processors.beats.protocol.BatchMessage;
|
||||
import org.apache.nifi.processors.beats.protocol.FrameType;
|
||||
import org.apache.nifi.processors.beats.protocol.FrameTypeDecoder;
|
||||
import org.apache.nifi.processors.beats.protocol.ProtocolCodeDecoder;
|
||||
import org.apache.nifi.processors.beats.protocol.ProtocolException;
|
||||
import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
|
||||
import org.apache.nifi.processors.beats.protocol.ProtocolVersionDecoder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.zip.Inflater;
|
||||
import java.util.zip.InflaterOutputStream;
|
||||
|
||||
/**
|
||||
* Byte Buffer to Batch Decoder parses bytes to batches of Beats messages
|
||||
*/
|
||||
public class BatchDecoder extends ByteToMessageDecoder {
|
||||
private static final int INITIAL_WINDOW_SIZE = 1;
|
||||
|
||||
private static final int INITIAL_QUEUE_SIZE = 1;
|
||||
|
||||
private static final int CODE_READABLE_BYTES = 1;
|
||||
|
||||
private static final int INT_READABLE_BYTES = 4;
|
||||
|
||||
private static final ProtocolCodeDecoder<ProtocolVersion> VERSION_DECODER = new ProtocolVersionDecoder();
|
||||
|
||||
private static final ProtocolCodeDecoder<FrameType> FRAME_TYPE_DECODER = new FrameTypeDecoder();
|
||||
|
||||
private final ComponentLog log;
|
||||
|
||||
private final AtomicReference<ProtocolVersion> versionRef = new AtomicReference<>();
|
||||
|
||||
private final AtomicReference<FrameType> frameTypeRef = new AtomicReference<>();
|
||||
|
||||
private final AtomicInteger windowSize = new AtomicInteger(INITIAL_WINDOW_SIZE);
|
||||
|
||||
private final AtomicReference<Integer> sequenceNumberRef = new AtomicReference<>();
|
||||
|
||||
private final AtomicReference<Integer> payloadSizeRef = new AtomicReference<>();
|
||||
|
||||
private final AtomicReference<Integer> compressedSizeRef = new AtomicReference<>();
|
||||
|
||||
private Queue<BatchMessage> batchMessages = new ArrayBlockingQueue<>(INITIAL_QUEUE_SIZE);
|
||||
|
||||
/**
|
||||
* Beats Batch Decoder with required arguments
|
||||
*
|
||||
* @param log Processor Log
|
||||
*/
|
||||
public BatchDecoder(final ComponentLog log) {
|
||||
this.log = Objects.requireNonNull(log, "Component Log required");
|
||||
}
|
||||
|
||||
/**
|
||||
* Decode Batch of Beats Messages from Byte Buffer
|
||||
*
|
||||
* @param context Channel Handler Context
|
||||
* @param buffer Byte Buffer
|
||||
* @param objects List of Batch objects
|
||||
*/
|
||||
@Override
|
||||
protected void decode(final ChannelHandlerContext context, final ByteBuf buffer, final List<Object> objects) {
|
||||
final ProtocolVersion protocolVersion = readVersion(buffer);
|
||||
if (ProtocolVersion.VERSION_2 == protocolVersion) {
|
||||
final FrameType frameType = readFrameType(buffer);
|
||||
decodeFrameType(frameType, context, buffer, objects);
|
||||
} else if (ProtocolVersion.VERSION_1 == protocolVersion) {
|
||||
throw new ProtocolException("Protocol Version [1] not supported");
|
||||
}
|
||||
}
|
||||
|
||||
private void decodeFrameType(final FrameType frameType, final ChannelHandlerContext context, final ByteBuf buffer, final List<Object> batches) {
|
||||
if (frameType == null) {
|
||||
log.trace("Frame Type not found");
|
||||
} else if (FrameType.COMPRESSED == frameType) {
|
||||
processCompressed(context, buffer, batches);
|
||||
} else if (FrameType.WINDOW_SIZE == frameType) {
|
||||
processWindowSize(context, buffer);
|
||||
} else if (FrameType.JSON == frameType) {
|
||||
processJson(context, buffer, batches);
|
||||
} else {
|
||||
final String message = String.format("Frame Type [%s] not supported", frameType);
|
||||
throw new ProtocolException(message);
|
||||
}
|
||||
}
|
||||
|
||||
private void processWindowSize(final ChannelHandlerContext context, final ByteBuf buffer) {
|
||||
final Integer readWindowSize = readUnsignedInteger(buffer);
|
||||
if (readWindowSize == null) {
|
||||
log.trace("State [Read Window Size] not enough readable bytes");
|
||||
} else {
|
||||
windowSize.getAndSet(readWindowSize);
|
||||
batchMessages = new ArrayBlockingQueue<>(readWindowSize);
|
||||
|
||||
resetFrameTypeVersion();
|
||||
final Channel channel = context.channel();
|
||||
log.debug("Processed Window Size [{}] Local [{}] Remote [{}]", readWindowSize, channel.localAddress(), channel.remoteAddress());
|
||||
}
|
||||
}
|
||||
|
||||
private void processCompressed(final ChannelHandlerContext context, final ByteBuf buffer, final List<Object> batches) {
|
||||
final Integer readCompressedSize = readCompressedSize(buffer);
|
||||
if (readCompressedSize == null) {
|
||||
log.trace("State [Read Compressed] not enough readable bytes");
|
||||
} else {
|
||||
final int readableBytes = buffer.readableBytes();
|
||||
if (readableBytes >= readCompressedSize) {
|
||||
final Channel channel = context.channel();
|
||||
log.debug("Processing Compressed Size [{}] Local [{}] Remote [{}]", readCompressedSize, channel.localAddress(), channel.remoteAddress());
|
||||
|
||||
processCompressed(context, buffer, readCompressedSize, batches);
|
||||
} else {
|
||||
log.trace("State [Read Compressed] not enough readable bytes [{}] for compressed [{}]", readableBytes, readCompressedSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processCompressed(
|
||||
final ChannelHandlerContext context,
|
||||
final ByteBuf buffer,
|
||||
final int compressedSize,
|
||||
final List<Object> batches
|
||||
) {
|
||||
final ByteBuf inflated = context.alloc().buffer(compressedSize);
|
||||
try {
|
||||
readCompressedBuffer(buffer, inflated, compressedSize);
|
||||
|
||||
// Clear status prior to decoding inflated frames
|
||||
resetSequenceVersionPayloadSize();
|
||||
resetFrameTypeVersion();
|
||||
|
||||
while (inflated.isReadable()) {
|
||||
decode(context, inflated, batches);
|
||||
}
|
||||
} finally {
|
||||
compressedSizeRef.set(null);
|
||||
inflated.release();
|
||||
}
|
||||
}
|
||||
|
||||
private void processJson(final ChannelHandlerContext context, final ByteBuf buffer, final List<Object> batches) {
|
||||
final Channel channel = context.channel();
|
||||
|
||||
final Integer sequenceNumber = readSequenceNumber(buffer);
|
||||
if (sequenceNumber == null) {
|
||||
log.trace("State [Read JSON] Sequence Number not found Remote [{}]", channel.remoteAddress());
|
||||
} else {
|
||||
final Integer payloadSize = readPayloadSize(buffer);
|
||||
if (payloadSize == null) {
|
||||
log.trace("State [Read JSON] Payload Size not found Remote [{}]", channel.remoteAddress());
|
||||
} else {
|
||||
processJson(sequenceNumber, payloadSize, context, buffer, batches);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processJson(
|
||||
final int sequenceNumber,
|
||||
final int payloadSize,
|
||||
final ChannelHandlerContext context,
|
||||
final ByteBuf buffer,
|
||||
final List<Object> batches
|
||||
) {
|
||||
final Channel channel = context.channel();
|
||||
|
||||
final BatchMessage batchMessage = readJsonMessage(context, sequenceNumber, payloadSize, buffer);
|
||||
if (batchMessage == null) {
|
||||
log.trace("State [Read JSON] Message not found Remote [{}]", channel.remoteAddress());
|
||||
} else {
|
||||
processBatchMessage(batchMessage, batches);
|
||||
log.debug("Processed JSON Message Sequence Number [{}] Payload Size [{}] Local [{}] Remote [{}]", sequenceNumber, payloadSize, channel.localAddress(), channel.remoteAddress());
|
||||
}
|
||||
}
|
||||
|
||||
private BatchMessage readJsonMessage(
|
||||
final ChannelHandlerContext context,
|
||||
final int sequenceNumber,
|
||||
final int payloadSize,
|
||||
final ByteBuf buffer
|
||||
) {
|
||||
final BatchMessage batchMessage;
|
||||
|
||||
final int readableBytes = buffer.readableBytes();
|
||||
if (readableBytes >= payloadSize) {
|
||||
final byte[] payload = new byte[payloadSize];
|
||||
buffer.readBytes(payload);
|
||||
|
||||
final Channel channel = context.channel();
|
||||
final String sender = getRemoteHostAddress(channel);
|
||||
batchMessage = new BatchMessage(sender, payload, sequenceNumber);
|
||||
} else {
|
||||
batchMessage = null;
|
||||
log.trace("State [Read JSON] Sequence Number [{}] not enough readable bytes [{}] for payload [{}]", sequenceNumber, readableBytes, payloadSize);
|
||||
}
|
||||
|
||||
return batchMessage;
|
||||
}
|
||||
|
||||
private String getRemoteHostAddress(final Channel channel) {
|
||||
final String remoteHostAddress;
|
||||
|
||||
final SocketAddress remoteAddress = channel.remoteAddress();
|
||||
if (remoteAddress instanceof InetSocketAddress) {
|
||||
final InetSocketAddress remoteSocketAddress = (InetSocketAddress) remoteAddress;
|
||||
final InetAddress address = remoteSocketAddress.getAddress();
|
||||
remoteHostAddress = address.getHostAddress();
|
||||
} else {
|
||||
remoteHostAddress = remoteAddress.toString();
|
||||
}
|
||||
|
||||
return remoteHostAddress;
|
||||
}
|
||||
|
||||
private void processBatchMessage(final BatchMessage batchMessage, final List<Object> batches) {
|
||||
if (batchMessages.offer(batchMessage)) {
|
||||
resetSequenceVersionPayloadSize();
|
||||
resetFrameTypeVersion();
|
||||
|
||||
if (windowSize.get() == batchMessages.size()) {
|
||||
final Collection<BatchMessage> messages = new ArrayList<>(batchMessages);
|
||||
final Batch batch = new Batch(messages);
|
||||
batches.add(batch);
|
||||
|
||||
resetWindowSize();
|
||||
}
|
||||
} else {
|
||||
final String message = String.format("Received message exceeds Window Size [%d]", windowSize.get());
|
||||
throw new ProtocolException(message);
|
||||
}
|
||||
}
|
||||
|
||||
private void readCompressedBuffer(final ByteBuf compressedBuffer, final ByteBuf inflated, final int compressedSize) {
|
||||
final Inflater inflater = new Inflater();
|
||||
try (
|
||||
final ByteBufOutputStream outputStream = new ByteBufOutputStream(inflated);
|
||||
final InflaterOutputStream inflaterOutputStream = new InflaterOutputStream(outputStream, inflater)
|
||||
) {
|
||||
compressedBuffer.readBytes(inflaterOutputStream, compressedSize);
|
||||
} catch (final IOException e) {
|
||||
final String message = String.format("Read Compressed Payload Size [%d] failed", compressedSize);
|
||||
throw new ProtocolException(message, e);
|
||||
} finally {
|
||||
inflater.end();
|
||||
}
|
||||
}
|
||||
|
||||
private Integer readSequenceNumber(final ByteBuf buffer) {
|
||||
if (sequenceNumberRef.get() == null) {
|
||||
final Integer readSequenceNumber = readUnsignedInteger(buffer);
|
||||
if (readSequenceNumber == null) {
|
||||
log.trace("State [Read JSON] not enough readable bytes for Sequence Number");
|
||||
} else {
|
||||
sequenceNumberRef.set(readSequenceNumber);
|
||||
}
|
||||
}
|
||||
|
||||
return sequenceNumberRef.get();
|
||||
}
|
||||
|
||||
private Integer readPayloadSize(final ByteBuf buffer) {
|
||||
if (payloadSizeRef.get() == null) {
|
||||
final Integer readPayloadSize = readUnsignedInteger(buffer);
|
||||
if (readPayloadSize == null) {
|
||||
log.trace("State [Read JSON] not enough readable bytes for Payload Size");
|
||||
} else {
|
||||
payloadSizeRef.set(readPayloadSize);
|
||||
}
|
||||
}
|
||||
|
||||
return payloadSizeRef.get();
|
||||
}
|
||||
|
||||
private Integer readCompressedSize(final ByteBuf buffer) {
|
||||
if (compressedSizeRef.get() == null) {
|
||||
final Integer readCompressedSize = readUnsignedInteger(buffer);
|
||||
if (readCompressedSize == null) {
|
||||
log.trace("State [Read Compressed] not enough readable bytes for Compressed Size");
|
||||
} else {
|
||||
compressedSizeRef.set(readCompressedSize);
|
||||
}
|
||||
}
|
||||
|
||||
return compressedSizeRef.get();
|
||||
}
|
||||
|
||||
private Integer readUnsignedInteger(final ByteBuf buffer) {
|
||||
final Integer number;
|
||||
|
||||
final int readableBytes = buffer.readableBytes();
|
||||
if (readableBytes >= INT_READABLE_BYTES) {
|
||||
final long unsigned = buffer.readUnsignedInt();
|
||||
number = Math.toIntExact(unsigned);
|
||||
} else {
|
||||
number = null;
|
||||
}
|
||||
|
||||
return number;
|
||||
}
|
||||
|
||||
private FrameType readFrameType(final ByteBuf buffer) {
|
||||
if (frameTypeRef.get() == null) {
|
||||
final int readableBytes = buffer.readableBytes();
|
||||
if (readableBytes >= CODE_READABLE_BYTES) {
|
||||
final byte frameTypeCode = buffer.readByte();
|
||||
final FrameType frameType = FRAME_TYPE_DECODER.readProtocolCode(frameTypeCode);
|
||||
frameTypeRef.set(frameType);
|
||||
} else {
|
||||
log.trace("State [Read Frame Type] not enough readable bytes [{}]", readableBytes);
|
||||
}
|
||||
}
|
||||
|
||||
return frameTypeRef.get();
|
||||
}
|
||||
|
||||
private ProtocolVersion readVersion(final ByteBuf buffer) {
|
||||
if (versionRef.get() == null) {
|
||||
final int readableBytes = buffer.readableBytes();
|
||||
if (readableBytes >= CODE_READABLE_BYTES) {
|
||||
final byte versionCode = buffer.readByte();
|
||||
final ProtocolVersion protocolVersion = VERSION_DECODER.readProtocolCode(versionCode);
|
||||
versionRef.set(protocolVersion);
|
||||
} else {
|
||||
log.trace("State [Read Version] not enough readable bytes [{}]", readableBytes);
|
||||
}
|
||||
}
|
||||
|
||||
return versionRef.get();
|
||||
}
|
||||
|
||||
private void resetSequenceVersionPayloadSize() {
|
||||
sequenceNumberRef.set(null);
|
||||
payloadSizeRef.set(null);
|
||||
}
|
||||
|
||||
private void resetFrameTypeVersion() {
|
||||
frameTypeRef.set(null);
|
||||
versionRef.set(null);
|
||||
}
|
||||
|
||||
private void resetWindowSize() {
|
||||
windowSize.set(INITIAL_WINDOW_SIZE);
|
||||
batchMessages.clear();
|
||||
}
|
||||
}
|
|
@ -1,65 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.handler;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.MessageToByteEncoder;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processors.beats.protocol.FrameType;
|
||||
import org.apache.nifi.processors.beats.protocol.MessageAck;
|
||||
import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Beats Message Acknowledgement Encoder writes Protocol Version 2 ACK packets with a specified sequence number
|
||||
*/
|
||||
@ChannelHandler.Sharable
|
||||
public class MessageAckEncoder extends MessageToByteEncoder<MessageAck> {
|
||||
private final ComponentLog log;
|
||||
|
||||
/**
|
||||
* Message Acknowledgment Encoder with required arguments
|
||||
*
|
||||
* @param log Processor Log
|
||||
*/
|
||||
public MessageAckEncoder(final ComponentLog log) {
|
||||
this.log = Objects.requireNonNull(log, "Component Log required");
|
||||
}
|
||||
|
||||
/**
|
||||
* Encode Message Acknowledgement to the buffer with Protocol Version 2 and ACK Frame Type
|
||||
*
|
||||
* @param context Channel Handler Context
|
||||
* @param messageAck Message Acknowledgement containing Sequence Number
|
||||
* @param buffer Byte Buffer
|
||||
*/
|
||||
@Override
|
||||
protected void encode(final ChannelHandlerContext context, final MessageAck messageAck, final ByteBuf buffer) {
|
||||
buffer.writeByte(ProtocolVersion.VERSION_2.getCode());
|
||||
buffer.writeByte(FrameType.ACK.getCode());
|
||||
|
||||
final int sequenceNumber = messageAck.getSequenceNumber();
|
||||
buffer.writeInt(sequenceNumber);
|
||||
|
||||
final Channel channel = context.channel();
|
||||
log.debug("Encoded Message Ack Sequence Number [{}] Local [{}] Remote [{}]", sequenceNumber, channel.localAddress(), channel.remoteAddress());
|
||||
}
|
||||
}
|
|
@ -1,35 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.protocol;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Batch of Beats Messages
|
||||
*/
|
||||
public class Batch {
|
||||
private final Collection<BatchMessage> messages;
|
||||
|
||||
public Batch(final Collection<BatchMessage> messages) {
|
||||
this.messages = Objects.requireNonNull(messages, "Message required");
|
||||
}
|
||||
|
||||
public Collection<BatchMessage> getMessages() {
|
||||
return messages;
|
||||
}
|
||||
}
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.protocol;
|
||||
|
||||
import org.apache.nifi.event.transport.message.ByteArrayMessage;
|
||||
|
||||
/**
|
||||
* Beats Batch Message containing JSON payload and sequence number
|
||||
*/
|
||||
public class BatchMessage extends ByteArrayMessage {
|
||||
|
||||
private final int sequenceNumber;
|
||||
|
||||
public BatchMessage(final String sender, final byte[] payload, final int sequenceNumber) {
|
||||
super(payload, sender);
|
||||
this.sequenceNumber = sequenceNumber;
|
||||
}
|
||||
|
||||
public int getSequenceNumber() {
|
||||
return sequenceNumber;
|
||||
}
|
||||
}
|
|
@ -1,43 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.protocol;
|
||||
|
||||
/**
|
||||
* Beats Protocol Frame Type
|
||||
*/
|
||||
public enum FrameType implements ProtocolCode {
|
||||
ACK('A'),
|
||||
|
||||
COMPRESSED('C'),
|
||||
|
||||
DATA('D'),
|
||||
|
||||
JSON('J'),
|
||||
|
||||
WINDOW_SIZE('W');
|
||||
|
||||
private final int code;
|
||||
|
||||
FrameType(final char code) {
|
||||
this.code = code;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCode() {
|
||||
return code;
|
||||
}
|
||||
}
|
|
@ -1,38 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.protocol;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Beats Frame Type Decoder
|
||||
*/
|
||||
public class FrameTypeDecoder implements ProtocolCodeDecoder<FrameType> {
|
||||
|
||||
@Override
|
||||
public FrameType readProtocolCode(final byte code) {
|
||||
final Optional<FrameType> frameTypeFound = Arrays.stream(FrameType.values()).filter(
|
||||
frameType -> frameType.getCode() == code
|
||||
).findFirst();
|
||||
|
||||
return frameTypeFound.orElseThrow(() -> {
|
||||
final String message = String.format("Frame Type Code [%d] not supported", code);
|
||||
return new ProtocolException(message);
|
||||
});
|
||||
}
|
||||
}
|
|
@ -1,32 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.protocol;
|
||||
|
||||
/**
|
||||
* Beats Message Acknowledgement
|
||||
*/
|
||||
public class MessageAck {
|
||||
private final int sequenceNumber;
|
||||
|
||||
public MessageAck(final int sequenceNumber) {
|
||||
this.sequenceNumber = sequenceNumber;
|
||||
}
|
||||
|
||||
public int getSequenceNumber() {
|
||||
return sequenceNumber;
|
||||
}
|
||||
}
|
|
@ -1,29 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.protocol;
|
||||
|
||||
/**
|
||||
* Beats Protocol Code interface abstraction
|
||||
*/
|
||||
public interface ProtocolCode {
|
||||
/**
|
||||
* Get Protocol Code as transmitted over a socket connection
|
||||
*
|
||||
* @return Protocol Code
|
||||
*/
|
||||
int getCode();
|
||||
}
|
|
@ -1,32 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.protocol;
|
||||
|
||||
/**
|
||||
* Decoder for Protocol Code byte values
|
||||
*
|
||||
* @param <T> Protocol Code Type
|
||||
*/
|
||||
public interface ProtocolCodeDecoder<T extends ProtocolCode> {
|
||||
/**
|
||||
* Read Protocol Code
|
||||
*
|
||||
* @param code Code byte value
|
||||
* @return Protocol Code
|
||||
*/
|
||||
T readProtocolCode(byte code);
|
||||
}
|
|
@ -1,41 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.protocol;
|
||||
|
||||
/**
|
||||
* Beats Protocol Exception
|
||||
*/
|
||||
public class ProtocolException extends RuntimeException {
|
||||
/**
|
||||
* Protocol Exception constructor with message containing protocol failure details
|
||||
*
|
||||
* @param message Protocol failure details
|
||||
*/
|
||||
public ProtocolException(final String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Protocol Exception constructor with message and cause of failure details
|
||||
*
|
||||
* @param message Protocol failure details
|
||||
* @param cause Cause of failure
|
||||
*/
|
||||
public ProtocolException(final String message, final Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
|
@ -1,37 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.protocol;
|
||||
|
||||
/**
|
||||
* Beats Protocol Version
|
||||
*/
|
||||
public enum ProtocolVersion implements ProtocolCode {
|
||||
VERSION_1('1'),
|
||||
|
||||
VERSION_2('2');
|
||||
|
||||
private final int code;
|
||||
|
||||
ProtocolVersion(final char code) {
|
||||
this.code = code;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCode() {
|
||||
return code;
|
||||
}
|
||||
}
|
|
@ -1,38 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.protocol;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Beats Protocol Version Decoder
|
||||
*/
|
||||
public class ProtocolVersionDecoder implements ProtocolCodeDecoder<ProtocolVersion> {
|
||||
|
||||
@Override
|
||||
public ProtocolVersion readProtocolCode(final byte code) {
|
||||
final Optional<ProtocolVersion> protocolVersionFound = Arrays.stream(ProtocolVersion.values()).filter(
|
||||
protocolVersion -> protocolVersion.getCode() == code
|
||||
).findFirst();
|
||||
|
||||
return protocolVersionFound.orElseThrow(() -> {
|
||||
final String message = String.format("Version Code [%d] not supported", code);
|
||||
return new ProtocolException(message);
|
||||
});
|
||||
}
|
||||
}
|
|
@ -1,60 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats.server;
|
||||
|
||||
import org.apache.nifi.event.transport.configuration.TransportProtocol;
|
||||
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
|
||||
import org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processors.beats.handler.BatchChannelInboundHandler;
|
||||
import org.apache.nifi.processors.beats.handler.BatchDecoder;
|
||||
import org.apache.nifi.processors.beats.handler.MessageAckEncoder;
|
||||
import org.apache.nifi.processors.beats.protocol.BatchMessage;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
/**
|
||||
* Beats Message Protocol extends of Netty Event Server Factory
|
||||
*/
|
||||
public class BeatsMessageServerFactory extends NettyEventServerFactory {
|
||||
/**
|
||||
* Beats Message Server Factory constructor with standard configuration arguments
|
||||
*
|
||||
* @param log Component Log
|
||||
* @param address Server Address
|
||||
* @param port Server Port Number
|
||||
* @param events Blocking Queue for events received
|
||||
*/
|
||||
public BeatsMessageServerFactory(final ComponentLog log,
|
||||
final InetAddress address,
|
||||
final int port,
|
||||
final BlockingQueue<BatchMessage> events) {
|
||||
super(address, port, TransportProtocol.TCP);
|
||||
final MessageAckEncoder messageAckEncoder = new MessageAckEncoder(log);
|
||||
final BatchChannelInboundHandler batchChannelInboundHandler = new BatchChannelInboundHandler(log, events);
|
||||
final LogExceptionChannelHandler logExceptionChannelHandler = new LogExceptionChannelHandler(log);
|
||||
|
||||
setHandlerSupplier(() -> Arrays.asList(
|
||||
messageAckEncoder,
|
||||
new BatchDecoder(log),
|
||||
batchChannelInboundHandler,
|
||||
logExceptionChannelHandler
|
||||
));
|
||||
}
|
||||
}
|
|
@ -1,15 +0,0 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.processors.beats.ListenBeats
|
|
@ -1,236 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.beats;
|
||||
|
||||
import org.apache.nifi.processor.util.listen.ListenerProperties;
|
||||
import org.apache.nifi.processors.beats.protocol.FrameType;
|
||||
import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
|
||||
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.Socket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Iterator;
|
||||
import java.util.Optional;
|
||||
import java.util.zip.DeflaterOutputStream;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
class ListenBeatsTest {
|
||||
|
||||
private static final String LOCALHOST = "127.0.0.1";
|
||||
|
||||
private static final String LOCALHOST_TRANSIT_URI = "beats://127.0.0.1:%d";
|
||||
|
||||
private static final int ACK_PACKET_LENGTH = 6;
|
||||
|
||||
private static final int FIRST_SEQUENCE_NUMBER = 1;
|
||||
|
||||
private static final int INTEGER_BUFFER_SIZE = 4;
|
||||
|
||||
private static final String JSON_PAYLOAD = "{\"@timestamp\":\"2022-10-31T12:30:45.678Z\",\"message\":\"Processing Started\"}";
|
||||
|
||||
private static final int WINDOWED_MESSAGES = 50;
|
||||
|
||||
TestRunner runner;
|
||||
|
||||
@BeforeEach
|
||||
void createRunner() {
|
||||
runner = TestRunners.newTestRunner(ListenBeats.class);
|
||||
}
|
||||
|
||||
@Timeout(10)
|
||||
@Test
|
||||
void testRunSingleJsonMessage() throws Exception {
|
||||
final int port = startServer();
|
||||
|
||||
try (final Socket socket = new Socket(LOCALHOST, port);
|
||||
final InputStream inputStream = socket.getInputStream();
|
||||
final OutputStream outputStream = socket.getOutputStream()) {
|
||||
|
||||
sendMessage(outputStream, FIRST_SEQUENCE_NUMBER);
|
||||
assertAckPacketMatched(inputStream, FIRST_SEQUENCE_NUMBER);
|
||||
}
|
||||
|
||||
assertFlowFilesSuccess(1);
|
||||
assertReceiveEventFound(port);
|
||||
}
|
||||
|
||||
@Timeout(10)
|
||||
@Test
|
||||
void testRunWindowSizeJsonMessages() throws Exception {
|
||||
final int port = startServer();
|
||||
|
||||
try (final Socket socket = new Socket(LOCALHOST, port);
|
||||
final InputStream inputStream = socket.getInputStream();
|
||||
final OutputStream outputStream = socket.getOutputStream()) {
|
||||
|
||||
sendWindowSize(outputStream);
|
||||
|
||||
for (int sequenceNumber = FIRST_SEQUENCE_NUMBER; sequenceNumber <= WINDOWED_MESSAGES; sequenceNumber++) {
|
||||
sendMessage(outputStream, sequenceNumber);
|
||||
}
|
||||
|
||||
assertAckPacketMatched(inputStream, WINDOWED_MESSAGES);
|
||||
}
|
||||
|
||||
assertFlowFilesSuccess(WINDOWED_MESSAGES);
|
||||
assertReceiveEventFound(port);
|
||||
}
|
||||
|
||||
@Timeout(10)
|
||||
@Test
|
||||
void testRunWindowSizeCompressedJsonMessages() throws Exception {
|
||||
final int port = startServer();
|
||||
|
||||
try (final Socket socket = new Socket(LOCALHOST, port);
|
||||
final InputStream inputStream = socket.getInputStream();
|
||||
final OutputStream outputStream = socket.getOutputStream()) {
|
||||
|
||||
sendWindowSize(outputStream);
|
||||
|
||||
final ByteArrayOutputStream compressedOutputStream = new ByteArrayOutputStream();
|
||||
final DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(compressedOutputStream);
|
||||
|
||||
for (int sequenceNumber = FIRST_SEQUENCE_NUMBER; sequenceNumber <= WINDOWED_MESSAGES; sequenceNumber++) {
|
||||
sendMessage(deflaterOutputStream, sequenceNumber);
|
||||
}
|
||||
|
||||
deflaterOutputStream.close();
|
||||
final byte[] compressed = compressedOutputStream.toByteArray();
|
||||
sendCompressed(outputStream, compressed);
|
||||
|
||||
assertAckPacketMatched(inputStream, WINDOWED_MESSAGES);
|
||||
}
|
||||
|
||||
assertFlowFilesSuccess(WINDOWED_MESSAGES);
|
||||
assertReceiveEventFound(port);
|
||||
}
|
||||
|
||||
private int startServer() {
|
||||
runner.setProperty(ListenerProperties.PORT, "0");
|
||||
runner.run(1, false, true);
|
||||
|
||||
final int port = ((ListenBeats) runner.getProcessor()).getListeningPort();
|
||||
|
||||
return port;
|
||||
}
|
||||
|
||||
private void assertReceiveEventFound(final int port) {
|
||||
final Optional<ProvenanceEventRecord> receiveRecord = runner.getProvenanceEvents().stream().filter(record ->
|
||||
ProvenanceEventType.RECEIVE == record.getEventType()
|
||||
).findFirst();
|
||||
|
||||
assertTrue(receiveRecord.isPresent());
|
||||
final ProvenanceEventRecord record = receiveRecord.get();
|
||||
|
||||
final String expectedTransitUri = String.format(LOCALHOST_TRANSIT_URI, port);
|
||||
assertEquals(expectedTransitUri, record.getTransitUri());
|
||||
}
|
||||
|
||||
private void assertFlowFilesSuccess(final int expectedFlowFiles) {
|
||||
runner.run(expectedFlowFiles, true, false);
|
||||
runner.assertTransferCount(ListenBeats.REL_SUCCESS, expectedFlowFiles);
|
||||
|
||||
final Iterator<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListenBeats.REL_SUCCESS).iterator();
|
||||
int i = 1;
|
||||
while (flowFiles.hasNext()) {
|
||||
final MockFlowFile flowFile = flowFiles.next();
|
||||
final String content = flowFile.getContent();
|
||||
assertEquals(JSON_PAYLOAD, content, String.format("FlowFile Content [%d] not matched", i));
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
private void sendWindowSize(final OutputStream outputStream) throws IOException {
|
||||
outputStream.write(ProtocolVersion.VERSION_2.getCode());
|
||||
|
||||
outputStream.write(FrameType.WINDOW_SIZE.getCode());
|
||||
|
||||
final byte[] windowSize = getUnsignedInteger(WINDOWED_MESSAGES);
|
||||
outputStream.write(windowSize);
|
||||
|
||||
outputStream.flush();
|
||||
}
|
||||
|
||||
private void sendMessage(final OutputStream outputStream, final int sequenceNumber) throws IOException {
|
||||
outputStream.write(ProtocolVersion.VERSION_2.getCode());
|
||||
|
||||
outputStream.write(FrameType.JSON.getCode());
|
||||
|
||||
final byte[] sequenceNumberEncoded = getUnsignedInteger(sequenceNumber);
|
||||
outputStream.write(sequenceNumberEncoded);
|
||||
|
||||
final int payloadLength = JSON_PAYLOAD.length();
|
||||
|
||||
final byte[] payloadSize = getUnsignedInteger(payloadLength);
|
||||
outputStream.write(payloadSize);
|
||||
|
||||
outputStream.write(JSON_PAYLOAD.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
outputStream.flush();
|
||||
}
|
||||
|
||||
private void sendCompressed(final OutputStream outputStream, final byte[] compressed) throws IOException {
|
||||
outputStream.write(ProtocolVersion.VERSION_2.getCode());
|
||||
|
||||
outputStream.write(FrameType.COMPRESSED.getCode());
|
||||
|
||||
final int payloadLength = compressed.length;
|
||||
|
||||
final byte[] payloadSize = getUnsignedInteger(payloadLength);
|
||||
outputStream.write(payloadSize);
|
||||
|
||||
outputStream.write(compressed);
|
||||
|
||||
outputStream.flush();
|
||||
}
|
||||
|
||||
private void assertAckPacketMatched(final InputStream inputStream, final int expectedSequenceNumber) throws IOException {
|
||||
final byte[] ackPacket = new byte[ACK_PACKET_LENGTH];
|
||||
final int bytesRead = inputStream.read(ackPacket);
|
||||
|
||||
assertEquals(ACK_PACKET_LENGTH, bytesRead);
|
||||
|
||||
final ByteBuffer ackPacketBuffer = ByteBuffer.wrap(ackPacket);
|
||||
|
||||
final byte version = ackPacketBuffer.get();
|
||||
assertEquals(ProtocolVersion.VERSION_2.getCode(), version);
|
||||
|
||||
final byte frameType = ackPacketBuffer.get();
|
||||
assertEquals(FrameType.ACK.getCode(), frameType);
|
||||
|
||||
final int sequenceNumber = ackPacketBuffer.getInt();
|
||||
assertEquals(expectedSequenceNumber, sequenceNumber);
|
||||
}
|
||||
|
||||
private byte[] getUnsignedInteger(final int number) {
|
||||
return ByteBuffer.allocate(INTEGER_BUFFER_SIZE).putInt(number).array();
|
||||
}
|
||||
}
|
|
@ -1,33 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services-api-bom</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
<relativePath>../nifi-standard-services-api-bom</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-beats-bundle</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<modules>
|
||||
<module>nifi-beats-processors</module>
|
||||
<module>nifi-beats-nar</module>
|
||||
</modules>
|
||||
</project>
|
|
@ -54,7 +54,6 @@
|
|||
<module>nifi-amqp-bundle</module>
|
||||
<module>nifi-splunk-bundle</module>
|
||||
<module>nifi-jms-bundle</module>
|
||||
<module>nifi-beats-bundle</module>
|
||||
<module>nifi-site-to-site-reporting-bundle</module>
|
||||
<module>nifi-mqtt-bundle</module>
|
||||
<module>nifi-evtx-bundle</module>
|
||||
|
|
Loading…
Reference in New Issue