NIFI-4724: Support 0 byte message with PublishKafka

Before this fix, PublishKafka (0.9) and PublishKafka_0_10 fail with empty incoming FlowFiles due to 'transfer relationship not specified' error.
Because the internal 'publish' method is not called as StreamDemarcator does not emit any token regardless whether demarcator is set or not.

As for PublishKafka_0_11 and PublishKafka_1_0, empty FlowFiles are transferred to 'success' relationship, however no Kafka message is sent to Kafka.

Since Kafka allows 0 byte body empty messages, NiFi should be able to send it, too.

This commit changes above current situation to the followings, with all PublishKafka_* processors:

- If demarcator is not set, then publish incoming FlowFile content as it is. This enables sending an empty Kafka message.
- If demarcator is set, send each token as a separate message.
  Even if no token is found (empty incoming FlowFile), transfer the FlowFile to 'success'.

This closes #2362.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Koji Kawamura 2017-12-27 17:38:57 +09:00 committed by Mark Payne
parent 3b10a8479e
commit e5ed62a98f
12 changed files with 299 additions and 12 deletions

View File

@ -42,6 +42,10 @@ public class InFlightMessageTracker {
}
}
/**
* This method guarantees that the specified FlowFile to be transferred to
* 'success' relationship even if it did not derive any Kafka message.
*/
public void trackEmpty(final FlowFile flowFile) {
messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
}

View File

@ -38,6 +38,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;
@ -71,9 +72,21 @@ public class PublisherLease implements Closeable {
tracker = new InFlightMessageTracker();
}
try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
byte[] messageContent;
try {
byte[] messageContent;
if (demarcatorBytes == null || demarcatorBytes.length == 0) {
if (flowFile.getSize() > maxMessageSize) {
tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
return;
}
// Send FlowFile content as it is, to support sending 0 byte message.
messageContent = new byte[(int) flowFile.getSize()];
StreamUtils.fillBuffer(flowFileContent, messageContent);
publish(flowFile, messageKey, messageContent, topic, tracker);
return;
}
try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
while ((messageContent = demarcator.nextToken()) != null) {
publish(flowFile, messageKey, messageContent, topic, tracker);
@ -82,6 +95,7 @@ public class PublisherLease implements Closeable {
return;
}
}
tracker.trackEmpty(flowFile);
} catch (final TokenTooLargeException ttle) {
tracker.fail(flowFile, ttle);
}

View File

@ -81,7 +81,9 @@ public class TestPublisherLease {
}
};
final FlowFile flowFile = new MockFlowFile(1L);
final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
// Need a size grater than zero to make the lease reads the InputStream.
Mockito.when(flowFile.getSize()).thenReturn(1L);
final String topic = "unit-test";
final byte[] messageKey = null;
final byte[] demarcatorBytes = null;
@ -205,6 +207,57 @@ public class TestPublisherLease {
verify(producer, times(1)).flush();
}
@Test
@SuppressWarnings("unchecked")
public void testZeroByteMessageSent() throws IOException {
final AtomicInteger poisonCount = new AtomicInteger(0);
final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger) {
@Override
protected void poison() {
poisonCount.incrementAndGet();
super.poison();
}
};
final AtomicInteger correctMessages = new AtomicInteger(0);
final AtomicInteger incorrectMessages = new AtomicInteger(0);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
final byte[] value = record.value();
final String valueString = new String(value, StandardCharsets.UTF_8);
if ("".equals(valueString)) {
correctMessages.incrementAndGet();
} else {
incorrectMessages.incrementAndGet();
}
return null;
}
}).when(producer).send(any(ProducerRecord.class), any(Callback.class));
final FlowFile flowFile = new MockFlowFile(1L);
final String topic = "unit-test";
final byte[] messageKey = null;
final byte[] demarcatorBytes = null;
final byte[] flowFileContent = new byte[0];
lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
assertEquals(0, poisonCount.get());
verify(producer, times(0)).flush();
final PublishResult result = lease.complete();
assertEquals(1, correctMessages.get());
assertEquals(0, incorrectMessages.get());
verify(producer, times(1)).flush();
}
@Test
public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger);

View File

@ -45,6 +45,10 @@ public class InFlightMessageTracker {
}
}
/**
* This method guarantees that the specified FlowFile to be transferred to
* 'success' relationship even if it did not derive any Kafka message.
*/
public void trackEmpty(final FlowFile flowFile) {
messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
}

