diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
index 43d01b8895..8333ae2a65 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -261,17 +261,19 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
             event = errorEvents.poll();
         }
 
-        if (event == null) {
-            try {
-                if (longPoll) {
-                    event = events.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-                } else {
-                    event = events.poll();
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                return null;
+        if (event != null) {
+            return event;
+        }
+
+        try {
+            if (longPoll) {
+                event = events.poll(getLongPollTimeout(), TimeUnit.MILLISECONDS);
+            } else {
+                event = events.poll();
             }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return null;
         }
 
         if (event != null) {
@@ -281,4 +283,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
         return event;
     }
 
+    protected long getLongPollTimeout() {
+        return POLL_TIMEOUT_MS;
+    }
 }
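Note: the refactoring above flattens the nested poll logic into an early return and extracts the hard-coded POLL_TIMEOUT_MS into an overridable getLongPollTimeout() hook. A minimal sketch of how a subclass can tune the long-poll wait (MyListenProcessor is hypothetical; ListenUDPRecord below overrides the same hook to return its configurable Poll Timeout):

    import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
    import org.apache.nifi.processor.util.listen.event.StandardEvent;

    // Hypothetical subclass; declared abstract so the base class's
    // createDispatcher() requirement does not obscure the example.
    public abstract class MyListenProcessor extends AbstractListenEventProcessor<StandardEvent> {

        // Poll the internal event queue for up to 100 ms instead of the
        // base class default of POLL_TIMEOUT_MS.
        @Override
        protected long getLongPollTimeout() {
            return 100L;
        }
    }
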
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
new file mode 100644
index 0000000000..3c8e71b4bd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.StandardEvent;
+import org.apache.nifi.processor.util.listen.event.StandardEventFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+@SupportsBatching
+@Tags({"ingest", "udp", "listen", "source", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Listens for Datagram Packets on a given port and reads the content of each datagram using the " +
+        "configured Record Reader. Each record will then be written to a flow file using the configured Record Writer. This processor " +
+        "can be restricted to listening for datagrams from a specific remote host and port by specifying the Sending Host and " +
+        "Sending Host Port properties, otherwise it will listen for datagrams from all hosts and ports.")
+@WritesAttributes({
+        @WritesAttribute(attribute="udp.sender", description="The sending host of the messages."),
+        @WritesAttribute(attribute="udp.port", description="The port on which the messages were received."),
+        @WritesAttribute(attribute="record.count", description="The number of records written to the flow file."),
+        @WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file.")
+})
+public class ListenUDPRecord extends AbstractListenEventProcessor<StandardEvent> {
+
+    public static final PropertyDescriptor SENDING_HOST = new PropertyDescriptor.Builder()
+            .name("sending-host")
+            .displayName("Sending Host")
+            .description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will " +
+                    "be accepted. Improves performance. May be a system property or an environment variable.")
+            .addValidator(new HostValidator())
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor SENDING_HOST_PORT = new PropertyDescriptor.Builder()
+            .name("sending-host-port")
+            .displayName("Sending Host Port")
+            .description("Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and " +
+                    "this port will be accepted. Improves performance. May be a system property or an environment variable.")
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for reading the content of incoming datagrams.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .expressionLanguageSupported(false)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use in order to serialize the data before writing to a flow file.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .expressionLanguageSupported(false)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor POLL_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("poll-timeout")
+            .displayName("Poll Timeout")
+            .description("The amount of time to wait when polling the internal queue for more datagrams. If no datagrams are found after waiting " +
+                    "for the configured timeout, then the processor will emit whatever records have been obtained up to that point.")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("50 ms")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("batch-size")
+            .displayName("Batch Size")
+            .description("The maximum number of datagrams to write as records to a single FlowFile. The Batch Size will only be reached when " +
+                    "data is coming in more frequently than the Poll Timeout.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("1000")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
+            .name("parse.failure")
+            .description("If a datagram cannot be parsed using the configured Record Reader, the contents of the " +
+                    "message will be routed to this Relationship as its own individual FlowFile.")
+            .build();
+
+    public static final String UDP_PORT_ATTR = "udp.port";
+    public static final String UDP_SENDER_ATTR = "udp.sender";
+    public static final String RECORD_COUNT_ATTR = "record.count";
+
+    private volatile long pollTimeout;
+
+    @Override
+    protected List<PropertyDescriptor> getAdditionalProperties() {
+        return Arrays.asList(
+                POLL_TIMEOUT,
+                BATCH_SIZE,
+                RECORD_READER,
+                RECORD_WRITER,
+                SENDING_HOST,
+                SENDING_HOST_PORT
+        );
+    }
+
+    @Override
+    protected List<Relationship> getAdditionalRelationships() {
+        return Arrays.asList(REL_PARSE_FAILURE);
+    }
+
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        super.onScheduled(context);
+        this.pollTimeout = context.getProperty(POLL_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    protected long getLongPollTimeout() {
+        return pollTimeout;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final Collection<ValidationResult> result = new ArrayList<>();
+
+        final String sendingHost = validationContext.getProperty(SENDING_HOST).getValue();
+        final String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue();
+
+        if (StringUtils.isBlank(sendingHost) && StringUtils.isNotBlank(sendingPort)) {
+            result.add(
+                    new ValidationResult.Builder()
+                            .subject(SENDING_HOST.getName())
+                            .valid(false)
+                            .explanation("Must specify Sending Host when specifying Sending Host Port")
+                            .build());
+        } else if (StringUtils.isBlank(sendingPort) && StringUtils.isNotBlank(sendingHost)) {
+            result.add(
+                    new ValidationResult.Builder()
+                            .subject(SENDING_HOST_PORT.getName())
+                            .valid(false)
+                            .explanation("Must specify Sending Host Port when specifying Sending Host")
+                            .build());
+        }
+
+        return result;
+    }
+
+    @Override
+    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<StandardEvent> events)
+            throws IOException {
+        final String sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
+        final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
+        final Integer bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(context.getMaxConcurrentTasks(), bufferSize);
+        final EventFactory<StandardEvent> eventFactory = new StandardEventFactory();
+        return new DatagramChannelDispatcher<>(eventFactory, bufferPool, events, getLogger(), sendingHost, sendingHostPort);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int maxBatchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final Map<String, FlowFileRecordWriter> flowFileRecordWriters = new HashMap<>();
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        for (int i=0; i < maxBatchSize; i++) {
+            // this processor isn't leveraging the error queue so don't bother polling to avoid the overhead
+            // if the error handling is ever changed to use the error queue then this flag needs to be changed as well
+            final StandardEvent event = getMessage(true, false, session);
+
+            // break out if we don't have any messages, don't yield since we already do a long poll inside getMessage
+            if (event == null) {
+                break;
+            }
+
+            // attempt to read all of the records from the current datagram into a list in memory so that we can ensure the
+            // entire datagram can be read as records, and if not transfer the whole thing to parse.failure
+            final RecordReader reader;
+            final List<Record> records = new ArrayList<>();
+            try (final InputStream in = new ByteArrayInputStream(event.getData())) {
+                reader = readerFactory.createRecordReader(Collections.emptyMap(), in, getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    records.add(record);
+                }
+            } catch (final Exception e) {
+                handleParseFailure(event, session, e);
+                continue;
+            }
+
+            if (records.size() == 0) {
+                handleParseFailure(event, session, null);
+                continue;
+            }
+
+            // see if we already started a flow file and writer for the given sender
+            // if an exception happens creating the flow file or writer, put the event in the error queue to try it again later
+            FlowFileRecordWriter flowFileRecordWriter = flowFileRecordWriters.get(event.getSender());
+
+            if (flowFileRecordWriter == null) {
+                FlowFile flowFile = null;
+                OutputStream rawOut = null;
+                RecordSetWriter writer = null;
+                try {
+                    flowFile = session.create();
+                    rawOut = session.write(flowFile);
+
+                    final Record firstRecord = records.get(0);
+                    final RecordSchema recordSchema = firstRecord.getSchema();
+                    final RecordSchema writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema);
+
+                    writer = writerFactory.createWriter(getLogger(), writeSchema, rawOut);
+                    writer.beginRecordSet();
+
+                    flowFileRecordWriter = new FlowFileRecordWriter(flowFile, writer);
+                    flowFileRecordWriters.put(event.getSender(), flowFileRecordWriter);
+                } catch (final Exception ex) {
+                    getLogger().error("Failed to properly initialize record writer. Datagram will be queued for re-processing.", ex);
+                    try {
+                        if (writer != null) {
+                            writer.close();
+                        }
+                    } catch (final Exception e) {
+                        getLogger().warn("Failed to close Record Writer", e);
+                    }
+
+                    if (rawOut != null) {
+                        IOUtils.closeQuietly(rawOut);
+                    }
+
+                    if (flowFile != null) {
+                        session.remove(flowFile);
+                    }
+
+                    context.yield();
+                    break;
+                }
+            }
+
+            // attempt to write each record, if any record fails then remove the flow file and break out of the loop
+            final RecordSetWriter writer = flowFileRecordWriter.getRecordWriter();
+            try {
+                for (final Record record : records) {
+                    writer.write(record);
+                }
+            } catch (Exception e) {
+                getLogger().error("Failed to write records due to: " + e.getMessage(), e);
+                IOUtils.closeQuietly(writer);
+                session.remove(flowFileRecordWriter.getFlowFile());
+                flowFileRecordWriters.remove(event.getSender());
+                break;
+            }
+        }
+
+        // attempt to finish each record set and transfer the flow file, if an error is encountered calling
+        // finishRecordSet or closing the writer then remove the flow file
+        for (final Map.Entry<String, FlowFileRecordWriter> entry : flowFileRecordWriters.entrySet()) {
+            final String sender = entry.getKey();
+            final FlowFileRecordWriter flowFileRecordWriter = entry.getValue();
+            final RecordSetWriter writer = flowFileRecordWriter.getRecordWriter();
+
+            FlowFile flowFile = flowFileRecordWriter.getFlowFile();
+            try {
+                final WriteResult writeResult;
+                try {
+                    writeResult = writer.finishRecordSet();
+                } finally {
+                    writer.close();
+                }
+
+                if (writeResult.getRecordCount() == 0) {
+                    session.remove(flowFile);
+                    continue;
+                }
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.putAll(getAttributes(sender));
+                attributes.putAll(writeResult.getAttributes());
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.getRecordCount()));
+
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                session.transfer(flowFile, REL_SUCCESS);
+
+                final String transitUri = getTransitUri(sender);
+                session.getProvenanceReporter().receive(flowFile, transitUri);
+
+            } catch (final Exception e) {
+                getLogger().error("Unable to properly complete record set due to: " + e.getMessage(), e);
+                session.remove(flowFile);
+            }
+        }
+    }
+
+    private void handleParseFailure(final StandardEvent event, final ProcessSession session, final Exception cause) {
+        handleParseFailure(event, session, cause, "Failed to parse datagram using the configured Record Reader. " +
+                "Will route message as its own FlowFile to the 'parse.failure' relationship");
+    }
+
+    private void handleParseFailure(final StandardEvent event, final ProcessSession session, final Exception cause, final String message) {
+        // If we are unable to parse the data, we need to transfer it to the 'parse.failure' relationship
+        final Map<String, String> attributes = getAttributes(event.getSender());
+
+        FlowFile failureFlowFile = session.create();
+        failureFlowFile = session.write(failureFlowFile, out -> out.write(event.getData()));
+        failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+
+        final String transitUri = getTransitUri(event.getSender());
+        session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+
+        session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+
+        if (cause == null) {
+            getLogger().error(message);
+        } else {
+            getLogger().error(message, cause);
+        }
+
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private Map<String, String> getAttributes(final String sender) {
+        final Map<String, String> attributes = new HashMap<>(3);
+        attributes.put(UDP_SENDER_ATTR, sender);
+        attributes.put(UDP_PORT_ATTR, String.valueOf(port));
+        return attributes;
+    }
+
+    private String getTransitUri(final String sender) {
+        final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
+        final String transitUri = new StringBuilder().append("udp").append("://").append(senderHost).append(":")
+                .append(port).toString();
+        return transitUri;
+    }
+
+    /**
+     * Holder class to pass around a flow file and the writer that is writing records to it.
+     */
+    private static class FlowFileRecordWriter {
+
+        private final FlowFile flowFile;
+
+        private final RecordSetWriter recordWriter;
+
+        public FlowFileRecordWriter(final FlowFile flowFile, final RecordSetWriter recordWriter) {
+            this.flowFile = flowFile;
+            this.recordWriter = recordWriter;
+        }
+
+        public FlowFile getFlowFile() {
+            return flowFile;
+        }
+
+        public RecordSetWriter getRecordWriter() {
+            return recordWriter;
+        }
+    }
+
+    private static class HostValidator implements Validator {
+
+        @Override
+        public ValidationResult validate(String subject, String input, ValidationContext context) {
+            try {
+                InetAddress.getByName(input);
+                return new ValidationResult.Builder().subject(subject).valid(true).input(input).build();
+            } catch (final UnknownHostException e) {
+                return new ValidationResult.Builder().subject(subject).valid(false).input(input).explanation("Unknown host: " + e).build();
+            }
+        }
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index f3e72e014b..c95f964f25 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -57,6 +57,7 @@ org.apache.nifi.processors.standard.ListenSyslog
 org.apache.nifi.processors.standard.ListenTCP
 org.apache.nifi.processors.standard.ListenTCPRecord
 org.apache.nifi.processors.standard.ListenUDP
+org.apache.nifi.processors.standard.ListenUDPRecord
 org.apache.nifi.processors.standard.ListSFTP
 org.apache.nifi.processors.standard.LogAttribute
 org.apache.nifi.processors.standard.LogMessage
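Note: the services entry above is what makes the new processor discoverable. NiFi locates Processor implementations through the standard java.util.ServiceLoader mechanism, which reads exactly this META-INF/services file from each bundle's classpath. A standalone sketch of that lookup (illustration only, not NiFi's actual extension-loading code):

    import java.util.ServiceLoader;
    import org.apache.nifi.processor.Processor;

    public class ListProcessors {
        public static void main(String[] args) {
            // ServiceLoader scans META-INF/services/org.apache.nifi.processor.Processor
            // on the classpath; without the line added above, ListenUDPRecord
            // would never be offered to the framework.
            for (Processor processor : ServiceLoader.load(Processor.class)) {
                System.out.println(processor.getClass().getName());
            }
        }
    }
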
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDPRecord.java
new file mode 100644
index 0000000000..459df9e2b8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDPRecord.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.StandardEvent;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class TestListenUDPRecord {
+
+    static final String SCHEMA_TEXT = "{\n" +
+            "  \"name\": \"syslogRecord\",\n" +
+            "  \"namespace\": \"nifi\",\n" +
+            "  \"type\": \"record\",\n" +
+            "  \"fields\": [\n" +
+            "    { \"name\": \"timestamp\", \"type\": \"string\" },\n" +
+            "    { \"name\": \"logsource\", \"type\": \"string\" },\n" +
+            "    { \"name\": \"message\", \"type\": \"string\" }\n" +
+            "  ]\n" +
+            "}";
+
+    static final String DATAGRAM_1 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"} ]";
+    static final String DATAGRAM_2 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"} ]";
+    static final String DATAGRAM_3 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"} ]";
+
+    static final String MULTI_DATAGRAM_1 = "[" +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"}," +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"}," +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}" +
+            "]";
+
+    static final String MULTI_DATAGRAM_2 = "[" +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 4\"}," +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 5\"}," +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 6\"}" +
+            "]";
+
+    private TestableListenUDPRecord proc;
+    private TestRunner runner;
+    private MockRecordWriter mockRecordWriter;
+
+    @Before
+    public void setup() throws InitializationException {
+        proc = new TestableListenUDPRecord();
+        runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenUDP.PORT, "1");
+
+        final String readerId = "record-reader";
+        final RecordReaderFactory readerFactory = new JsonTreeReader();
+        runner.addControllerService(readerId, readerFactory);
+        runner.setProperty(readerFactory, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
+        runner.setProperty(readerFactory, SchemaAccessUtils.SCHEMA_TEXT, SCHEMA_TEXT);
+        runner.enableControllerService(readerFactory);
+
+        final String writerId = "record-writer";
+        mockRecordWriter = new MockRecordWriter("timestamp, logsource, message");
+        runner.addControllerService(writerId, mockRecordWriter);
+        runner.enableControllerService(mockRecordWriter);
+
+        runner.setProperty(ListenUDPRecord.RECORD_READER, readerId);
+        runner.setProperty(ListenUDPRecord.RECORD_WRITER, writerId);
+    }
+
+    @Test
+    public void testSuccessWithBatchSizeGreaterThanAvailableRecords() {
+        final String sender = "foo";
+
+        final StandardEvent event1 = new StandardEvent(sender, DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event1);
+
+        final StandardEvent event2 = new StandardEvent(sender, DATAGRAM_2.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event2);
+
+        final StandardEvent event3 = new StandardEvent(sender, DATAGRAM_3.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event3);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "3");
+    }
+
+    @Test
+    public void testSuccessWithBatchLessThanAvailableRecords() {
+        final String sender = "foo";
+
+        final StandardEvent event1 = new StandardEvent(sender, DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event1);
+
+        final StandardEvent event2 = new StandardEvent(sender, DATAGRAM_2.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event2);
+
+        final StandardEvent event3 = new StandardEvent(sender, DATAGRAM_3.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event3);
+
+        runner.setProperty(ListenUDPRecord.BATCH_SIZE, "1");
+
+        // batch 1
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "1");
+
+        // batch 2
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+        flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "1");
+
+        // batch 3
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+        flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "1");
+
+        // no more left
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void testMultipleRecordsPerDatagram() {
+        final String sender = "foo";
+
+        final StandardEvent event1 = new StandardEvent(sender, MULTI_DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event1);
+
+        final StandardEvent event2 = new StandardEvent(sender, MULTI_DATAGRAM_2.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event2);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "6");
+    }
+
+    @Test
+    public void testParseFailure() {
+        final String sender = "foo";
+
+        final StandardEvent event1 = new StandardEvent(sender, DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event1);
+
+        final StandardEvent event2 = new StandardEvent(sender, "WILL NOT PARSE".getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event2);
+
+        runner.run();
+        runner.assertTransferCount(ListenUDPRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(ListenUDPRecord.REL_PARSE_FAILURE, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_PARSE_FAILURE).get(0);
+        flowFile.assertContentEquals("WILL NOT PARSE");
+    }
+
+    @Test
+    public void testWriterFailure() throws InitializationException {
+        // re-create the writer so that it fails after writing two records
+        final String writerId = "record-writer";
+        mockRecordWriter = new MockRecordWriter("timestamp, logsource, message", false, 2);
+        runner.addControllerService(writerId, mockRecordWriter);
+        runner.enableControllerService(mockRecordWriter);
+        runner.setProperty(ListenUDPRecord.RECORD_WRITER, writerId);
+
+        final String sender = "foo";
+
+        final StandardEvent event1 = new StandardEvent(sender, DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event1);
+
+        final StandardEvent event2 = new StandardEvent(sender, DATAGRAM_2.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event2);
+
+        final StandardEvent event3 = new StandardEvent(sender, DATAGRAM_3.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event3);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 0);
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_PARSE_FAILURE, 0);
+    }
+
+    private static class TestableListenUDPRecord extends ListenUDPRecord {
+
+        private volatile BlockingQueue<StandardEvent> testEvents = new LinkedBlockingQueue<>();
+        private volatile BlockingQueue<StandardEvent> testErrorEvents = new LinkedBlockingQueue<>();
+
+        @Override
+        protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) throws IOException {
+            return Mockito.mock(ChannelDispatcher.class);
+        }
+
+        public void addEvent(final StandardEvent event) {
+            this.testEvents.add(event);
+        }
+
+        public void addErrorEvent(final StandardEvent event) {
+            this.testErrorEvents.add(event);
+        }
+
+        @Override
+        protected StandardEvent getMessage(boolean longPoll, boolean pollErrorQueue, ProcessSession session) {
+            StandardEvent event = null;
+            if (pollErrorQueue) {
+                event = testErrorEvents.poll();
+            }
+
+            if (event == null) {
+                try {
+                    if (longPoll) {
+                        event = testEvents.poll(getLongPollTimeout(), TimeUnit.MILLISECONDS);
+                    } else {
+                        event = testEvents.poll();
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return null;
+                }
+            }
+
+            return event;
+        }
+    }
+
+}
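Note: the unit tests above mock the dispatcher, so no real socket is ever opened. To exercise a live instance manually, a datagram matching the test schema can be sent with nothing but the JDK; the host and port below are assumptions and must match the processor's configured Port property:

    import java.net.DatagramPacket;
    import java.net.DatagramSocket;
    import java.net.InetAddress;
    import java.nio.charset.StandardCharsets;

    public class SendTestDatagram {
        public static void main(String[] args) throws Exception {
            // One datagram is one unit of parsing for ListenUDPRecord; this JSON
            // array yields a single record under the schema used in the tests.
            final byte[] payload = ("[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", "
                    + "\"message\" : \"This is a test 1\"} ]").getBytes(StandardCharsets.UTF_8);

            try (DatagramSocket socket = new DatagramSocket()) {
                // 20000 is an arbitrary example port.
                socket.send(new DatagramPacket(payload, payload.length,
                        InetAddress.getByName("localhost"), 20000));
            }
        }
    }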