NIFI-11889 Added Record-oriented Transmission to PutTCP

This closes #7554
Signed-off-by: Paul Grey <greyp@apache.org>
This commit is contained in:
exceptionfactory 2023-07-31 15:19:55 -05:00 committed by Paul Grey
parent 63c72bd7e2
commit 5c577be946
No known key found for this signature in database
GPG Key ID: 8DDF32B9C7EE39D0
4 changed files with 359 additions and 48 deletions

View File

@ -54,19 +54,21 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The ip address or hostname of the destination.")
.description("Destination hostname or IP address")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("localhost")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor
.Builder().name("Port")
.description("The port on the destination.")
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("Port")
.description("Destination port number")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Size of Socket Send Buffer")
.description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " +
@ -76,8 +78,9 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.defaultValue("1 MB")
.required(true)
.build();
public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
.Builder().name("Idle Connection Expiration")
public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor.Builder()
.name("Idle Connection Expiration")
.description("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.")
.required(true)
.defaultValue("15 seconds")
@ -89,13 +92,14 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
// not added to the properties by default since not all processors may need them
public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
.Builder().name("Protocol")
public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor.Builder()
.name("Protocol")
.description("The protocol for communication.")
.required(true)
.allowableValues(TCP_VALUE, UDP_VALUE)
.defaultValue(TCP_VALUE.getValue())
.build();
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter")
.description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
@ -109,6 +113,7 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies the character set of the data being sent.")
@ -117,6 +122,7 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Timeout")
.description("The timeout for connecting to and communicating with the destination. Does not apply to UDP")
@ -125,6 +131,7 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Outgoing Message Delimiter")
.description("Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message "
@ -135,6 +142,7 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder()
.name("Connection Per FlowFile")
.description("Specifies whether to send each FlowFile's content on an individual connection.")
@ -142,10 +150,10 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.defaultValue("false")
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
"messages will be sent over a secure connection.")
.description("Specifies the SSL Context Service to enable TLS socket communication")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
@ -154,6 +162,7 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.name("success")
.description("FlowFiles that are sent successfully to the destination are sent out this relationship.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed to send to the destination are sent out this relationship.")
@ -414,14 +423,14 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
}
if (successfulRanges.isEmpty() && failedRanges.isEmpty()) {
getLogger().info("Completed processing {} but sent 0 FlowFiles", new Object[] {flowFile});
getLogger().info("Completed processing {} but sent 0 FlowFiles", flowFile);
session.transfer(flowFile, REL_SUCCESS);
session.commitAsync();
return;
}
if (successfulRanges.isEmpty()) {
getLogger().error("Failed to send {}; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason});
getLogger().error("Failed to send {}; routing to 'failure'; last failure reason reported was {};", flowFile, lastFailureReason);
final FlowFile penalizedFlowFile = session.penalize(flowFile);
session.transfer(penalizedFlowFile, REL_FAILURE);
session.commitAsync();
@ -432,7 +441,7 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime);
session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + successfulRanges.size() + " messages;", transferMillis);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} messages for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis});
getLogger().info("Successfully sent {} messages for {} in {} millis", successfulRanges.size(), flowFile, transferMillis);
session.commitAsync();
return;
}
@ -444,7 +453,7 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
transferRanges(failedRanges, REL_FAILURE);
session.remove(flowFile);
getLogger().error("Successfully sent {} messages, but failed to send {} messages; the last error received was {}",
new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason});
successfulRanges.size(), failedRanges.size(), lastFailureReason);
session.commitAsync();
}
}

View File