View File

@ -42,6 +42,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;
@ -113,9 +114,21 @@ public class PublisherLease implements Closeable {
tracker = new InFlightMessageTracker(logger);
}
try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
byte[] messageContent;
try {
byte[] messageContent;
if (demarcatorBytes == null || demarcatorBytes.length == 0) {
if (flowFile.getSize() > maxMessageSize) {
tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
return;
}
// Send FlowFile content as it is, to support sending 0 byte message.
messageContent = new byte[(int) flowFile.getSize()];
StreamUtils.fillBuffer(flowFileContent, messageContent);
publish(flowFile, messageKey, messageContent, topic, tracker);
return;
}
try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
while ((messageContent = demarcator.nextToken()) != null) {
publish(flowFile, messageKey, messageContent, topic, tracker);
@ -123,6 +136,7 @@ public class PublisherLease implements Closeable {
// If we have a failure, don't try to send anything else.
return;
}
tracker.trackEmpty(flowFile);
}
} catch (final TokenTooLargeException ttle) {
tracker.fail(flowFile, ttle);

View File

@ -80,7 +80,9 @@ public class TestPublisherLease {
}
};
final FlowFile flowFile = new MockFlowFile(1L);
final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
// Need a size grater than zero to make the lease reads the InputStream.
Mockito.when(flowFile.getSize()).thenReturn(1L);
final String topic = "unit-test";
final byte[] messageKey = null;
final byte[] demarcatorBytes = null;
@ -201,6 +203,57 @@ public class TestPublisherLease {
verify(producer, times(1)).flush();
}
@Test
@SuppressWarnings("unchecked")
public void testZeroByteMessageSent() throws IOException {
final AtomicInteger poisonCount = new AtomicInteger(0);
final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
@Override
protected void poison() {
poisonCount.incrementAndGet();
super.poison();
}
};
final AtomicInteger correctMessages = new AtomicInteger(0);
final AtomicInteger incorrectMessages = new AtomicInteger(0);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
final byte[] value = record.value();
final String valueString = new String(value, StandardCharsets.UTF_8);
if ("".equals(valueString)) {
correctMessages.incrementAndGet();
} else {
incorrectMessages.incrementAndGet();
}
return null;
}
}).when(producer).send(any(ProducerRecord.class), any(Callback.class));
final FlowFile flowFile = new MockFlowFile(1L);
final String topic = "unit-test";
final byte[] messageKey = null;
final byte[] demarcatorBytes = null;
final byte[] flowFileContent = new byte[0];
lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
assertEquals(0, poisonCount.get());
verify(producer, times(0)).flush();
final PublishResult result = lease.complete();
assertEquals(1, correctMessages.get());
assertEquals(0, incorrectMessages.get());
verify(producer, times(1)).flush();
}
@Test
public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8);

View File

