NIFI-856 - Implements ListenLumberjack Processor

This closes #290.

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
Andre F de Miranda 2016-03-19 01:41:35 +11:00 committed by Aldrin Piri
parent 54c5c28cbb
commit 5a8979150c
30 changed files with 2329 additions and 3 deletions

View File

@ -282,6 +282,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-elasticsearch-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lumberjack-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-amqp-nar</artifactId>

View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lumberjack-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-lumberjack-nar</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lumberjack-processors</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,5 @@
nifi-lumberjack-nar
Copyright 2014-2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

View File

@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lumberjack-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-lumberjack-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-socket-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-packager</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.4</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.lumberjack.event.LumberjackEvent;
import org.apache.nifi.processors.lumberjack.event.LumberjackEventFactory;
import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder;
import org.apache.nifi.processors.lumberjack.handler.LumberjackSocketChannelHandlerFactory;
import org.apache.nifi.processors.lumberjack.response.LumberjackChannelResponse;
import org.apache.nifi.processors.lumberjack.response.LumberjackResponse;
import org.apache.nifi.ssl.SSLContextService;
import com.google.gson.Gson;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "lumberjack", "tcp", "logs"})
@CapabilityDescription("Listens for Lumberjack messages being sent to a given port over TCP. Each message will be " +
"acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain data " +
"portion of one or more Lumberjack frames. In the case where the Lumberjack frames contain syslog messages, the " +
"output of this processor can be sent to a ParseSyslog processor for further processing.")
@WritesAttributes({
@WritesAttribute(attribute = "lumberjack.sender", description = "The sending host of the messages."),
@WritesAttribute(attribute = "lumberjack.port", description = "The sending port the messages were received over."),
@WritesAttribute(attribute = "lumberjack.sequencenumber", description = "The sequence number of the message. Only included if <Batch Size> is 1."),
@WritesAttribute(attribute = "lumberjack.*", description = "The keys and respective values as sent by the lumberjack producer. Only included if <Batch Size> is 1."),
@WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is text/plain")
})
@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
public class ListenLumberjack extends AbstractListenEventBatchingProcessor<LumberjackEvent> {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
"messages will be received over a secure connection. Note that as Lumberjack client requires" +
"two-way SSL authentication, the controller MUST have a truststore and a keystore to work" +
"properly.")
.required(true)
.identifiesControllerService(SSLContextService.class)
.build();
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(
MAX_CONNECTIONS,
SSL_CONTEXT_SERVICE
);
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null && sslContextService.isTrustStoreConfigured() == false) {
results.add(new ValidationResult.Builder()
.explanation("The context service must have a truststore configured for the lumberjack forwarder client to work correctly")
.valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build());
}
return results;
}
private volatile LumberjackEncoder lumberjackEncoder;
@Override
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
super.onScheduled(context);
// wanted to ensure charset was already populated here
lumberjackEncoder = new LumberjackEncoder();
}
@Override
protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<LumberjackEvent> events) throws IOException {
final EventFactory<LumberjackEvent> eventFactory = new LumberjackEventFactory();
final ChannelHandlerFactory<LumberjackEvent, AsyncChannelDispatcher> handlerFactory = new LumberjackSocketChannelHandlerFactory<>();
final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and the buffer size
final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
}
// if we decide to support SSL then get the context and pass it in here
return new SocketChannelDispatcher<>(eventFactory, handlerFactory, bufferPool, events,
getLogger(), maxConnections, sslContext, charSet);
}
@Override
protected String getBatchKey(LumberjackEvent event) {
return event.getSender();
}
protected void respond(final LumberjackEvent event, final LumberjackResponse lumberjackResponse) {
final ChannelResponse response = new LumberjackChannelResponse(lumberjackEncoder, lumberjackResponse);
final ChannelResponder responder = event.getResponder();
responder.addResponse(response);
try {
responder.respond();
} catch (IOException e) {
getLogger().error("Error sending response for transaction {} due to {}",
new Object[]{event.getSeqNumber(), e.getMessage()}, e);
}
}
protected void postProcess(final ProcessContext context, final ProcessSession session, final List<LumberjackEvent> events) {
// first commit the session so we guarantee we have all the events successfully
// written to FlowFiles and transferred to the success relationship
session.commit();
// respond to each event to acknowledge successful receipt
for (final LumberjackEvent event : events) {
respond(event, LumberjackResponse.ok(event.getSeqNumber()));
}
}
@Override
protected String getTransitUri(FlowFileEventBatch batch) {
final String sender = batch.getEvents().get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append("lumberjack").append("://").append(senderHost).append(":")
.append(port).toString();
return transitUri;
}
@Override
protected Map<String, String> getAttributes(FlowFileEventBatch batch) {
final List<LumberjackEvent> events = batch.getEvents();
// the sender and command will be the same for all events based on the batch key
final String sender = events.get(0).getSender();
final int numAttributes = events.size() == 1 ? 5 : 4;
final Map<String, String> attributes = new HashMap<>(numAttributes);
attributes.put(LumberjackAttributes.SENDER.key(), sender);
attributes.put(LumberjackAttributes.PORT.key(), String.valueOf(port));
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
// if there was only one event then we can pass on the transaction
// NOTE: we could pass on all the transaction ids joined together
if (events.size() == 1) {
attributes.put(LumberjackAttributes.SEQNUMBER.key(), String.valueOf(events.get(0).getSeqNumber()));
// Convert the serialized fields from JSON
String serialFields = String.valueOf(events.get(0).getFields());
Gson jsonObject = new Gson();
Map<String, String> fields = jsonObject.fromJson(serialFields, Map.class);
for (Map.Entry<String, String> entry : fields.entrySet()) {
attributes.put(LumberjackAttributes.FIELDS.key().concat(".").concat(entry.getKey()), entry.getValue());
}
}
return attributes;
}
public enum LumberjackAttributes implements FlowFileAttributeKey {
SENDER("lumberjack.sender"),
PORT("lumberjack.port"),
SEQNUMBER("lumberjack.sequencenumber"),
FIELDS("lumberjack.fields");
private final String key;
LumberjackAttributes(String key) {
this.key = key;
}
@Override
public String key() {
return key;
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.event;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import java.nio.channels.SocketChannel;
/**
* A Lumberjack event which adds the transaction number and command to the StandardEvent.
*/
public class LumberjackEvent extends StandardEvent<SocketChannel> {
private final long seqNumber;
private final String fields;
public LumberjackEvent(final String sender, final byte[] data, final ChannelResponder<SocketChannel> responder, final long seqNumber, String fields) {
super(sender, data, responder);
this.seqNumber = seqNumber;
this.fields = fields;
}
public long getSeqNumber() {
return seqNumber;
}
public String getFields() {
return fields;
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import java.util.Map;
/**
* An EventFactory implementation to create LumberjackEvents.
*/
public class LumberjackEventFactory implements EventFactory<LumberjackEvent> {
@Override
public LumberjackEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
final String sender = metadata.get(EventFactory.SENDER_KEY);
final long seqNumber = Long.valueOf(metadata.get(LumberjackMetadata.SEQNUMBER_KEY));
final String fields = metadata.get(LumberjackMetadata.FIELDS_KEY);
return new LumberjackEvent(sender, data, responder, seqNumber, fields);
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.event;
/**
* Metadata keys for Lumberjack.
*/
public interface LumberjackMetadata {
String SEQNUMBER_KEY = "lumberjack.sequencenumber";
String FIELDS_KEY = "lumberjack.fields";
}

View File

@ -0,0 +1,299 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.List;
import java.util.zip.InflaterInputStream;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Decodes a Lumberjack frame by maintaining a state based on each byte that has been processed. This class
* should not be shared by multiple threads.
*/
public class LumberjackDecoder {
static final Logger logger = LoggerFactory.getLogger(LumberjackDecoder.class);
private LumberjackFrame.Builder frameBuilder;
private LumberjackState currState = LumberjackState.VERSION;
private byte decodedFrameType;
private byte[] decompressedData;
private final Charset charset;
private final ByteArrayOutputStream currBytes;
private long windowSize;
public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
/**
* @param charset the charset to decode bytes from the Lumberjack frame
*/
public LumberjackDecoder(final Charset charset) {
this(charset, new ByteArrayOutputStream(4096));
}
/**
* @param charset the charset to decode bytes from the Lumberjack frame
* @param buffer a buffer to use while processing the bytes
*/
public LumberjackDecoder(final Charset charset, final ByteArrayOutputStream buffer) {
this.charset = charset;
this.currBytes = buffer;
this.frameBuilder = new LumberjackFrame.Builder();
this.decodedFrameType = 0x00;
}
/**
* Resets this decoder back to its initial state.
*/
public void reset() {
frameBuilder = new LumberjackFrame.Builder();
currState = LumberjackState.VERSION;
decodedFrameType = 0x00;
currBytes.reset();
}
/**
* Process the next byte from the channel, updating the builder and state accordingly.
*
* @param currByte the next byte to process
* @preturn true if a frame is ready to be retrieved, false otherwise
*/
public boolean process(final byte currByte) throws LumberjackFrameException {
try {
switch (currState) {
case VERSION:
processVERSION(currByte);
break;
case FRAMETYPE:
processFRAMETYPE(currByte);
break;
case PAYLOAD:
processPAYLOAD(currByte);
if (frameBuilder.frameType == FRAME_WINDOWSIZE && currState == LumberjackState.COMPLETE) {
return true;
} else if (frameBuilder.frameType == FRAME_COMPRESSED && currState == LumberjackState.COMPLETE) {
return true;
} else {
break;
}
case COMPLETE:
return true;
default:
break;
}
return false;
} catch (Exception e) {
throw new LumberjackFrameException("Error decoding Lumberjack frame: " + e.getMessage(), e);
}
}
/**
* Returns the decoded frame and resets the decoder for the next frame.
* This method should be called after checking isComplete().
*
* @return the LumberjackFrame that was decoded
*/
public List<LumberjackFrame> getFrames() throws LumberjackFrameException {
List<LumberjackFrame> frames = new LinkedList<>();
if (currState != LumberjackState.COMPLETE) {
throw new LumberjackFrameException("Must be at the trailer of a frame");
}
try {
if (currState == LumberjackState.COMPLETE && frameBuilder.frameType == FRAME_COMPRESSED) {
logger.debug("Frame is compressed, will iterate to decode", new Object[]{});
// LumberjackDecoder decompressedDecoder = new LumberjackDecoder();
// Zero currBytes, currState and frameBuilder prior to iteration over
// decompressed bytes
currBytes.reset();
frameBuilder.reset();
currState = LumberjackState.VERSION;
// Run over decompressed data.
frames = processDECOMPRESSED(decompressedData);
} else {
final LumberjackFrame frame = frameBuilder.build();
currBytes.reset();
frameBuilder.reset();
currState = LumberjackState.VERSION;
frames.add(frame);
}
return frames;
} catch (Exception e) {
throw new LumberjackFrameException("Error decoding Lumberjack frame: " + e.getMessage(), e);
}
}
private List<LumberjackFrame> processDECOMPRESSED(byte[] decompressedData) {
List<LumberjackFrame> frames = new LinkedList<>();
LumberjackFrame.Builder internalFrameBuilder = new LumberjackFrame.Builder();
ByteBuffer currentData = ByteBuffer.wrap(decompressedData);
// Lumberjack has a weird approach to frames, where compressed frames embed D(ata) or J(SON) frames.
// inside a compressed input.
// Or astated in the documentation:
//
// "As an example, you could have 3 data frames compressed into a single
// 'compressed' frame type: 1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v}"
//
// Therefore, instead of calling process method again, just iterate over each of
// the frames and split them so they can be processed by LumberjackFrameHandler
while (currentData.hasRemaining()) {
int payloadLength = 0;
internalFrameBuilder.version = currentData.get();
internalFrameBuilder.frameType = currentData.get();
internalFrameBuilder.seqNumber = currentData.getInt() & 0x00000000ffffffffL;
currentData.mark();
// Set the payloadLength to negative to avoid doing math
// around valueLength and valueLength
payloadLength = payloadLength - currentData.position();
long pairCount = currentData.getInt() & 0x00000000ffffffffL;
for (int i = 0; i < pairCount; i++) {
long keyLength = currentData.getInt() & 0x00000000ffffffffL;
currentData.position(currentData.position() + (int) keyLength);
long valueLength = currentData.getInt() & 0x00000000ffffffffL;
currentData.position(currentData.position() + (int) valueLength);
}
// Infer the length of the payload from position...
payloadLength = payloadLength + currentData.position();
// Reset to mark (i.e. skip frame headers) prior to getting the data
currentData.reset();
// get the data, shift mark and compact so next iteration can
// read rest of buffer.
byte[] bytes = new byte[payloadLength];
currentData.get(bytes, 0, payloadLength);
currentData.mark();
// Add payload to frame
internalFrameBuilder.payload(bytes);
// data frame is created
LumberjackFrame frame = internalFrameBuilder.build();
frames.add(frame);
internalFrameBuilder.reset();
}
return frames;
}
private void processVERSION(final byte b) {
byte version = b;
frameBuilder.version(version);
logger.debug("Version number is {}", new Object[]{version});
currBytes.write(b);
currState = LumberjackState.FRAMETYPE;
}
private void processFRAMETYPE(final byte b) {
decodedFrameType = b;
frameBuilder.frameType(decodedFrameType);
logger.debug("Frame type is {}", new Object[]{decodedFrameType});
currBytes.write(b);
currState = LumberjackState.PAYLOAD;
}
private void processPAYLOAD(final byte b) {
currBytes.write(b);
switch (decodedFrameType) {
case FRAME_WINDOWSIZE: //'W'
if (currBytes.size() < 6) {
logger.trace("Lumberjack currBytes contents are {}", currBytes.toString());
break;
} else if (currBytes.size() == 6) {
frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL;
logger.debug("Data size is {}", new Object[]{frameBuilder.dataSize});
// Sets payload to empty as frame contains no data
frameBuilder.payload(new byte[]{});
currBytes.reset();
currState = LumberjackState.COMPLETE;
windowSize = frameBuilder.dataSize;
break;
} else {
break;
}
case FRAME_COMPRESSED: //'C'
if (currBytes.size() < 6) {
logger.trace("Lumberjack currBytes contents are {}", currBytes.toString());
break;
} else if (currBytes.size() >= 6) {
frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL;
if (currBytes.size() - 6 == frameBuilder.dataSize) {
try {
byte[] buf = java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, currBytes.size());
InputStream in = new InflaterInputStream(new ByteArrayInputStream(buf));
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int len;
while ((len = in.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
in.close();
out.close();
decompressedData = out.toByteArray();
// buf is no longer needed
buf = null;
logger.debug("Finished decompressing data");
// Decompression is complete, we should be able to proceed with resetting currBytes and curSrtate and iterating them
// as type 'D' frames
frameBuilder.dataSize(decompressedData.length);
currState = LumberjackState.COMPLETE;
} catch (IOException e) {
throw new LumberjackFrameException("Error decompressing frame: " + e.getMessage(), e);
}
}
break;
// If currentByte.size is not lower than six and also not equal or great than 6...
} else {
break;
}
}
}
private void processCOMPLETE() {
currBytes.reset();
frameBuilder.reset();
currState = LumberjackState.VERSION;
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* Encodes a LumberjackFrame into raw bytes using the given charset.
*/
public class LumberjackEncoder {
public byte[] encode(final LumberjackFrame frame) {
final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
// Writes the version
buffer.write(frame.getVersion());
// Writes the frameType
buffer.write(frame.getFrameType());
// Writes the sequence number
try {
buffer.write(frame.getPayload());
} catch (IOException e) {
throw new LumberjackFrameException("Error decoding Lumberjack frame: " + e.getMessage(), e);
}
return buffer.toByteArray();
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
/**
* A Lumberjack frame received from a channel.
*/
public class LumberjackFrame {
public static final byte DELIMITER = 10;
private final byte version;
private final byte frameType;
private final byte[] payload;
private final long dataSize;
private final long seqNumber;
private LumberjackFrame(final Builder builder) {
this.version = builder.version;
this.frameType = builder.frameType;
this.payload = builder.payload;
this.dataSize = builder.dataSize;
this.seqNumber = builder.seqNumber;
if (version < 2 || payload.length < 0 ) {
throw new LumberjackFrameException("Invalid Frame");
}
}
public long getSeqNumber() {
return seqNumber;
}
public byte getVersion() {
return version;
}
public byte getFrameType() {
return frameType;
}
public byte [] getPayload() {
return payload;
}
/**
* Builder for a LumberjackFrame.
*/
public static class Builder {
byte version;
byte frameType;
byte [] payload;
long dataSize;
long seqNumber;
public Builder() {
reset();
}
public void reset() {
version = -1;
seqNumber = -1;
frameType = -1;
payload = null;
}
public Builder version(final byte version) {
this.version = version;
return this;
}
public Builder seqNumber(final long seqNumber) {
this.seqNumber = seqNumber;
return this;
}
public Builder frameType(final byte frameType) {
this.frameType = frameType;
return this;
}
public Builder dataSize(final long dataSize) {
this.dataSize = dataSize;
return this;
}
public Builder payload(final byte [] payload) {
this.payload = payload;
return this;
}
public LumberjackFrame build() {
return new LumberjackFrame(this);
}
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
/**
* Represents an error encountered when decoding Lumberjack frames.
*/
public class LumberjackFrameException extends RuntimeException {
public LumberjackFrameException(String message) {
super(message);
}
public LumberjackFrameException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
/**
* The parts of a Lumberjack frame.
*/
public enum LumberjackState {
VERSION,
FRAMETYPE,
PAYLOAD,
COMPLETE
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.handler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processor.util.listen.event.EventQueue;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.lumberjack.event.LumberjackMetadata;
import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
import com.google.gson.Gson;
/**
* Encapsulates the logic to handle a LumberjackFrame once it has been read from the channel.
*/
public class LumberjackFrameHandler<E extends Event<SocketChannel>> {
private final Charset charset;
private final EventFactory<E> eventFactory;
private final EventQueue<E> events;
private final SelectionKey key;
private final AsyncChannelDispatcher dispatcher;
private final LumberjackEncoder encoder;
private final ComponentLog logger;
public LumberjackFrameHandler(final SelectionKey selectionKey,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final AsyncChannelDispatcher dispatcher,
final ComponentLog logger) {
this.key = selectionKey;
this.charset = charset;
this.eventFactory = eventFactory;
this.dispatcher = dispatcher;
this.logger = logger;
this.events = new EventQueue<>(events, logger);
this.encoder = new LumberjackEncoder();
}
public void handle(final LumberjackFrame frame, final ChannelResponder<SocketChannel> responder, final String sender)
throws IOException, InterruptedException {
final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
metadata.put(LumberjackMetadata.SEQNUMBER_KEY, String.valueOf(frame.getSeqNumber()));
String line = "";
/* If frameType is a data Frame, Handle the Lumberjack data payload, iterating over it and extracting
keys and values into metadata.
All keys are inserted into metadata with the exception of line that gets added into the body of the event
*/
if (frame.getFrameType() == 0x44) {
ByteBuffer currentData = ByteBuffer.wrap(frame.getPayload());
long pairCount = currentData.getInt() & 0x00000000ffffffffL;
Map<String,String> fields = new HashMap<>();
for (int i = 0; i < pairCount; i++) {
long keyLength = currentData.getInt() & 0x00000000ffffffffL;
byte[] bytes = new byte[(int) keyLength];
currentData.get(bytes);
String key = new String(bytes);
long valueLength = currentData.getInt() & 0x00000000ffffffffL;
bytes = new byte[(int) valueLength];
currentData.get(bytes);
String value = new String(bytes);
if (key.equals("line")) {
line = value;
} else {
fields.put(key, value);
}
}
// Serialize the fields into a String to push it via metdate
Gson serialFields = new Gson();
metadata.put("lumberjack.fields", serialFields.toJson(fields).toString());
// queue the raw event blocking until space is available, reset the buffer
final E event = eventFactory.create(line.getBytes(), metadata, responder);
events.offer(event);
} else if (frame.getFrameType() == 0x4A ) {
logger.error("Data type was JSON. JSON payload aren't yet supported, pending the documentation of Lumberjack protocol v2");
}
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.handler;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.socket.SSLSocketChannelHandler;
import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder;
import org.apache.nifi.processors.lumberjack.frame.LumberjackDecoder;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrameException;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
/**
* A Lumberjack implementation of SSLSocketChannelHandler.
*/
public class LumberjackSSLSocketChannelHandler<E extends Event<SocketChannel>> extends SSLSocketChannelHandler<E> {
private LumberjackDecoder decoder;
private LumberjackFrameHandler<E> frameHandler;
public LumberjackSSLSocketChannelHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
this.decoder = new LumberjackDecoder(charset);
this.frameHandler = new LumberjackFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Override
protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel,
final int bytesRead, final byte[] buffer) throws InterruptedException, IOException {
final InetAddress sender = socketChannel.socket().getInetAddress();
try {
// go through the buffer parsing the Lumberjack command
for (int i = 0; i < bytesRead; i++) {
byte currByte = buffer[i];
// if we found the end of a frame, handle the frame and mark the buffer
if (decoder.process(currByte)) {
final List<LumberjackFrame> frames = decoder.getFrames();
// A list of events has been generated
for (LumberjackFrame frame : frames) {
logger.debug("Received Lumberjack frame with transaction {} and command {}",
new Object[]{frame.getSeqNumber(), frame.getSeqNumber()});
// Ignore the WINDOWS type frames as they contain no payload.
if (frame.getFrameType() != 0x57 ) {
final SSLSocketChannelResponder responder = new SSLSocketChannelResponder(socketChannel, sslSocketChannel);
frameHandler.handle(frame, responder, sender.toString());
}
}
}
}
logger.debug("Done processing buffer");
} catch (final LumberjackFrameException rfe) {
logger.error("Error reading Lumberjack frames due to {}", new Object[] {rfe.getMessage()} , rfe);
// if an invalid frame or bad data was sent then the decoder will be left in a
// corrupted state, so lets close the connection and cause the client to re-establish
dispatcher.completeConnection(key);
}
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.handler;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.socket.StandardSocketChannelHandler;
import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
import org.apache.nifi.processors.lumberjack.frame.LumberjackDecoder;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrameException;
/**
* Extends the StandardSocketChannelHandler to decode bytes into Lumberjack frames.
*/
public class LumberjackSocketChannelHandler<E extends Event<SocketChannel>> extends StandardSocketChannelHandler<E> {
private LumberjackDecoder decoder;
private LumberjackFrameHandler<E> frameHandler;
public LumberjackSocketChannelHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
super(key, dispatcher, charset, eventFactory, events, logger);
this.decoder = new LumberjackDecoder(charset);
this.frameHandler = new LumberjackFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Override
protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer)
throws InterruptedException, IOException {
// get total bytes in buffer
final int total = socketBuffer.remaining();
final InetAddress sender = socketChannel.socket().getInetAddress();
try {
// go through the buffer parsing the Lumberjack command
for (int i = 0; i < total; i++) {
byte currByte = socketBuffer.get();
// if we found the end of a frame, handle the frame and mark the buffer
if (decoder.process(currByte)) {
final List<LumberjackFrame> frames = decoder.getFrames();
for (LumberjackFrame frame : frames) {
//TODO: Clean this
logger.debug("Received Lumberjack frame with transaction {} and command {}",
new Object[]{frame.getSeqNumber(), frame.getSeqNumber()});
// Ignore the WINDOWS type frames as they contain no payload.
if (frame.getFrameType() != 0x57) {
final SocketChannelResponder responder = new SocketChannelResponder(socketChannel);
frameHandler.handle(frame, responder, sender.toString());
}
}
socketBuffer.mark();
}
}
logger.debug("Done processing buffer");
} catch (final LumberjackFrameException rfe) {
logger.error("Error reading Lumberjack frames due to {}", new Object[] {rfe.getMessage()}, rfe);
// if an invalid frame or bad data was sent then the decoder will be left in a
// corrupted state, so lets close the connection and cause the client to re-establish
dispatcher.completeConnection(key);
}
}
// not used for anything in Lumberjack since the decoder encapsulates the delimiter
@Override
public byte getDelimiter() {
return LumberjackFrame.DELIMITER;
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.handler;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
/**
* Default factory for creating Lumberjack socket channel handlers.
*/
public class LumberjackSocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> {
@Override
public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
return new LumberjackSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
}
@Override
public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final SelectionKey key,
final AsyncChannelDispatcher dispatcher,
final Charset charset,
final EventFactory<E> eventFactory,
final BlockingQueue<E> events,
final ComponentLog logger) {
return new LumberjackSSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.response;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder;
/**
* Creates a LumberjackFrame for the provided response and returns the encoded frame.
*/
public class LumberjackChannelResponse implements ChannelResponse {
private final LumberjackEncoder encoder;
private final LumberjackResponse response;
public LumberjackChannelResponse(final LumberjackEncoder encoder, final LumberjackResponse response) {
this.encoder = encoder;
this.response = response;
}
@Override
public byte[] toByteArray() {
final LumberjackFrame frame = response.toFrame();
return encoder.encode(frame);
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.response;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
import java.nio.ByteBuffer;
/**
'ack' frame type
SENT FROM READER ONLY
frame type value: ASCII 'A' aka byte value 0x41
Payload:
32bit unsigned sequence number.
*/
public class LumberjackResponse {
private final long seqNumber;
final private byte version = 0x31, frameType = 0x41;
public LumberjackResponse(final long seqNumber) {
this.seqNumber = seqNumber;
}
/**
* Creates a LumberjackFrame where the data portion will contain this response.
*
*
* @return a LumberjackFrame for for this response
*/
public LumberjackFrame toFrame() {
return new LumberjackFrame.Builder()
.version(version)
.frameType(frameType)
.payload(ByteBuffer.allocate(8).putLong(seqNumber).array())
.build();
}
public static LumberjackResponse ok(final long seqNumber) {
return new LumberjackResponse(seqNumber);
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.lumberjack.ListenLumberjack

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class TestLumberjackEventFactory {
@Test
public void testCreateLumberJackEvent() {
final String sender = "testsender1";
final byte[] data = "this is a test line".getBytes();
final long seqNumber = 1;
final String fields = "{\"file\":\"test\"}";
final Map<String,String> metadata = new HashMap<>();
metadata.put(EventFactory.SENDER_KEY, sender);
metadata.put(LumberjackMetadata.SEQNUMBER_KEY, String.valueOf(seqNumber));
metadata.put(LumberjackMetadata.FIELDS_KEY, String.valueOf(fields));
final ChannelResponder responder = new SocketChannelResponder(null);
final EventFactory<LumberjackEvent> factory = new LumberjackEventFactory();
final LumberjackEvent event = factory.create(data, metadata, responder);
Assert.assertEquals(sender, event.getSender());
Assert.assertEquals(seqNumber, event.getSeqNumber());
Assert.assertEquals(fields, event.getFields());
Assert.assertEquals(data, event.getData());
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import javax.xml.bind.DatatypeConverter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestLumberjackDecoder {
// Because no encoder for type 43 was coded, added Static hex
// representation of compressed data
//
private static final String singleFrameData = "3143000000aa785e4c8e4daac3300c8413c8cbfeddc017681da7b48540775df51245103936f54fb" +
"04c4a6e5f6917d03020e91bc93c9ba669597faccefa80ec0fed72440dd1174833e819370c798d98aa0e79a10ae44e36972f94198b26886bc" +
"0774422589024c865aaecff07f24c6e1b0c37fb6c2da18cdb4176834f72747c4152e6aa46330db7e9725707567db0240c93aace93e212464" +
"95857f755e89e76e2d77e000000ffff010000ffff05b43bb8";
private static final String multiFrameData = "3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728031957a97f82" +
"232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f021f71461b26873e711bee9480f48b0af10fe28" +
"89113b8c9e28f4322b82395413a50cafd79957c253d0b992faf4129c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f468079" +
"6b421964fc9b032ac4dcb54d2575a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff" +
"010000ffff35e0eff0";
private static final String payload = "";
private LumberjackDecoder decoder;
@Before
public void setup() {
this.decoder = new LumberjackDecoder(StandardCharsets.UTF_8);
}
@Test
public void testDecodeSingleFrame() {
final byte[] input = DatatypeConverter.parseHexBinary(singleFrameData);
List<LumberjackFrame> frames = null;
LumberjackFrame frame = null;
for (byte b : input) {
if (decoder.process(b)) {
frames = decoder.getFrames();
break;
}
}
frame = frames.get(frames.size() - 1);
Assert.assertNotNull(frame);
Assert.assertEquals(0x31, frame.getVersion());
Assert.assertEquals(0x44, frame.getFrameType());
Assert.assertEquals(1, frame.getSeqNumber());
// Look for a predefined number of bytes for matching of the inner payload
Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("000000050000000466696c65000000"), Arrays.copyOfRange(frame.getPayload(), 0, 15));
}
@Test
public void testDecodeMultipleFrame() {
final byte[] input = DatatypeConverter.parseHexBinary(multiFrameData);
List<LumberjackFrame> frames = null;
LumberjackFrame frame = null;
for (byte b : input) {
if (decoder.process(b)) {
frames = decoder.getFrames();
break;
}
}
frame = frames.get(1);
Assert.assertNotNull(frame);
Assert.assertEquals(0x31, frame.getVersion());
Assert.assertEquals(0x44, frame.getFrameType());
// Load the second frame therefore seqNumber = 2
Assert.assertEquals(2, frame.getSeqNumber());
// Look for a predefined number of bytes for matching of the inner payload
Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("000000050000000466696c65000000"), Arrays.copyOfRange(frame.getPayload(), 0, 15));
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
import java.nio.ByteBuffer;
import javax.xml.bind.DatatypeConverter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestLumberjackEncoder {
private LumberjackEncoder encoder;
@Before
public void setup() {
this.encoder = new LumberjackEncoder();
}
@Test
public void testEncode() {
LumberjackFrame frame = new LumberjackFrame.Builder()
.version((byte) 0x31)
.frameType((byte) 0x41)
.payload(ByteBuffer.allocate(8).putLong(123).array())
.build();
byte[] encoded = encoder.encode(frame);
Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("3141000000000000007B"), encoded);
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.frame;
import org.junit.Test;
public class TestLumberjackFrame {
@Test(expected = LumberjackFrameException.class)
public void testInvalidVersion() {
new LumberjackFrame.Builder().seqNumber(1234).dataSize(3).build();
}
@Test(expected = LumberjackFrameException.class)
public void testInvalidFrameType() {
new LumberjackFrame.Builder().frameType((byte) 0x70).dataSize(5).build();
}
@Test(expected = LumberjackFrameException.class)
public void testBlankFrameType() {
new LumberjackFrame.Builder().frameType(((byte) 0x00)).dataSize(5).build();
}
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.handler;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.lumberjack.event.LumberjackEvent;
import org.apache.nifi.processors.lumberjack.event.LumberjackEventFactory;
import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder;
import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestLumberjackFrameHandler {
private Charset charset;
private EventFactory<LumberjackEvent> eventFactory;
private BlockingQueue<LumberjackEvent> events;
private SelectionKey key;
private AsyncChannelDispatcher dispatcher;
private LumberjackEncoder encoder;
private ComponentLog logger;
private LumberjackFrameHandler<LumberjackEvent> frameHandler;
@Before
public void setup() {
this.charset = StandardCharsets.UTF_8;
this.eventFactory = new LumberjackEventFactory();
this.events = new LinkedBlockingQueue<>();
this.key = Mockito.mock(SelectionKey.class);
this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class);
this.logger = Mockito.mock(ComponentLog.class);
this.frameHandler = new LumberjackFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
}
@Test
public void testWindow() throws IOException, InterruptedException {
final LumberjackFrame openFrame = new LumberjackFrame.Builder()
.version((byte) 0x31)
.frameType((byte) 0x57)
.seqNumber(-1)
.payload(Integer.toString(1).getBytes())
.build();
final String sender = "sender1";
final CapturingChannelResponder responder = new CapturingChannelResponder();
// call the handler and verify respond() was called once with once response
frameHandler.handle(openFrame, responder, sender);
// No response expected
Assert.assertEquals(0, responder.responded);
}
@Test
public void testData() throws IOException, InterruptedException {
final byte payload[] = new byte[]{
0x00, 0x00, 0x00, 0x02, // Number of pairs
0x00, 0x00, 0x00, 0x04, // Length of first pair key ('line')
0x6C, 0x69, 0x6E, 0x65, // 'line'
0x00, 0x00, 0x00, 0x0C, // Lenght of 'test-content'
0x74, 0x65, 0x73, 0x74, //
0x2d, 0x63, 0x6f, 0x6e, // 'test-content'
0x74, 0x65, 0x6e, 0x74, //
0x00, 0x00, 0x00, 0x05, // Length of 2nd pair key (field)
0x66, 0x69, 0x65, 0x6c, //
0x64, // 'field'
0x00, 0x00, 0x00, 0x05, // Length of 'value'
0x76, 0x61, 0x6c, 0x75, // 'value'
0x65
};
final LumberjackFrame dataFrame = new LumberjackFrame.Builder()
.version((byte) 0x31)
.frameType((byte) 0x44)
.seqNumber(1)
// Payload eq { enil: hello }
.payload(payload)
.build();
final String sender = "sender1";
final CapturingChannelResponder responder = new CapturingChannelResponder();
// call the handler and verify respond() was called once with once response
frameHandler.handle(dataFrame, responder, sender);
// No response expected
Assert.assertEquals(0, responder.responded);
// But events should contain one event
Assert.assertEquals(1, events.size());
final LumberjackEvent event = events.poll();
Assert.assertEquals("{\"field\":\"value\"}", new String((event.getFields())));
Assert.assertEquals("test-content", new String(event.getData(), charset));
}
private static class CapturingChannelResponder implements ChannelResponder<SocketChannel> {
int responded;
List<ChannelResponse> responses = new ArrayList<>();
@Override
public SocketChannel getChannel() {
return Mockito.mock(SocketChannel.class);
}
@Override
public List<ChannelResponse> getResponses() {
return responses;
}
@Override
public void addResponse(ChannelResponse response) {
responses.add(response);
}
@Override
public void respond() throws IOException {
responded++;
}
}
}

View File

@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.lumberjack.handler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import javax.xml.bind.DatatypeConverter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.lumberjack.event.LumberjackMetadata;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestLumberjackSocketChannelHandler {
private EventFactory<TestEvent> eventFactory;
private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory;
private BlockingQueue<ByteBuffer> byteBuffers;
private BlockingQueue<TestEvent> events;
private ComponentLog logger = Mockito.mock(ComponentLog.class);
private int maxConnections;
private SSLContext sslContext;
private Charset charset;
private ChannelDispatcher dispatcher;
@Before
public void setup() {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>();
byteBuffers = new LinkedBlockingQueue<>();
byteBuffers.add(ByteBuffer.allocate(4096));
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
maxConnections = 1;
sslContext = null;
charset = StandardCharsets.UTF_8;
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger,
maxConnections, sslContext, charset);
}
@Test
public void testBasicHandling() throws IOException, InterruptedException {
final String multiFrameData = "3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728" +
"031957a97f82232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f02" +
"1f71461b26873e711bee9480f48b0af10fe2889113b8c9e28f4322b82395413a50cafd79957c253d0b992faf41" +
"29c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f4680796b421964fc9b032ac4dcb54d2575" +
"a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff01000" +
"0ffff35e0eff0";
final List<String> messages = new ArrayList<>();
messages.add(multiFrameData);
run(messages);
// Check for the 4 frames (from the hex string above) are back...
Assert.assertEquals(4, events.size());
boolean found1 = false;
boolean found2 = false;
boolean found3 = false;
boolean found4 = false;
TestEvent event;
while((event = events.poll()) != null) {
Map<String,String> metadata = event.metadata;
Assert.assertTrue(metadata.containsKey(LumberjackMetadata.SEQNUMBER_KEY));
final String seqNum = metadata.get(LumberjackMetadata.SEQNUMBER_KEY);
if (seqNum.equals("1")) {
found1 = true;
} else if (seqNum.equals("2")) {
found2 = true;
} else if (seqNum.equals("3")) {
found3 = true;
} else if (seqNum.equals("4")) {
found4 = true;
}
}
Assert.assertTrue(found1);
Assert.assertTrue(found2);
Assert.assertTrue(found3);
Assert.assertTrue(found4);
}
protected void run(List<String> messages) throws IOException, InterruptedException {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
// starts the dispatcher listening on port 0 so it selects a random port
dispatcher.open(null, 0, 4096);
// starts a thread to run the dispatcher which will accept/read connections
Thread dispatcherThread = new Thread(dispatcher);
dispatcherThread.start();
// create a client connection to the port the dispatcher is listening on
final int realPort = dispatcher.getPort();
try (SocketChannel channel = SocketChannel.open()) {
channel.connect(new InetSocketAddress("localhost", realPort));
Thread.sleep(100);
// send the provided messages
for (int i=0; i < messages.size(); i++) {
buffer.clear();
buffer.put(DatatypeConverter.parseHexBinary(messages.get(i)));
buffer.flip();
while (buffer.hasRemaining()) {
channel.write(buffer);
}
Thread.sleep(1);
}
}
// wait up to 10 seconds to verify the responses
long timeout = 10000;
long startTime = System.currentTimeMillis();
while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) {
Thread.sleep(100);
}
// should have gotten an event for each message sent
Assert.assertEquals(4, events.size());
} finally {
// stop the dispatcher thread and ensure we shut down handler threads
dispatcher.close();
}
}
// Test event to produce from the data
private static class TestEvent implements Event<SocketChannel> {
private byte[] data;
private Map<String, String> metadata;
public TestEvent(byte[] data, Map<String, String> metadata) {
this.data = data;
this.metadata = metadata;
}
@Override
public String getSender() {
return metadata.get(EventFactory.SENDER_KEY);
}
@Override
public byte[] getData() {
return data;
}
@Override
public ChannelResponder<SocketChannel> getResponder() {
return null;
}
}
// Factory to create test events and send responses for testing
private static class TestEventHolderFactory implements EventFactory<TestEvent> {
@Override
public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
return new TestEvent(data, metadata);
}
}
}

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-lumberjack-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-lumberjack-processors</module>
<module>nifi-lumberjack-nar</module>
</modules>
</project>

View File

@ -57,6 +57,7 @@
<module>nifi-amqp-bundle</module>
<module>nifi-splunk-bundle</module>
<module>nifi-jms-bundle</module>
<module>nifi-lumberjack-bundle</module>
<module>nifi-cassandra-bundle</module>
<module>nifi-spring-bundle</module>
<module>nifi-hive-bundle</module>

12
pom.xml
View File

@ -1106,9 +1106,15 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-nar</artifactId>
<version>1.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lumberjack-nar</artifactId>
<version>1.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-splunk-nar</artifactId>
<version>1.0.0-SNAPSHOT</version>