NIFI-8326: Send records as individual messages in Kafka RecordSinks

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4901.
Matthew Burgess 2021-03-16 10:25:44 -04:00 committed by Pierre Villard
parent 3833b51764
commit df04c60e01
4 changed files with 100 additions and 33 deletions
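For orientation, the following is a condensed sketch of the new send path, assembled from the KafkaRecordSink_2_6 diff below rather than copied verbatim: instead of serializing the whole RecordSet into one buffer and producing a single Kafka message, the sink now resets the buffer before each record, publishes every serialized record as its own message, and collects the producer futures so they can be awaited together. Names such as producer, topic, maxMessageSize, and maxAckWaitMillis are existing fields of the service.

// Condensed sketch of the per-record send loop introduced by this commit.
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final ByteCountingOutputStream out = new ByteCountingOutputStream(baos);
final Queue<Future<RecordMetadata>> ackQ = new LinkedList<>();
int recordCount = 0;
try (final RecordSetWriter writer = getWriterFactory().createWriter(getLogger(), writeSchema, out, attributes)) {
    writer.beginRecordSet();
    Record record;
    while ((record = recordSet.next()) != null) {
        baos.reset();   // clear the buffer so each record becomes its own payload
        out.reset();    // zero the byte counter via the new ByteCountingOutputStream.reset()
        writer.write(record);
        writer.flush();
        recordCount++;
        if (out.getBytesWritten() > maxMessageSize) {
            throw new TokenTooLargeException("A record's size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
        }
        sendMessage(topic, baos.toByteArray(), ackQ);   // one ProducerRecord per record
    }
    writer.finishRecordSet();
}
acknowledgeTransmission(ackQ);   // drain the ack queue, waiting up to maxAckWaitMillis per future

The reset() method added to ByteCountingOutputStream exists to support this loop: the byte counter has to return to zero for every record so the per-message size check stays accurate, which is also what the new unit test verifies.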

ByteCountingOutputStream.java

@@ -64,6 +64,10 @@ public class ByteCountingOutputStream extends OutputStream {
        out.close();
    }

    public void reset() {
        bytesWritten = 0;
    }

    public OutputStream getWrappedStream() {
        return out;
    }

ByteCountingOutputStreamTest.java

@@ -0,0 +1,39 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.stream.io;

import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.assertEquals;

class ByteCountingOutputStreamTest {
    @Test
    void testReset() throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(100);
        ByteCountingOutputStream bcos = new ByteCountingOutputStream(baos);

        bcos.write("Hello".getBytes(StandardCharsets.UTF_8));
        assertEquals(5, bcos.getBytesWritten());

        bcos.reset();
        assertEquals(0, bcos.getBytesWritten());
    }
}

KafkaRecordSink_2_6.java

@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -59,8 +60,12 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -226,58 +231,43 @@ public class KafkaRecordSink_2_6 extends AbstractControllerService implements Ka
    public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
        try {
            WriteResult writeResult;
            final RecordSchema writeSchema = getWriterFactory().getSchema(null, recordSet.getSchema());
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final ByteCountingOutputStream out = new ByteCountingOutputStream(baos);
            final Queue<Future<RecordMetadata>> ackQ = new LinkedList<>();
            int recordCount = 0;
            try (final RecordSetWriter writer = getWriterFactory().createWriter(getLogger(), writeSchema, out, attributes)) {
                writer.beginRecordSet();
                Record record;
                while ((record = recordSet.next()) != null) {
                    baos.reset();
                    out.reset();
                    writer.write(record);
                    writer.flush();
                    recordCount++;
                    if (out.getBytesWritten() > maxMessageSize) {
                        throw new TokenTooLargeException("The query's result set size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
                        throw new TokenTooLargeException("A record's size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
                    }
                    sendMessage(topic, baos.toByteArray(), ackQ);
                }
                writeResult = writer.finishRecordSet();
                if (out.getBytesWritten() > maxMessageSize) {
                    throw new TokenTooLargeException("The query's result set size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
                    throw new TokenTooLargeException("A record's size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
                }
                recordCount = writeResult.getRecordCount();
                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                attributes.put("record.count", Integer.toString(recordCount));
                attributes.putAll(writeResult.getAttributes());
            }
            if (recordCount > 0 || sendZeroResults) {
                final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, null, baos.toByteArray());
                try {
                    producer.send(record, (metadata, exception) -> {
                        if (exception != null) {
                            throw new KafkaSendException(exception);
                        }
                    }).get(maxAckWaitMillis, TimeUnit.MILLISECONDS);
                } catch (KafkaSendException kse) {
                    Throwable t = kse.getCause();
                    if (t instanceof IOException) {
                        throw (IOException) t;
                    } else {
                        throw new IOException(t);
                    }
                } catch (final InterruptedException e) {
                    getLogger().warn("Interrupted while waiting for an acknowledgement from Kafka");
                    Thread.currentThread().interrupt();
                } catch (final TimeoutException e) {
                    getLogger().warn("Timed out while waiting for an acknowledgement from Kafka");
            if (recordCount == 0) {
                if (sendZeroResults) {
                    sendMessage(topic, new byte[0], ackQ);
                } else {
                    return WriteResult.EMPTY;
                }
            } else {
                writeResult = WriteResult.EMPTY;
            }
            return writeResult;
            acknowledgeTransmission(ackQ);
            return WriteResult.of(recordCount, attributes);
        } catch (IOException ioe) {
            throw ioe;
        } catch (Exception e) {
@@ -285,6 +275,37 @@ public class KafkaRecordSink_2_6 extends AbstractControllerService implements Ka
        }
    }

    private void sendMessage(String topic, byte[] payload, final Queue<Future<RecordMetadata>> ackQ) throws IOException {
        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, null, payload);
        // Add the Future to the queue
        ackQ.add(producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                throw new KafkaSendException(exception);
            }
        }));
    }

    private void acknowledgeTransmission(final Queue<Future<RecordMetadata>> ackQ) throws IOException, ExecutionException {
        try {
            Future<RecordMetadata> ack;
            while ((ack = ackQ.poll()) != null) {
                ack.get(maxAckWaitMillis, TimeUnit.MILLISECONDS);
            }
        } catch (KafkaSendException kse) {
            Throwable t = kse.getCause();
            if (t instanceof IOException) {
                throw (IOException) t;
            } else {
                throw new IOException(t);
            }
        } catch (final InterruptedException e) {
            getLogger().warn("Interrupted while waiting for an acknowledgement from Kafka");
            Thread.currentThread().interrupt();
        } catch (final TimeoutException e) {
            getLogger().warn("Timed out while waiting for an acknowledgement from Kafka");
        }
    }

    @OnDisabled
    public void stop() {
        if (producer != null) {

TestKafkaRecordSink_2_6.java

@@ -95,14 +95,17 @@ public class TestKafkaRecordSink_2_6 {
        task.sendData(recordSet, new HashMap<>(), true);
        assertEquals(1, task.dataSent.size());
        assertEquals(2, task.dataSent.size());
        String[] lines = new String(task.dataSent.get(0)).split("\n");
        assertNotNull(lines);
        assertEquals(2, lines.length);
        assertEquals(1, lines.length);
        String[] data = lines[0].split(",");
        assertEquals("15", data[0]); // In the MockRecordWriter all values are strings
        assertEquals("Hello", data[1]);
        data = lines[1].split(",");
        lines = new String(task.dataSent.get(1)).split("\n");
        assertNotNull(lines);
        assertEquals(1, lines.length);
        data = lines[0].split(",");
        assertEquals("6", data[0]);
        assertEquals("World!", data[1]);
    }