diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 38668e518c..76d5cbfcc3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -18,6 +18,26 @@ package org.apache.nifi.processors.standard; import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import javax.net.ssl.SSLContext; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -55,27 +75,6 @@ import org.apache.nifi.processors.standard.syslog.SyslogEvent; import org.apache.nifi.processors.standard.syslog.SyslogParser; import org.apache.nifi.ssl.SSLContextService; -import javax.net.ssl.SSLContext; -import java.io.IOException; -import java.io.OutputStream; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.nio.ByteBuffer; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @Tags({"syslog", "listen", "udp", "tcp", "logs"}) @@ -410,11 +409,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { } final String sender = rawSyslogEvent.getSender(); - FlowFile flowFile = flowFilePerSender.get(sender); - if (flowFile == null) { - flowFile = session.create(); - flowFilePerSender.put(sender, flowFile); - } + FlowFile flowFile = flowFilePerSender.computeIfAbsent(sender, k -> session.create()); if (shouldParse) { boolean valid = true; @@ -428,7 +423,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { // If the event is invalid, route it to 'invalid' and then stop. // We create a separate FlowFile for this case instead of using 'flowFile', // because the 'flowFile' object may already have data written to it. - if (!valid || !event.isValid()) { + if (!valid || event == null || !event.isValid()) { FlowFile invalidFlowFile = session.create(); invalidFlowFile = session.putAllAttributes(invalidFlowFile, defaultAttributes); if (sender != null) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java index 90fa816ea0..ae08b22d6f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java @@ -26,7 +26,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -86,6 +85,8 @@ public class ParseSyslog extends AbstractProcessor { .description("Any FlowFile that is successfully parsed as a Syslog message will be to this Relationship.") .build(); + private SyslogParser parser; + @Override protected List getSupportedPropertyDescriptors() { @@ -110,7 +111,12 @@ public class ParseSyslog extends AbstractProcessor { } final String charsetName = context.getProperty(CHARSET).getValue(); - final SyslogParser parser = new SyslogParser(Charset.forName(charsetName)); + + // If the parser already exists and uses the same charset, it does not need to be re-initialized + if (parser == null || !parser.getCharsetName().equals(charsetName)) { + parser = new SyslogParser(Charset.forName(charsetName)); + } + final byte[] buffer = new byte[(int) flowFile.getSize()]; session.read(flowFile, new InputStreamCallback() { @Override @@ -128,7 +134,7 @@ public class ParseSyslog extends AbstractProcessor { return; } - if (!event.isValid()) { + if (event == null || !event.isValid()) { getLogger().error("Failed to parse {} as a Syslog message: it does not conform to any of the RFC formats supported; routing to failure", new Object[] {flowFile}); session.transfer(flowFile, REL_FAILURE); return; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java index b5dbf22720..52caedba81 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard.syslog; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -71,6 +72,10 @@ public class SyslogParser { private Charset charset; + public SyslogParser() { + this(StandardCharsets.UTF_8); + } + public SyslogParser(final Charset charset) { this.charset = charset; } @@ -162,4 +167,7 @@ public class SyslogParser { return builder.build(); } + public String getCharsetName() { + return charset == null ? StandardCharsets.UTF_8.name() : charset.name(); + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy new file mode 100644 index 0000000000..1c6b4f8e86 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard + +import org.apache.nifi.processor.ProcessContext +import org.apache.nifi.processor.ProcessSessionFactory +import org.apache.nifi.processors.standard.syslog.SyslogParser +import org.apache.nifi.util.TestRunner +import org.apache.nifi.util.TestRunners +import org.bouncycastle.util.encoders.Hex +import org.junit.After +import org.junit.Assert +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +@RunWith(JUnit4.class) +class ListenSyslogGroovyTest extends GroovyTestCase { + private static final Logger logger = LoggerFactory.getLogger(ListenSyslogGroovyTest.class) + + static final String ZERO_LENGTH_MESSAGE = " \n" + + @BeforeClass + static void setUpOnce() throws Exception { + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + void setUp() throws Exception { + } + + @After + void tearDown() throws Exception { + } + + @Test + void testShouldHandleZeroLengthUDP() throws Exception { + // Arrange + final ListenSyslog proc = new ListenSyslog() + final TestRunner runner = TestRunners.newTestRunner(proc) + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()) + runner.setProperty(ListenSyslog.PORT, "0") + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory() + final ProcessContext context = runner.getProcessContext() + proc.onScheduled(context) + + // Inject a SyslogParser which will always return null + def nullEventParser = [parseEvent: { byte[] bytes, String sender -> + logger.mock("Regardless of input bytes: [${Hex.toHexString(bytes)}] and sender: [${sender}], this parser will return null") + return null + }] as SyslogParser + proc.parser = nullEventParser + + final int numMessages = 10 + final int port = proc.getPort() + Assert.assertTrue(port > 0) + + // write some TCP messages to the port in the background + final Thread sender = new Thread(new TestListenSyslog.SingleConnectionSocketSender(port, numMessages, 100, ZERO_LENGTH_MESSAGE)) + sender.setDaemon(true) + sender.start() + + // Act + + // call onTrigger until we read all messages, or 30 seconds passed + try { + int numFailed = 0 + long timeout = System.currentTimeMillis() + 30000 + + while (numFailed < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(50) + proc.onTrigger(context, processSessionFactory) + numFailed = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size() + } + + int numSuccess = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size() + logger.info("Transferred " + numSuccess + " to SUCCESS and " + numFailed + " to INVALID") + + // Assert + + // all messages should be transferred to invalid + Assert.assertEquals("Did not process all the messages", numMessages, numFailed) + + } finally { + // unschedule to close connections + proc.onUnscheduled() + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ParseSyslogGroovyTest.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ParseSyslogGroovyTest.groovy new file mode 100644 index 0000000000..208eaeec5f --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ParseSyslogGroovyTest.groovy @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard + +import org.apache.nifi.processors.standard.syslog.SyslogParser +import org.apache.nifi.util.TestRunner +import org.apache.nifi.util.TestRunners +import org.bouncycastle.util.encoders.Hex +import org.junit.After +import org.junit.Assert +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +@RunWith(JUnit4.class) +class ParseSyslogGroovyTest extends GroovyTestCase { + private static final Logger logger = LoggerFactory.getLogger(ParseSyslogGroovyTest.class) + + @BeforeClass + static void setUpOnce() throws Exception { + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + void setUp() throws Exception { + } + + @After + void tearDown() throws Exception { + } + + @Test + void testShouldHandleZeroLengthUDP() throws Exception { + // Arrange + final ParseSyslog proc = new ParseSyslog() + final TestRunner runner = TestRunners.newTestRunner(proc) + runner.setProperty(ParseSyslog.CHARSET, ParseSyslog.CHARSET.defaultValue) + + // Inject a SyslogParser which will always return null + def nullEventParser = [parseEvent: { byte[] bytes, String sender -> + logger.mock("Regardless of input bytes: [${Hex.toHexString(bytes)}] and sender: [${sender}], this parser will return null") + return null + }] as SyslogParser + proc.parser = nullEventParser + + final int numMessages = 10 + + // Act + numMessages.times { + runner.enqueue("Doesn't matter what is enqueued here") + } + runner.run(numMessages) + + int numFailed = runner.getFlowFilesForRelationship(ParseSyslog.REL_FAILURE).size() + int numSuccess = runner.getFlowFilesForRelationship(ParseSyslog.REL_SUCCESS).size() + logger.info("Transferred " + numSuccess + " to SUCCESS and " + numFailed + " to FAILURE") + + // Assert + + // all messages should be transferred to invalid + Assert.assertEquals("Did not process all the messages", numMessages, numFailed) + } +}