@ -42,6 +42,14 @@ public class InFlightMessageTracker {
}
}
/**
* This method guarantees that the specified FlowFile to be transferred to
* 'success' relationship even if it did not derive any Kafka message.
*/
public void trackEmpty(final FlowFile flowFile) {
messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
}
public int getAcknowledgedCount(final FlowFile flowFile) {
final Counts counter = messageCountsByFlowFile.get(flowFile);
return (counter == null) ? 0 : counter.getAcknowledgedCount();

View File

@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;
@ -61,9 +62,21 @@ public class PublisherLease implements Closeable {
tracker = new InFlightMessageTracker();
}
try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
byte[] messageContent;
try {
byte[] messageContent;
if (demarcatorBytes == null || demarcatorBytes.length == 0) {
if (flowFile.getSize() > maxMessageSize) {
tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
return;
}
// Send FlowFile content as it is, to support sending 0 byte message.
messageContent = new byte[(int) flowFile.getSize()];
StreamUtils.fillBuffer(flowFileContent, messageContent);
publish(flowFile, messageKey, messageContent, topic, tracker);
return;
}
try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
while ((messageContent = demarcator.nextToken()) != null) {
publish(flowFile, messageKey, messageContent, topic, tracker);
@ -72,6 +85,7 @@ public class PublisherLease implements Closeable {
return;
}
}
tracker.trackEmpty(flowFile);
} catch (final TokenTooLargeException ttle) {
tracker.fail(flowFile, ttle);
}

View File

@ -68,7 +68,9 @@ public class TestPublisherLease {
}
};
final FlowFile flowFile = new MockFlowFile(1L);
final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
// Need a size grater than zero to make the lease reads the InputStream.
Mockito.when(flowFile.getSize()).thenReturn(1L);
final String topic = "unit-test";
final byte[] messageKey = null;
final byte[] demarcatorBytes = null;
@ -191,4 +193,56 @@ public class TestPublisherLease {
verify(producer, times(1)).flush();
}
@Test
@SuppressWarnings("unchecked")
public void testZeroByteMessageSent() throws IOException {
final AtomicInteger poisonCount = new AtomicInteger(0);
final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger) {
@Override
protected void poison() {
poisonCount.incrementAndGet();
super.poison();
}
};
final AtomicInteger correctMessages = new AtomicInteger(0);
final AtomicInteger incorrectMessages = new AtomicInteger(0);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
final byte[] value = record.value();
final String valueString = new String(value, StandardCharsets.UTF_8);
if ("".equals(valueString)) {
correctMessages.incrementAndGet();
} else {
incorrectMessages.incrementAndGet();
}
return null;
}
}).when(producer).send(any(ProducerRecord.class), any(Callback.class));
final FlowFile flowFile = new MockFlowFile(1L);
final String topic = "unit-test";
final byte[] messageKey = null;
final byte[] demarcatorBytes = null;
final byte[] flowFileContent = new byte[0];
lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
assertEquals(0, poisonCount.get());
verify(producer, times(0)).flush();
final PublishResult result = lease.complete();
assertEquals(1, correctMessages.get());
assertEquals(0, incorrectMessages.get());
verify(producer, times(1)).flush();
}
}

View File

@ -45,6 +45,10 @@ public class InFlightMessageTracker {
}
}
/**
* This method guarantees that the specified FlowFile to be transferred to
* 'success' relationship even if it did not derive any Kafka message.
*/
public void trackEmpty(final FlowFile flowFile) {
messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
}

View File

@ -42,6 +42,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;
@ -113,9 +114,21 @@ public class PublisherLease implements Closeable {
tracker = new InFlightMessageTracker(logger);
}
try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
byte[] messageContent;
try {
byte[] messageContent;
if (demarcatorBytes == null || demarcatorBytes.length == 0) {
if (flowFile.getSize() > maxMessageSize) {
tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
return;
}
// Send FlowFile content as it is, to support sending 0 byte message.
messageContent = new byte[(int) flowFile.getSize()];
StreamUtils.fillBuffer(flowFileContent, messageContent);
publish(flowFile, messageKey, messageContent, topic, tracker);
return;
}
try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
while ((messageContent = demarcator.nextToken()) != null) {
publish(flowFile, messageKey, messageContent, topic, tracker);

View File

@ -79,7 +79,9 @@ public class TestPublisherLease {
}
};
final FlowFile flowFile = new MockFlowFile(1L);
final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
// Need a size grater than zero to make the lease reads the InputStream.
Mockito.when(flowFile.getSize()).thenReturn(1L);
final String topic = "unit-test";
final byte[] messageKey = null;
final byte[] demarcatorBytes = null;
@ -200,6 +202,56 @@ public class TestPublisherLease {
verify(producer, times(1)).flush();
}
@Test
@SuppressWarnings("unchecked")
public void testZeroByteMessageSent() throws IOException {
final AtomicInteger poisonCount = new AtomicInteger(0);
final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
@Override
protected void poison() {
poisonCount.incrementAndGet();
super.poison();
}
};
final AtomicInteger correctMessages = new AtomicInteger(0);
final AtomicInteger incorrectMessages = new AtomicInteger(0);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
final byte[] value = record.value();
final String valueString = new String(value, StandardCharsets.UTF_8);
if ("".equals(valueString)) {
correctMessages.incrementAndGet();
} else {
incorrectMessages.incrementAndGet();
}
return null;
}
}).when(producer).send(any(ProducerRecord.class), any(Callback.class));
final FlowFile flowFile = new MockFlowFile(1L);
final String topic = "unit-test";
final byte[] messageKey = null;
final byte[] demarcatorBytes = null;
final byte[] flowFileContent = new byte[0];
lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
assertEquals(0, poisonCount.get());
verify(producer, times(0)).flush();
final PublishResult result = lease.complete();
assertEquals(1, correctMessages.get());
assertEquals(0, incorrectMessages.get());
verify(producer, times(1)).flush();
}
@Test
public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {