NIFI-10087 Implemented UDPEventRecordSink

This closes #6099
Signed-off-by: Paul Grey <greyp@apache.org>
This commit is contained in:
exceptionfactory 2022-06-03 20:37:50 -05:00 committed by Paul Grey
parent 1f2820a39a
commit 07bbcb771e
No known key found for this signature in database
GPG Key ID: 8DDF32B9C7EE39D0
4 changed files with 367 additions and 0 deletions

View File

@ -54,6 +54,11 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-event-transport</artifactId>
<version>1.17.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>

View File

@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.sink.event;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* Record Sink Service implementation writes Records and sends a serialized Record to a UDP destination
*/
@Tags({"UDP", "event", "record", "sink"})
@CapabilityDescription("Format and send Records as UDP Datagram Packets to a configurable destination")
public class UDPEventRecordSink extends AbstractControllerService implements RecordSinkService {
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("hostname")
.displayName("Hostname")
.description("Destination hostname or IP address")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("port")
.displayName("Port")
.description("Destination port number")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor SENDER_THREADS = new PropertyDescriptor.Builder()
.name("sender-threads")
.displayName("Sender Threads")
.description("Number of worker threads allocated for handling socket communication")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("2")
.build();
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
Arrays.asList(
HOSTNAME,
PORT,
RECORD_WRITER_FACTORY,
SENDER_THREADS
)
);
private static final String TRANSIT_URI_ATTRIBUTE_KEY = "record.sink.url";
private static final String TRANSIT_URI_FORMAT = "udp://%s:%d";
private RecordSetWriterFactory writerFactory;
private EventSender<byte[]> eventSender;
private String transitUri;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
eventSender = getEventSender(context);
}
@OnDisabled
public void onDisabled() throws Exception {
if (eventSender == null) {
getLogger().debug("Event Sender not configured");
} else {
eventSender.close();
}
}
/**
* Send Records to Event Sender serializes each Record as a Record Set of one to a byte array for transmission
*
* @param recordSet Set of Records to be transmitted
* @param attributes FlowFile attributes
* @param sendZeroResults Whether to transmit empty record sets
* @return Write Result indicating records transmitted
* @throws IOException Thrown on transmission failures
*/
@Override
public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, boolean sendZeroResults) throws IOException {
final Map<String, String> writeAttributes = new LinkedHashMap<>(attributes);
writeAttributes.put(TRANSIT_URI_ATTRIBUTE_KEY, transitUri);
int recordCount = 0;
try (
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), outputStream, writeAttributes)
) {
Record record;
while ((record = recordSet.next()) != null) {
final WriteResult writeResult = writer.write(record);
writer.flush();
sendRecord(outputStream);
recordCount += writeResult.getRecordCount();
}
} catch (final SchemaNotFoundException e) {
throw new IOException("Record Schema not found", e);
} catch (final IOException|RuntimeException e) {
throw new IOException(String.format("Record [%d] Destination [%s] Transmission failed", recordCount, transitUri), e);
}
return WriteResult.of(recordCount, writeAttributes);
}
/**
* Send record and reset stream for subsequent records
*
* @param outputStream Byte Array Output Stream containing serialized record
*/
private void sendRecord(final ByteArrayOutputStream outputStream) {
final byte[] bytes = outputStream.toByteArray();
eventSender.sendEvent(bytes);
outputStream.reset();
}
private EventSender<byte[]> getEventSender(final ConfigurationContext context) {
final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
transitUri = String.format(TRANSIT_URI_FORMAT, hostname, port);
final ByteArrayNettyEventSenderFactory factory = new ByteArrayNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.UDP);
factory.setShutdownQuietPeriod(Duration.ZERO);
factory.setShutdownTimeout(Duration.ZERO);
factory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
final int senderThreads = context.getProperty(SENDER_THREADS).evaluateAttributeExpressions().asInteger();
factory.setWorkerThreads(senderThreads);
return factory.getEventSender();
}
}

View File

@ -15,3 +15,4 @@
org.apache.nifi.record.sink.lookup.RecordSinkServiceLookup
org.apache.nifi.record.sink.LoggingRecordSink
org.apache.nifi.record.sink.EmailRecordSink
org.apache.nifi.record.sink.event.UDPEventRecordSink