@ -19,6 +19,8 @@ package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@ -33,33 +35,89 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
import org.apache.nifi.processors.standard.property.TransmissionStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StopWatch;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@CapabilityDescription("The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. "
+ "By default, the FlowFiles are transmitted over the same TCP connection (or pool of TCP connections if multiple input threads are configured). "
+ "To assist the TCP server with determining message boundaries, an optional \"Outgoing Message Delimiter\" string can be configured which is appended "
+ "to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional \"Connection Per FlowFile\" parameter can be "
+ "specified to change the behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile "
+ "is received and closed after the FlowFile has been sent. This option should only be used for low message volume scenarios, otherwise the platform " + "may run out of TCP sockets.")
@CapabilityDescription("Sends serialized FlowFiles or Records over TCP to a configurable destination with optional support for TLS")
@InputRequirement(Requirement.INPUT_REQUIRED)
@SeeAlso({ListenTCP.class, PutUDP.class})
@Tags({ "remote", "egress", "put", "tcp" })
@SupportsBatching
@WritesAttributes({
@WritesAttribute(attribute = PutTCP.RECORD_COUNT_TRANSMITTED, description = "Count of records transmitted to configured destination address")
})
public class PutTCP extends AbstractPutEventProcessor<InputStream> {
public static final String RECORD_COUNT_TRANSMITTED = "record.count.transmitted";
static final PropertyDescriptor TRANSMISSION_STRATEGY = new PropertyDescriptor.Builder()
.name("Transmission Strategy")
.displayName("Transmission Strategy")
.description("Specifies the strategy used for reading input FlowFiles and transmitting messages to the destination socket address")
.required(true)
.allowableValues(TransmissionStrategy.class)
.defaultValue(TransmissionStrategy.FLOWFILE_ORIENTED.getValue())
.build();
static final PropertyDescriptor DEPENDENT_CHARSET = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(CHARSET)
.dependsOn(TRANSMISSION_STRATEGY, TransmissionStrategy.FLOWFILE_ORIENTED.getValue())
.build();
static final PropertyDescriptor DEPENDENT_OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(OUTGOING_MESSAGE_DELIMITER)
.dependsOn(TRANSMISSION_STRATEGY, TransmissionStrategy.FLOWFILE_ORIENTED.getValue())
.build();
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("Record Reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for reading Records from input FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.dependsOn(TRANSMISSION_STRATEGY, TransmissionStrategy.RECORD_ORIENTED.getValue())
.build();
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("Record Writer")
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing Records to the configured socket address")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(true)
.dependsOn(TRANSMISSION_STRATEGY, TransmissionStrategy.RECORD_ORIENTED.getValue())
.build();
private static final List<PropertyDescriptor> ADDITIONAL_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
CONNECTION_PER_FLOWFILE,
SSL_CONTEXT_SERVICE,
TRANSMISSION_STRATEGY,
DEPENDENT_OUTGOING_MESSAGE_DELIMITER,
DEPENDENT_CHARSET,
RECORD_READER,
RECORD_WRITER
));
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(CONNECTION_PER_FLOWFILE,
OUTGOING_MESSAGE_DELIMITER,
TIMEOUT,
SSL_CONTEXT_SERVICE,
CHARSET);
return ADDITIONAL_PROPERTIES;
}
@Override
@ -70,22 +128,21 @@ public class PutTCP extends AbstractPutEventProcessor<InputStream> {
return;
}
final TransmissionStrategy transmissionStrategy = TransmissionStrategy.valueOf(context.getProperty(TRANSMISSION_STRATEGY).getValue());
final StopWatch stopWatch = new StopWatch(true);
try {
session.read(flowFile, inputStream -> {
InputStream inputStreamEvent = inputStream;
final int recordCount;
if (TransmissionStrategy.RECORD_ORIENTED == transmissionStrategy) {
recordCount = sendRecords(context, session, flowFile);
final String delimiter = getOutgoingMessageDelimiter(context, flowFile);
if (delimiter != null) {
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
inputStreamEvent = new DelimitedInputStream(inputStream, delimiter.getBytes(charSet));
}
} else {
sendFlowFile(context, session, flowFile);
recordCount = 0;
}
eventSender.sendEvent(inputStreamEvent);
});
session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
final FlowFile processedFlowFile = session.putAttribute(flowFile, RECORD_COUNT_TRANSMITTED, Integer.toString(recordCount));
session.getProvenanceReporter().send(processedFlowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(processedFlowFile, REL_SUCCESS);
session.commitAsync();
} catch (final Exception e) {
getLogger().error("Send Failed {}", flowFile, e);
@ -104,4 +161,64 @@ public class PutTCP extends AbstractPutEventProcessor<InputStream> {
protected NettyEventSenderFactory<InputStream> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) {
return new StreamingNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.TCP);
}
private void sendFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
session.read(flowFile, inputStream -> {
InputStream inputStreamEvent = inputStream;
final String delimiter = getOutgoingMessageDelimiter(context, flowFile);
if (delimiter != null) {
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
inputStreamEvent = new DelimitedInputStream(inputStream, delimiter.getBytes(charSet));
}
eventSender.sendEvent(inputStreamEvent);
});
}
private int sendRecords(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
final AtomicInteger recordCount = new AtomicInteger();
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
session.read(flowFile, inputStream -> {
try (
RecordReader recordReader = readerFactory.createRecordReader(flowFile, inputStream, getLogger());
ReusableByteArrayInputStream eventInputStream = new ReusableByteArrayInputStream();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), recordReader.getSchema(), outputStream, flowFile)
) {
Record record;
while ((record = recordReader.nextRecord()) != null) {
recordSetWriter.write(record);
recordSetWriter.flush();
final byte[] buffer = outputStream.toByteArray();
eventInputStream.setBuffer(buffer);
eventSender.sendEvent(eventInputStream);
outputStream.reset();
recordCount.getAndIncrement();
}
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new IOException("Record reading failed", e);
}
});
return recordCount.get();
}
private static class ReusableByteArrayInputStream extends ByteArrayInputStream {
private ReusableByteArrayInputStream() {
super(new byte[0]);
}
private void setBuffer(final byte[] buffer) {
this.buf = buffer;
this.pos = 0;
this.count = buffer.length;
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.property;
import org.apache.nifi.components.DescribedValue;
/**
* Transmission Strategy enumeration of allowable values for component Property Descriptors
*/
public enum TransmissionStrategy implements DescribedValue {
FLOWFILE_ORIENTED("FlowFile-oriented", "Send FlowFile content as a single stream"),
RECORD_ORIENTED("Record-oriented", "Read Records from input FlowFiles and send serialized Records as individual messages");
private final String displayName;
private final String description;
TransmissionStrategy(final String displayName, final String description) {
this.displayName = displayName;
this.description = description;
}
@Override
public String getValue() {
return name();
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
}

View File

@ -18,15 +18,26 @@
package org.apache.nifi.processors.standard;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processors.standard.property.TransmissionStrategy;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
@ -34,28 +45,41 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@Timeout(30)
@ExtendWith(MockitoExtension.class)
public class TestPutTCP {
private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
private final static String SERVER_VARIABLE = "server.address";
private final static String TCP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
private final static int MIN_INVALID_PORT = 0;
private final static int MIN_VALID_PORT = 1;
private final static int MAX_VALID_PORT = 65535;
private final static int MAX_INVALID_PORT = 65536;
private final static int VALID_LARGE_FILE_SIZE = 32768;
private final static int VALID_SMALL_FILE_SIZE = 64;
private final static int LOAD_TEST_ITERATIONS = 500;
@ -68,6 +92,26 @@ public class TestPutTCP {
private final static String[] EMPTY_FILE = { "" };
private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
private static final String WRITER_SERVICE_ID = RecordSetWriterFactory.class.getSimpleName();
private static final String READER_SERVICE_ID = RecordReaderFactory.class.getSimpleName();
private static final String RECORD = String.class.getSimpleName();
private static final Record NULL_RECORD = null;
@Mock
private RecordSetWriterFactory writerFactory;
@Mock
private RecordReaderFactory readerFactory;
@Mock
private RecordReader recordReader;
@Mock
private Record record;
private EventServer eventServer;
private TestRunner runner;
private BlockingQueue<ByteArrayMessage> messages;
@ -194,6 +238,55 @@ public class TestPutTCP {
assertMessagesReceived(testData, LOAD_TEST_ITERATIONS);
}
@Test
void testRunSuccessRecordOriented() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
runner.setProperty(PutTCP.HOSTNAME, TCP_SERVER_ADDRESS);
runner.setProperty(PutTCP.PORT, String.valueOf(eventServer.getListeningPort()));
runner.setProperty(PutTCP.TRANSMISSION_STRATEGY, TransmissionStrategy.RECORD_ORIENTED.getValue());
when(writerFactory.getIdentifier()).thenReturn(WRITER_SERVICE_ID);
runner.addControllerService(WRITER_SERVICE_ID, writerFactory);
runner.enableControllerService(writerFactory);
runner.setProperty(PutTCP.RECORD_WRITER, WRITER_SERVICE_ID);
when(readerFactory.getIdentifier()).thenReturn(READER_SERVICE_ID);
runner.addControllerService(READER_SERVICE_ID, readerFactory);
runner.enableControllerService(readerFactory);
runner.setProperty(PutTCP.RECORD_READER, READER_SERVICE_ID);
when(readerFactory.createRecordReader(any(), any(), any())).thenReturn(recordReader);
when(recordReader.nextRecord()).thenReturn(record, NULL_RECORD);
when(writerFactory.createWriter(any(), any(), any(OutputStream.class), any(FlowFile.class))).thenAnswer((Answer<RecordSetWriter>) invocationOnMock -> {
final OutputStream outputStream = invocationOnMock.getArgument(2, OutputStream.class);
return new TestPutTCP.MockRecordSetWriter(outputStream);
});
runner.enqueue(RECORD);
runner.run();
runner.assertTransferCount(PutTCP.REL_FAILURE, 0);
final Iterator<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(PutTCP.REL_SUCCESS).iterator();
assertTrue(successFlowFiles.hasNext(), "Success FlowFiles not found");
final MockFlowFile successFlowFile = successFlowFiles.next();
successFlowFile.assertAttributeEquals(PutTCP.RECORD_COUNT_TRANSMITTED, Integer.toString(1));
final List<ProvenanceEventRecord> provenanceEventRecords = runner.getProvenanceEvents();
final Optional<ProvenanceEventRecord> sendEventFound = provenanceEventRecords.stream()
.filter(eventRecord -> ProvenanceEventType.SEND == eventRecord.getEventType())
.findFirst();
assertTrue(sendEventFound.isPresent(), "Provenance Send Event not found");
final ProvenanceEventRecord sendEventRecord = sendEventFound.get();
assertTrue(sendEventRecord.getTransitUri().contains(TCP_SERVER_ADDRESS), "Transit URI not matched");
final ByteArrayMessage message = messages.take();
assertNotNull(message);
assertArrayEquals(RECORD.getBytes(StandardCharsets.UTF_8), message.getMessage());
assertEquals(TCP_SERVER_ADDRESS, message.getSender());
}
private void createTestServer(final String delimiter) throws UnknownHostException {
createTestServer(null, delimiter);
}
@ -206,7 +299,7 @@ public class TestPutTCP {
if (sslContext != null) {
serverFactory.setSslContext(sslContext);
}
serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
serverFactory.setShutdownQuietPeriod(Duration.ZERO);
serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
eventServer = serverFactory.getEventServer();
}
@ -270,12 +363,52 @@ public class TestPutTCP {
private String[] createContent(final int size) {
final char[] content = new char[size];
for (int i = 0; i < size; i++) {
content[i] = CONTENT_CHAR;
}
Arrays.fill(content, CONTENT_CHAR);
return new String[] { new String(content) };
}
private static class MockRecordSetWriter implements RecordSetWriter {
private final OutputStream outputStream;
private MockRecordSetWriter(final OutputStream outputStream) {
this.outputStream = outputStream;
}
@Override
public WriteResult write(final RecordSet recordSet) {
return WriteResult.EMPTY;
}
@Override
public void beginRecordSet() {
}
@Override
public WriteResult finishRecordSet() {
return WriteResult.EMPTY;
}
@Override
public WriteResult write(Record record) throws IOException {
outputStream.write(RECORD.getBytes(StandardCharsets.UTF_8));
outputStream.write(OUTGOING_MESSAGE_DELIMITER.getBytes(StandardCharsets.UTF_8));
return WriteResult.of(1, Collections.emptyMap());
}
@Override
public String getMimeType() {
return null;
}
@Override
public void flush() {
}
@Override
public void close() {
}
}
}