View File

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.sink.event;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
class TestUDPEventRecordSink {
private static final String IDENTIFIER = UDPEventRecordSink.class.getSimpleName();
private static final String WRITER_IDENTIFIER = MockRecordWriter.class.getSimpleName();
private static final String TRANSIT_URI_FORMAT = "udp://%s:%d";
private static final String TRANSIT_URI_KEY = "record.sink.url";
private static final String LOCALHOST = "127.0.0.1";
private static final String ID_FIELD = "id";
private static final String ID_FIELD_VALUE = TestUDPEventRecordSink.class.getSimpleName();
private static final boolean SEND_ZERO_RESULTS = true;
private static final byte[] DELIMITER = new byte[]{};
private static final int MAX_FRAME_SIZE = 1024;
private static final int MESSAGE_POLL_TIMEOUT = 5;
private static final String NULL_HEADER = null;
private static final RecordSchema RECORD_SCHEMA = getRecordSchema();
private static final Record[] RECORDS = getRecords();
private EventServer eventServer;
private BlockingQueue<ByteArrayMessage> messages;
private String transitUri;
private UDPEventRecordSink sink;
@BeforeEach
void setRunner() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
final MockRecordWriter recordWriter = new MockRecordWriter(NULL_HEADER, false);
runner.addControllerService(WRITER_IDENTIFIER, recordWriter);
runner.enableControllerService(recordWriter);
final int port = NetworkUtils.getAvailableUdpPort();
eventServer = createServer(runner, port);
sink = new UDPEventRecordSink();
runner.addControllerService(IDENTIFIER, sink);
runner.setProperty(sink, UDPEventRecordSink.HOSTNAME, LOCALHOST);
runner.setProperty(sink, UDPEventRecordSink.PORT, Integer.toString(port));
runner.setProperty(sink, UDPEventRecordSink.RECORD_WRITER_FACTORY, WRITER_IDENTIFIER);
runner.enableControllerService(sink);
transitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST, port);
}
@AfterEach
void shutdownServer() {
eventServer.shutdown();
}
@Test
void testSendData() throws IOException, InterruptedException {
final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, RECORDS);
final WriteResult writeResult = sink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
assertNotNull(writeResult);
final String resultTransitUri = writeResult.getAttributes().get(TRANSIT_URI_KEY);
assertEquals(transitUri, resultTransitUri);
assertEquals(RECORDS.length, writeResult.getRecordCount());
final String firstMessage = pollMessage();
assertEquals(ID_FIELD_VALUE, firstMessage);
final String secondMessage = pollMessage();
assertEquals(ID_FIELD_VALUE, secondMessage);
}
@Test
void testSendDataRecordSetEmpty() throws IOException {
final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA);
final WriteResult writeResult = sink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
assertNotNull(writeResult);
final String resultTransitUri = writeResult.getAttributes().get(TRANSIT_URI_KEY);
assertEquals(transitUri, resultTransitUri);
assertEquals(0, writeResult.getRecordCount());
}
private String pollMessage() throws InterruptedException {
final ByteArrayMessage record = messages.poll(MESSAGE_POLL_TIMEOUT, TimeUnit.SECONDS);
assertNotNull(record);
return new String(record.getMessage(), StandardCharsets.UTF_8).trim();
}
private EventServer createServer(final TestRunner runner, final int port) throws Exception {
messages = new LinkedBlockingQueue<>();
final InetAddress listenAddress = InetAddress.getByName(LOCALHOST);
NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(
runner.getLogger(),
listenAddress,
port,
TransportProtocol.UDP,
DELIMITER,
MAX_FRAME_SIZE,
messages
);
serverFactory.setShutdownQuietPeriod(Duration.ZERO);
serverFactory.setShutdownTimeout(Duration.ZERO);
return serverFactory.getEventServer();
}
private static RecordSchema getRecordSchema() {
final RecordField idField = new RecordField(ID_FIELD, RecordFieldType.STRING.getDataType());
return new SimpleRecordSchema(Collections.singletonList(idField));
}
private static Record[] getRecords() {
final Map<String, Object> values = Collections.singletonMap(ID_FIELD, ID_FIELD_VALUE);
final Record record = new MapRecord(RECORD_SCHEMA, values);
return new Record[]{record, record};
}
}