diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java
deleted file mode 100644
index 7024f34ccf..0000000000
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.stream.io.exception;
-
-import java.io.IOException;
-
-public class TokenTooLargeException extends IOException {
- public TokenTooLargeException(final String message) {
- super(message);
- }
-}
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
index dc3d829aa7..3064f1c4a8 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
@@ -16,12 +16,9 @@
*/
package org.apache.nifi.stream.io.util;
-import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
-import org.apache.nifi.stream.io.exception.TokenTooLargeException;
-
/**
* The StreamDemarcator class takes an input stream and demarcates
* it so it could be read (see {@link #nextToken()}) as individual byte[]
@@ -29,7 +26,7 @@ import org.apache.nifi.stream.io.exception.TokenTooLargeException;
* stream will be read into a single token which may result in
* {@link OutOfMemoryError} if stream is too large.
*/
-public class StreamDemarcator implements Closeable {
+public class StreamDemarcator {
private final static int INIT_BUFFER_SIZE = 8192;
@@ -98,10 +95,8 @@ public class StreamDemarcator implements Closeable {
/**
* Will read the next data token from the {@link InputStream} returning null
* when it reaches the end of the stream.
- *
- * @throws IOException if unable to read from the stream
*/
- public byte[] nextToken() throws IOException {
+ public byte[] nextToken() {
byte[] data = null;
int j = 0;
@@ -131,10 +126,8 @@ public class StreamDemarcator implements Closeable {
/**
* Will fill the current buffer from current 'index' position, expanding it
* and or shuffling it if necessary
- *
- * @throws IOException if unable to read from the stream
*/
- private void fill() throws IOException {
+ private void fill() {
if (this.index >= this.buffer.length) {
if (this.mark == 0) { // expand
byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
@@ -148,16 +141,20 @@ public class StreamDemarcator implements Closeable {
}
}
- int bytesRead;
- do {
- bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
- } while (bytesRead == 0);
+ try {
+ int bytesRead;
+ do {
+ bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
+ } while (bytesRead == 0);
- if (bytesRead != -1) {
- this.readAheadLength = this.index + bytesRead;
- if (this.readAheadLength > this.maxDataSize) {
- throw new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + this.maxDataSize + " bytes.");
+ if (bytesRead != -1) {
+ this.readAheadLength = this.index + bytesRead;
+ if (this.readAheadLength > this.maxDataSize) {
+ throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
+ }
}
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed while reading InputStream", e);
}
}
@@ -191,9 +188,4 @@ public class StreamDemarcator implements Closeable {
throw new IllegalArgumentException("'delimiterBytes' is an optional argument, but when provided its length must be > 0");
}
}
-
- @Override
- public void close() throws IOException {
- is.close();
- }
}
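For context, a minimal caller sketch under the API after this change (class and variable names here are illustrative, not part of the patch): nextToken() no longer declares IOException, read failures and oversized tokens surface as unchecked IllegalStateException, and since StreamDemarcator no longer implements Closeable the caller owns the stream.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import org.apache.nifi.stream.io.util.StreamDemarcator;

    public class DemarcatorUsageSketch {
        public static void main(String[] args) throws IOException {
            // The caller owns the stream now that StreamDemarcator no longer implements Closeable.
            try (InputStream in = new ByteArrayInputStream("a,b,c".getBytes(StandardCharsets.UTF_8))) {
                StreamDemarcator demarcator = new StreamDemarcator(in, ",".getBytes(StandardCharsets.UTF_8), 1000);
                try {
                    byte[] token;
                    while ((token = demarcator.nextToken()) != null) {
                        System.out.println(new String(token, StandardCharsets.UTF_8));
                    }
                } catch (IllegalStateException e) {
                    // Thrown when the stream cannot be read or a token exceeds the 1000-byte limit.
                    System.err.println("Demarcation failed: " + e.getMessage());
                }
            }
        }
    }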
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
index 66d266848a..93082a2b70 100644
--- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -66,7 +65,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateNoDelimiter() throws IOException {
+ public void validateNoDelimiter() {
String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, null, 1000);
@@ -77,7 +76,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateNoDelimiterSmallInitialBuffer() throws IOException {
+ public void validateNoDelimiterSmallInitialBuffer() {
String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, null, 1000, 1);
@@ -85,7 +84,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateSingleByteDelimiter() throws IOException {
+ public void validateSingleByteDelimiter() {
String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
@@ -96,7 +95,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateDelimiterAtTheBeginning() throws IOException {
+ public void validateDelimiterAtTheBeginning() {
String data = ",Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
@@ -107,7 +106,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateEmptyDelimiterSegments() throws IOException {
+ public void validateEmptyDelimiterSegments() {
String data = ",,,,,Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
@@ -118,7 +117,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateSingleByteDelimiterSmallInitialBuffer() throws IOException {
+ public void validateSingleByteDelimiterSmallInitialBuffer() {
String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000, 2);
@@ -129,7 +128,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateWithMultiByteDelimiter() throws IOException {
+ public void validateWithMultiByteDelimiter() {
String data = "foodaabardaabazzz";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000);
@@ -140,7 +139,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateWithMultiByteDelimiterAtTheBeginning() throws IOException {
+ public void validateWithMultiByteDelimiterAtTheBeginning() {
String data = "daafoodaabardaabazzz";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000);
@@ -151,7 +150,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateWithMultiByteDelimiterSmallInitialBuffer() throws IOException {
+ public void validateWithMultiByteDelimiterSmallInitialBuffer() {
String data = "foodaabarffdaabazz";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000, 1);
@@ -162,7 +161,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateWithMultiByteCharsNoDelimiter() throws IOException {
+ public void validateWithMultiByteCharsNoDelimiter() {
String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, null, 1000);
@@ -173,7 +172,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateWithMultiByteCharsNoDelimiterSmallInitialBuffer() throws IOException {
+ public void validateWithMultiByteCharsNoDelimiterSmallInitialBuffer() {
String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, null, 1000, 2);
@@ -184,7 +183,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateWithComplexDelimiter() throws IOException {
+ public void validateWithComplexDelimiter() {
String data = "THIS IS MY TEXTTHIS IS MY NEW TEXTTHIS IS MY NEWEST TEXT";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
StreamDemarcator scanner = new StreamDemarcator(is, "".getBytes(StandardCharsets.UTF_8), 1000);
@@ -194,8 +193,8 @@ public class StreamDemarcatorTest {
assertNull(scanner.nextToken());
}
- @Test(expected = IOException.class)
- public void validateMaxBufferSize() throws IOException {
+ @Test(expected = IllegalStateException.class)
+ public void validateMaxBufferSize() {
String data = "THIS IS MY TEXTTHIS IS MY NEW TEXTTHIS IS MY NEWEST TEXT";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
StreamDemarcator scanner = new StreamDemarcator(is, "".getBytes(StandardCharsets.UTF_8), 20);
@@ -203,7 +202,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateScannerHandlesNegativeOneByteInputsNoDelimiter() throws IOException {
+ public void validateScannerHandlesNegativeOneByteInputsNoDelimiter() {
ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
StreamDemarcator scanner = new StreamDemarcator(is, null, 20);
byte[] b = scanner.nextToken();
@@ -211,7 +210,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateScannerHandlesNegativeOneByteInputs() throws IOException {
+ public void validateScannerHandlesNegativeOneByteInputs() {
ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
StreamDemarcator scanner = new StreamDemarcator(is, "water".getBytes(StandardCharsets.UTF_8), 20, 1024);
byte[] b = scanner.nextToken();
@@ -219,59 +218,10 @@ public class StreamDemarcatorTest {
}
@Test
- public void verifyScannerHandlesNegativeOneByteDelimiter() throws IOException {
+ public void verifyScannerHandlesNegativeOneByteDelimiter() {
ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
StreamDemarcator scanner = new StreamDemarcator(is, new byte[] { -1 }, 20, 1024);
assertArrayEquals(scanner.nextToken(), new byte[] { 0, 0, 0, 0 });
assertArrayEquals(scanner.nextToken(), new byte[] { 0, 0, 0 });
}
-
- @Test
- public void testWithoutTrailingDelimiter() throws IOException {
- final byte[] inputData = "Larger Message First\nSmall".getBytes(StandardCharsets.UTF_8);
- ByteArrayInputStream is = new ByteArrayInputStream(inputData);
- StreamDemarcator scanner = new StreamDemarcator(is, "\n".getBytes(), 1000);
-
- final byte[] first = scanner.nextToken();
- final byte[] second = scanner.nextToken();
- assertNotNull(first);
- assertNotNull(second);
-
- assertEquals("Larger Message First", new String(first, StandardCharsets.UTF_8));
- assertEquals("Small", new String(second, StandardCharsets.UTF_8));
- }
-
- @Test
- public void testOnBufferSplitNoTrailingDelimiter() throws IOException {
- final byte[] inputData = "Yes\nNo".getBytes(StandardCharsets.UTF_8);
- ByteArrayInputStream is = new ByteArrayInputStream(inputData);
- StreamDemarcator scanner = new StreamDemarcator(is, "\n".getBytes(), 1000, 3);
-
- final byte[] first = scanner.nextToken();
- final byte[] second = scanner.nextToken();
- assertNotNull(first);
- assertNotNull(second);
-
- assertArrayEquals(first, new byte[] {'Y', 'e', 's'});
- assertArrayEquals(second, new byte[] {'N', 'o'});
- }
-
- @Test
- public void testOnBufferSplit() throws IOException {
- final byte[] inputData = "123\n456\n789".getBytes(StandardCharsets.UTF_8);
- ByteArrayInputStream is = new ByteArrayInputStream(inputData);
- StreamDemarcator scanner = new StreamDemarcator(is, "\n".getBytes(), 1000, 3);
-
- final byte[] first = scanner.nextToken();
- final byte[] second = scanner.nextToken();
- final byte[] third = scanner.nextToken();
- assertNotNull(first);
- assertNotNull(second);
- assertNotNull(third);
-
- assertArrayEquals(first, new byte[] {'1', '2', '3'});
- assertArrayEquals(second, new byte[] {'4', '5', '6'});
- assertArrayEquals(third, new byte[] {'7', '8', '9'});
- }
-
}
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 7c0cc0f202..2a1451ab6d 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -369,55 +369,54 @@ public class StandardProcessorTestRunner implements TestRunner {
}
@Override
- public MockFlowFile enqueue(final Path path) throws IOException {
- return enqueue(path, new HashMap());
+ public void enqueue(final Path path) throws IOException {
+ enqueue(path, new HashMap());
}
@Override
- public MockFlowFile enqueue(final Path path, final Map attributes) throws IOException {
+ public void enqueue(final Path path, final Map attributes) throws IOException {
final Map modifiedAttributes = new HashMap<>(attributes);
if (!modifiedAttributes.containsKey(CoreAttributes.FILENAME.key())) {
modifiedAttributes.put(CoreAttributes.FILENAME.key(), path.toFile().getName());
}
try (final InputStream in = Files.newInputStream(path)) {
- return enqueue(in, modifiedAttributes);
+ enqueue(in, modifiedAttributes);
}
}
@Override
- public MockFlowFile enqueue(final byte[] data) {
- return enqueue(data, new HashMap());
+ public void enqueue(final byte[] data) {
+ enqueue(data, new HashMap());
}
@Override
- public MockFlowFile enqueue(final String data) {
- return enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.<String, String> emptyMap());
+ public void enqueue(final String data) {
+ enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.emptyMap());
}
@Override
- public MockFlowFile enqueue(final byte[] data, final Map attributes) {
- return enqueue(new ByteArrayInputStream(data), attributes);
+ public void enqueue(final byte[] data, final Map attributes) {
+ enqueue(new ByteArrayInputStream(data), attributes);
}
@Override
- public MockFlowFile enqueue(final String data, final Map attributes) {
- return enqueue(data.getBytes(StandardCharsets.UTF_8), attributes);
+ public void enqueue(final String data, final Map attributes) {
+ enqueue(data.getBytes(StandardCharsets.UTF_8), attributes);
}
@Override
- public MockFlowFile enqueue(final InputStream data) {
- return enqueue(data, new HashMap());
+ public void enqueue(final InputStream data) {
+ enqueue(data, new HashMap());
}
@Override
- public MockFlowFile enqueue(final InputStream data, final Map attributes) {
+ public void enqueue(final InputStream data, final Map attributes) {
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), processor);
MockFlowFile flowFile = session.create();
flowFile = session.importFrom(data, flowFile);
flowFile = session.putAllAttributes(flowFile, attributes);
enqueue(flowFile);
- return flowFile;
}
@Override
@@ -879,20 +878,17 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public void assertAllConditionsMet(final Relationship relationship, Predicate predicate) {
- if (predicate==null) {
+ if (predicate==null)
Assert.fail("predicate cannot be null");
- }
final List flowFiles = getFlowFilesForRelationship(relationship);
- if (flowFiles.isEmpty()) {
+ if (flowFiles.isEmpty())
Assert.fail("Relationship " + relationship.getName() + " does not contain any FlowFile");
- }
for (MockFlowFile flowFile : flowFiles) {
- if (predicate.test(flowFile)==false) {
+ if (predicate.test(flowFile)==false)
Assert.fail("FlowFile " + flowFile + " does not meet all condition");
- }
}
}
}
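As a hedged usage note on the predicate-based assertion kept above (the relationship and attribute name are placeholders for this sketch), a test would typically call it like this after running the processor:

    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;

    class ConditionAssertionSketch {
        // 'runner' is assumed to have already executed a processor that routed FlowFiles to 'success'.
        static void assertFilenamesPresent(TestRunner runner, Relationship success) {
            // Fails the test unless every FlowFile on the relationship carries a "filename" attribute.
            runner.assertAllConditionsMet(success,
                    (MockFlowFile flowFile) -> flowFile.getAttribute("filename") != null);
        }
    }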
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 1c014c3f59..05124160d7 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -383,7 +383,7 @@ public interface TestRunner {
* @param path to read content from
* @throws IOException if unable to read content
*/
- MockFlowFile enqueue(Path path) throws IOException;
+ void enqueue(Path path) throws IOException;
/**
* Reads the content from the given {@link Path} into memory and creates a
@@ -394,7 +394,7 @@ public interface TestRunner {
* @param attributes attributes to use for new flow file
* @throws IOException if unable to read content
*/
- MockFlowFile enqueue(Path path, Map attributes) throws IOException;
+ void enqueue(Path path, Map attributes) throws IOException;
/**
* Copies the content from the given byte array into memory and creates a
@@ -403,7 +403,7 @@ public interface TestRunner {
*
* @param data to enqueue
*/
- MockFlowFile enqueue(byte[] data);
+ void enqueue(byte[] data);
/**
* Creates a FlowFile with the content set to the given string (in UTF-8 format), with no attributes,
@@ -411,7 +411,7 @@ public interface TestRunner {
*
* @param data to enqueue
*/
- MockFlowFile enqueue(String data);
+ void enqueue(String data);
/**
* Copies the content from the given byte array into memory and creates a
@@ -421,7 +421,7 @@ public interface TestRunner {
* @param data to enqueue
* @param attributes to use for enqueued item
*/
- MockFlowFile enqueue(byte[] data, Map attributes);
+ void enqueue(byte[] data, Map attributes);
/**
* Creates a FlowFile with the content set to the given string (in UTF-8 format), with the given attributes,
@@ -430,7 +430,7 @@ public interface TestRunner {
* @param data to enqueue
* @param attributes to use for enqueued item
*/
- MockFlowFile enqueue(String data, Map attributes);
+ void enqueue(String data, Map attributes);
/**
* Reads the content from the given {@link InputStream} into memory and
@@ -439,7 +439,7 @@ public interface TestRunner {
*
* @param data to source data from
*/
- MockFlowFile enqueue(InputStream data);
+ void enqueue(InputStream data);
/**
* Reads the content from the given {@link InputStream} into memory and
@@ -449,7 +449,7 @@ public interface TestRunner {
* @param data source of data
* @param attributes to use for flow files
*/
- MockFlowFile enqueue(InputStream data, Map attributes);
+ void enqueue(InputStream data, Map attributes);
/**
* Copies the contents of the given {@link MockFlowFile} into a byte array
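Since enqueue(...) now returns void, a test that previously captured the returned MockFlowFile instead fetches the output from the relationship after running. A hedged sketch, where MyProcessor and its REL_SUCCESS relationship are placeholders:

    import java.nio.charset.StandardCharsets;
    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    class EnqueueUsageSketch {
        static void runAndVerify() {
            TestRunner runner = TestRunners.newTestRunner(new MyProcessor()); // placeholder processor
            runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));         // no MockFlowFile is returned any more
            runner.run();
            runner.assertAllFlowFilesTransferred(MyProcessor.REL_SUCCESS, 1);
            MockFlowFile out = runner.getFlowFilesForRelationship(MyProcessor.REL_SUCCESS).get(0);
            out.assertContentEquals("hello");
        }
    }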
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index 8ca3494a20..b061fcc67b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -223,7 +223,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
final byte[] demarcator = context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).isSet()
? context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
: null;
- final Map props = new HashMap<>();
+ final Map props = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index baacdc7619..fba8cb57b5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -74,7 +74,7 @@ public class ConsumerPool implements Closeable {
public ConsumerPool(
final int maxConcurrentLeases,
final byte[] demarcator,
- final Map kafkaProperties,
+ final Map kafkaProperties,
final List topics,
final long maxWaitMillis,
final String keyEncoding,
@@ -148,7 +148,7 @@ public class ConsumerPool implements Closeable {
});
}
- private void closeConsumer(final Consumer<?, ?> consumer) {
+ private void closeConsumer(final Consumer consumer) {
consumerClosedCountRef.incrementAndGet();
try {
consumer.unsubscribe();
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
deleted file mode 100644
index e7d5cb7163..0000000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.nifi.flowfile.FlowFile;
-
-public class InFlightMessageTracker {
- private final ConcurrentMap messageCountsByFlowFile = new ConcurrentHashMap<>();
- private final ConcurrentMap failures = new ConcurrentHashMap<>();
- private final Object progressMutex = new Object();
-
- public void incrementAcknowledgedCount(final FlowFile flowFile) {
- final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
- counter.incrementAcknowledgedCount();
-
- synchronized (progressMutex) {
- progressMutex.notify();
- }
- }
-
- public int getAcknowledgedCount(final FlowFile flowFile) {
- final Counts counter = messageCountsByFlowFile.get(flowFile);
- return (counter == null) ? 0 : counter.getAcknowledgedCount();
- }
-
- public void incrementSentCount(final FlowFile flowFile) {
- final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
- counter.incrementSentCount();
- }
-
- public int getSentCount(final FlowFile flowFile) {
- final Counts counter = messageCountsByFlowFile.get(flowFile);
- return (counter == null) ? 0 : counter.getSentCount();
- }
-
- public void fail(final FlowFile flowFile, final Exception exception) {
- failures.putIfAbsent(flowFile, exception);
-
- synchronized (progressMutex) {
- progressMutex.notify();
- }
- }
-
- public Exception getFailure(final FlowFile flowFile) {
- return failures.get(flowFile);
- }
-
- public boolean isFailed(final FlowFile flowFile) {
- return getFailure(flowFile) != null;
- }
-
- public void reset() {
- messageCountsByFlowFile.clear();
- failures.clear();
- }
-
- public PublishResult failOutstanding(final Exception exception) {
- messageCountsByFlowFile.keySet().stream()
- .filter(ff -> !isComplete(ff))
- .filter(ff -> !failures.containsKey(ff))
- .forEach(ff -> failures.put(ff, exception));
-
- return createPublishResult();
- }
-
- private boolean isComplete(final FlowFile flowFile) {
- final Counts counts = messageCountsByFlowFile.get(flowFile);
- if (counts.getAcknowledgedCount() == counts.getSentCount()) {
- // all messages received successfully.
- return true;
- }
-
- if (failures.containsKey(flowFile)) {
- // FlowFile failed so is complete
- return true;
- }
-
- return false;
- }
-
- private boolean isComplete() {
- return messageCountsByFlowFile.keySet().stream()
- .allMatch(flowFile -> isComplete(flowFile));
- }
-
- void awaitCompletion(final long millis) throws InterruptedException, TimeoutException {
- final long startTime = System.nanoTime();
- final long maxTime = startTime + TimeUnit.MILLISECONDS.toNanos(millis);
-
- while (System.nanoTime() < maxTime) {
- synchronized (progressMutex) {
- if (isComplete()) {
- return;
- }
-
- progressMutex.wait(millis);
- }
- }
-
- throw new TimeoutException();
- }
-
-
- PublishResult createPublishResult() {
- return new PublishResult() {
- @Override
- public Collection getSuccessfulFlowFiles() {
- if (failures.isEmpty()) {
- return messageCountsByFlowFile.keySet();
- }
-
- final Set flowFiles = new HashSet<>(messageCountsByFlowFile.keySet());
- flowFiles.removeAll(failures.keySet());
- return flowFiles;
- }
-
- @Override
- public Collection getFailedFlowFiles() {
- return failures.keySet();
- }
-
- @Override
- public int getSuccessfulMessageCount(final FlowFile flowFile) {
- return getAcknowledgedCount(flowFile);
- }
-
- @Override
- public Exception getReasonForFailure(final FlowFile flowFile) {
- return getFailure(flowFile);
- }
- };
- }
-
- public static class Counts {
- private final AtomicInteger sentCount = new AtomicInteger(0);
- private final AtomicInteger acknowledgedCount = new AtomicInteger(0);
-
- public void incrementSentCount() {
- sentCount.incrementAndGet();
- }
-
- public void incrementAcknowledgedCount() {
- acknowledgedCount.incrementAndGet();
- }
-
- public int getAcknowledgedCount() {
- return acknowledgedCount.get();
- }
-
- public int getSentCount() {
- return sentCount.get();
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 3d09f2df03..707a4314ab 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -27,9 +27,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
-
-import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
+
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
@@ -187,7 +186,7 @@ final class KafkaProcessorUtils {
final Class<?> classType;
- public KafkaConfigValidator(final Class<?> classType) {
+ public KafkaConfigValidator(final Class classType) {
this.classType = classType;
}
@@ -212,8 +211,7 @@ final class KafkaProcessorUtils {
return builder.toString();
}
-
- static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map mapToPopulate) {
+ static void buildCommonKafkaProperties(final ProcessContext context, final Class kafkaConfigClass, final Map mapToPopulate) {
for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
// Translate SSLContext Service configuration into Kafka properties
@@ -232,33 +230,28 @@ final class KafkaProcessorUtils {
mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
}
}
-
- String propertyName = propertyDescriptor.getName();
- String propertyValue = propertyDescriptor.isExpressionLanguageSupported()
+ String pName = propertyDescriptor.getName();
+ String pValue = propertyDescriptor.isExpressionLanguageSupported()
? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
: context.getProperty(propertyDescriptor).getValue();
-
- if (propertyValue != null) {
- // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
- // or the standard NiFi time period such as "5 secs"
- if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
- propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS));
+ if (pValue != null) {
+ if (pName.endsWith(".ms")) { // kafka standard time notation
+ pValue = String.valueOf(FormatUtils.getTimeDuration(pValue.trim(), TimeUnit.MILLISECONDS));
}
-
- if (isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
- mapToPopulate.put(propertyName, propertyValue);
+ if (isStaticStringFieldNamePresent(pName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
+ mapToPopulate.put(pName, pValue);
}
}
}
}
- private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
+ private static boolean isStaticStringFieldNamePresent(final String name, final Class... classes) {
return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
}
- private static Set getPublicStaticStringFieldValues(final Class<?>... classes) {
+ private static Set getPublicStaticStringFieldValues(final Class... classes) {
final Set strings = new HashSet<>();
- for (final Class<?> classType : classes) {
+ for (final Class classType : classes) {
for (final Field field : classType.getDeclaredFields()) {
if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class)) {
try {
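With the reverted check, every property whose name ends in ".ms" is passed through FormatUtils.getTimeDuration before being handed to Kafka. A small hedged sketch of that conversion (the value is illustrative):

    import java.util.concurrent.TimeUnit;
    import org.apache.nifi.util.FormatUtils;

    class TimePropertySketch {
        public static void main(String[] args) {
            // A NiFi time period such as "5 secs" becomes the millisecond value Kafka expects.
            long millis = FormatUtils.getTimeDuration("5 secs".trim(), TimeUnit.MILLISECONDS);
            System.out.println(millis); // 5000
        }
    }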
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
new file mode 100644
index 0000000000..31a084f133
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.stream.io.util.StreamDemarcator;
+
+/**
+ * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor
+ * with sending contents of the {@link FlowFile}s to Kafka.
+ */
+class KafkaPublisher implements Closeable {
+
+ private final Producer kafkaProducer;
+
+ private volatile long ackWaitTime = 30000;
+
+ private final ComponentLog componentLog;
+
+ private final int ackCheckSize;
+
+ KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
+ this(kafkaProperties, 100, componentLog);
+ }
+
+ /**
+ * Creates an instance of this class as well as the instance of the
+ * corresponding Kafka {@link KafkaProducer} using provided Kafka
+ * configuration properties.
+ *
+ * @param kafkaProperties instance of {@link Properties} used to bootstrap
+ * {@link KafkaProducer}
+ */
+ KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) {
+ this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
+ this.ackCheckSize = ackCheckSize;
+ this.componentLog = componentLog;
+ }
+
+ /**
+ * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to
+ * determine how many messages to Kafka will be sent from a provided
+ * {@link InputStream} (see {@link PublishingContext#getContentStream()}).
+ * It supports two publishing modes:
+ * <ul>
+ * <li>Sending all messages constructed from
+ * {@link StreamDemarcator#nextToken()} operation.</li>
+ * <li>Sending only unacknowledged messages constructed from
+ * {@link StreamDemarcator#nextToken()} operation.</li>
+ * </ul>
+ * The unacknowledged messages are determined from the value of
+ * {@link PublishingContext#getLastAckedMessageIndex()}.
+ *
+ * This method assumes content stream affinity where it is expected that the
+ * content stream that represents the same Kafka message(s) will remain the
+ * same across possible retries. This is required specifically for cases
+ * where delimiter is used and a single content stream may represent
+ * multiple Kafka messages. The
+ * {@link PublishingContext#getLastAckedMessageIndex()} will provide the
+ * index of the last ACKed message, so upon retry only messages with the
+ * higher index are sent.
+ *
+ * @param publishingContext instance of {@link PublishingContext} which holds
+ * context information about the message(s) to be sent.
+ * @return The index of the last successful offset.
+ */
+ KafkaPublisherResult publish(PublishingContext publishingContext) {
+ StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
+ publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
+
+ int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
+ List<Future<RecordMetadata>> resultFutures = new ArrayList<>();
+
+ byte[] messageBytes;
+ int tokenCounter = 0;
+ boolean continueSending = true;
+ KafkaPublisherResult result = null;
+ for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
+ if (prevLastAckedMessageIndex < tokenCounter) {
+ ProducerRecord message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes);
+ resultFutures.add(this.kafkaProducer.send(message));
+
+ if (tokenCounter % this.ackCheckSize == 0) {
+ int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+ resultFutures.clear();
+ if (lastAckedMessageIndex % this.ackCheckSize != 0) {
+ continueSending = false;
+ result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+ }
+ prevLastAckedMessageIndex = lastAckedMessageIndex;
+ }
+ }
+ }
+
+ if (result == null) {
+ int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+ resultFutures.clear();
+ result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+ }
+ return result;
+ }
+
+ /**
+ * Sets the time this publisher will wait for the {@link Future#get()}
+ * operation (the Future returned by
+ * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing
+ * out.
+ *
+ * This value will also be used as a timeout when closing the underlying
+ * {@link KafkaProducer}. See {@link #close()}.
+ */
+ void setAckWaitTime(long ackWaitTime) {
+ this.ackWaitTime = ackWaitTime;
+ }
+
+ /**
+ * This operation will process ACKs from Kafka in the order in which
+ * {@link KafkaProducer#send(ProducerRecord)} invocation were made returning
+ * the index of the last ACKed message. Within this operation processing ACK
+ * simply means successful invocation of 'get()' operation on the
+ * {@link Future} returned by {@link KafkaProducer#send(ProducerRecord)}
+ * operation. Upon encountering any type of error while interrogating such
+ * {@link Future} the ACK loop will end. Messages that were not ACKed would
+ * be considered non-delivered and therefore could be resent at the later
+ * time.
+ *
+ * @param sendFutures list of {@link Future}s representing results of
+ * publishing to Kafka
+ *
+ * @param lastAckMessageIndex the index of the last ACKed message. It is
+ * important to provide the last ACKed message especially while re-trying so
+ * the proper index is maintained.
+ */
+ private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) {
+ boolean exceptionThrown = false;
+ for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) {
+ Future<RecordMetadata> future = sendFutures.get(segmentCounter);
+ try {
+ future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
+ lastAckMessageIndex++;
+ } catch (InterruptedException e) {
+ exceptionThrown = true;
+ Thread.currentThread().interrupt();
+ this.warnOrError("Interrupted while waiting for acks from Kafka", null);
+ } catch (ExecutionException e) {
+ exceptionThrown = true;
+ this.warnOrError("Failed while waiting for acks from Kafka", e);
+ } catch (TimeoutException e) {
+ exceptionThrown = true;
+ this.warnOrError("Timed out while waiting for acks from Kafka", null);
+ }
+ }
+
+ return lastAckMessageIndex;
+ }
+
+ /**
+ * Will close the underlying {@link KafkaProducer} waiting if necessary for
+ * the same duration as supplied {@link #setAckWaitTime(long)}
+ */
+ @Override
+ public void close() {
+ this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ *
+ */
+ private void warnOrError(String message, Exception e) {
+ if (e == null) {
+ this.componentLog.warn(message);
+ } else {
+ this.componentLog.error(message, e);
+ }
+ }
+
+ /**
+ * Encapsulates the result received from publishing messages to Kafka
+ */
+ static class KafkaPublisherResult {
+
+ private final int messagesSent;
+ private final int lastMessageAcked;
+
+ KafkaPublisherResult(int messagesSent, int lastMessageAcked) {
+ this.messagesSent = messagesSent;
+ this.lastMessageAcked = lastMessageAcked;
+ }
+
+ public int getMessagesSent() {
+ return this.messagesSent;
+ }
+
+ public int getLastMessageAcked() {
+ return this.lastMessageAcked;
+ }
+
+ public boolean isAllAcked() {
+ return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
+ }
+
+ @Override
+ public String toString() {
+ return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked;
+ }
+ }
+}
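A brief hedged sketch of the result semantics above; KafkaPublisher and KafkaPublisherResult are package-private, so this illustrative snippet would have to live in the same package:

    package org.apache.nifi.processors.kafka.pubsub;

    class PublisherResultSketch {
        public static void main(String[] args) {
            // 10 messages sent and the last acknowledged index is 9 (zero-based): fully acknowledged.
            KafkaPublisher.KafkaPublisherResult allAcked = new KafkaPublisher.KafkaPublisherResult(10, 9);
            System.out.println(allAcked.isAllAcked());            // true
            // Only the first four messages were acknowledged: a retry would resume after index 3.
            KafkaPublisher.KafkaPublisherResult partial = new KafkaPublisher.KafkaPublisherResult(10, 3);
            System.out.println(partial.isAllAcked());             // false
            System.out.println(partial.getLastMessageAcked());    // 3
        }
    }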
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
index ecfe730d38..83688b7ade 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
@@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.nifi.processors.kafka.pubsub;
-import java.io.BufferedInputStream;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
@@ -28,16 +27,17 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import javax.xml.bind.DatatypeConverter;
-
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
@@ -46,192 +46,200 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processor.util.StandardValidators;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "0.9.x"})
-@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 0.9 producer. "
- + "The messages to send may be individual FlowFiles or may be delimited, using a "
- + "user-specified delimiter, such as a new-line. "
- + " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring"
- + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the mean time"
- + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.")
+@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "0.10"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 0.10 producer. "
+ + "The messages to send may be individual FlowFiles or may be delimited, using a "
+ + "user-specified delimiter, such as a new-line. "
+ + " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring"
+ + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the mean time"
+ + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
- description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+ description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
-@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
- + "FlowFiles that are routed to success. If the Property is not set, this will always be 1, but if the Property is set, it may "
- + "be greater than 1.")
-public class PublishKafka_0_10 extends AbstractProcessor {
+public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id";
+
+ protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx";
+
+ protected static final String FAILED_TOPIC_ATTR = "failed.topic";
+
+ protected static final String FAILED_KEY_ATTR = "failed.key";
+
+ protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter";
+
protected static final String MSG_COUNT = "msg.count";
static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
- "FlowFile will be routed to failure unless the message is replicated to the appropriate "
+ "FlowFile will be routed to failure unless the message is replicated to the appropriate "
+ "number of Kafka Nodes according to the Topic configuration");
static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
- "FlowFile will be routed to success if the message is received by a single Kafka node, "
+ "FlowFile will be routed to success if the message is received by a single Kafka node, "
+ "whether or not it is replicated. This is faster than "
+ "but can result in data loss if a Kafka node crashes");
static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
- "FlowFile will be routed to success after successfully writing the content to a Kafka node, "
+ "FlowFile will be routed to success after successfully writing the content to a Kafka node, "
+ "without waiting for a response. This provides the best performance but may result in data loss.");
static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(),
- Partitioners.RoundRobinPartitioner.class.getSimpleName(),
- "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
+ Partitioners.RoundRobinPartitioner.class.getSimpleName(),
+ "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
+ "the next Partition to Partition 2, and so on, wrapping as necessary.");
static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
- "DefaultPartitioner", "Messages will be assigned to random partitions.");
-
- static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
- static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
- "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
+ "DefaultPartitioner", "Messages will be assigned to random partitions.");
static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
- .name("topic")
- .displayName("Topic Name")
- .description("The name of the Kafka Topic to publish to.")
- .required(true)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(true)
- .build();
+ .name("topic")
+ .displayName("Topic Name")
+ .description("The name of the Kafka Topic to publish to.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
- .name(ProducerConfig.ACKS_CONFIG)
- .displayName("Delivery Guarantee")
- .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
- .required(true)
- .expressionLanguageSupported(false)
- .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
- .defaultValue(DELIVERY_BEST_EFFORT.getValue())
- .build();
+ .name(ProducerConfig.ACKS_CONFIG)
+ .displayName("Delivery Guarantee")
+ .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
+ .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+ .build();
- static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder()
- .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
- .displayName("Max Metadata Wait Time")
- .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
- + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
- .required(true)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(true)
- .defaultValue("5 sec")
- .build();
-
- static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
- .name("ack.wait.time")
- .displayName("Acknowledgment Wait Time")
- .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
- + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(false)
- .required(true)
- .defaultValue("5 secs")
- .build();
+ static final PropertyDescriptor META_WAIT_TIME = new PropertyDescriptor.Builder()
+ .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
+ .displayName("Meta Data Wait Time")
+ .description("The amount of time KafkaConsumer will wait to obtain metadata during the 'send' call before failing the "
+ + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("30 sec")
+ .build();
static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
- .name("max.request.size")
- .displayName("Max Request Size")
- .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
- .required(true)
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .defaultValue("1 MB")
- .build();
+ .name("max.request.size")
+ .displayName("Max Request Size")
+ .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .build();
static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
- .name("kafka-key")
- .displayName("Kafka Key")
- .description("The Key to use for the Message. "
- + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present "
- + "and we're not demarcating.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .build();
+ .name("kafka-key")
+ .displayName("Kafka Key")
+ .description("The Key to use for the Message. It will be serialized as UTF-8 bytes. "
+ + "If not specified then the flow file attribute kafka.key is used if present "
+ + "If not specified then the flow file attribute kafka.key is used if present "
+ + "and we're not demarcating. In that case the hex string is converted to its byte "
+ + "form and written as a byte[] key.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
- .name("key-attribute-encoding")
- .displayName("Key Attribute Encoding")
- .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
- .required(true)
- .defaultValue(UTF8_ENCODING.getValue())
- .allowableValues(UTF8_ENCODING, HEX_ENCODING)
- .build();
+ .name("key-attribute-encoding")
+ .displayName("Key Attribute Encoding")
+ .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+ .required(true)
+ .defaultValue(UTF8_ENCODING.getValue())
+ .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+ .build();
static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
- .name("message-demarcator")
- .displayName("Message Demarcator")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within "
- + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the "
- + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. "
- + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.")
- .build();
+ .name("message-demarcator")
+ .displayName("Message Demarcator")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within "
+ + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the "
+ + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. "
+ + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on your OS.")
+ .build();
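As a rough illustration of the demarcator behaviour described above (a sketch only; the processor streams the content rather than splitting it in memory):

    // sample FlowFile content with the demarcator set to "\n"
    String content = "Hello World\nGoodbye\n1\n2\n3\n4\n5";
    String demarcator = "\n";
    // 7 segments, each published as its own Kafka message; with no demarcator
    // configured the entire content would be published as a single message
    String[] segments = content.split(java.util.regex.Pattern.quote(demarcator));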
static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
- .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
- .displayName("Partitioner class")
- .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
- .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
- .defaultValue(RANDOM_PARTITIONING.getValue())
- .required(false)
- .build();
+ .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
+ .displayName("Partitioner class")
+ .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
+ .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+ .defaultValue(RANDOM_PARTITIONING.getValue())
+ .required(false)
+ .build();
static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
- .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
- .displayName("Compression Type")
- .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .allowableValues("none", "gzip", "snappy", "lz4")
- .defaultValue("none")
- .build();
+ .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
+ .displayName("Compression Type")
+ .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues("none", "gzip", "snappy", "lz4")
+ .defaultValue("none")
+ .build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("FlowFiles for which all content was sent to Kafka.")
- .build();
+ .name("success")
+ .description("FlowFiles for which all content was sent to Kafka.")
+ .build();
static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
- .build();
+ .name("failure")
+ .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
+ .build();
- private static final List<PropertyDescriptor> PROPERTIES;
- private static final Set<Relationship> RELATIONSHIPS;
+ static final List<PropertyDescriptor> DESCRIPTORS;
- private volatile PublisherPool publisherPool = null;
+ static final Set<Relationship> RELATIONSHIPS;
+ private volatile String brokers;
+
+ private final AtomicInteger taskCounter = new AtomicInteger();
+
+ private volatile boolean acceptTask = true;
+
+ /*
+ * Will ensure that the list of PropertyDescriptors is built only once, since
+ * all other lifecycle methods are invoked multiple times.
+ */
static {
- final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
- properties.add(TOPIC);
- properties.add(DELIVERY_GUARANTEE);
- properties.add(KEY);
- properties.add(KEY_ATTRIBUTE_ENCODING);
- properties.add(MESSAGE_DEMARCATOR);
- properties.add(MAX_REQUEST_SIZE);
- properties.add(ACK_WAIT_TIME);
- properties.add(METADATA_WAIT_TIME);
- properties.add(PARTITION_CLASS);
- properties.add(COMPRESSION_CODEC);
+ final List<PropertyDescriptor> _descriptors = new ArrayList<>();
+ _descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
+ _descriptors.add(TOPIC);
+ _descriptors.add(DELIVERY_GUARANTEE);
+ _descriptors.add(KEY);
+ _descriptors.add(KEY_ATTRIBUTE_ENCODING);
+ _descriptors.add(MESSAGE_DEMARCATOR);
+ _descriptors.add(MAX_REQUEST_SIZE);
+ _descriptors.add(META_WAIT_TIME);
+ _descriptors.add(PARTITION_CLASS);
+ _descriptors.add(COMPRESSION_CODEC);
- PROPERTIES = Collections.unmodifiableList(properties);
+ DESCRIPTORS = Collections.unmodifiableList(_descriptors);
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
- RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+ final Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ _relationships.add(REL_FAILURE);
+ RELATIONSHIPS = Collections.unmodifiableSet(_relationships);
}
@Override
@@ -241,17 +249,15 @@ public class PublishKafka_0_10 extends AbstractProcessor {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTIES;
+ return DESCRIPTORS;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
- .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
- .name(propertyDescriptorName)
- .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
- .dynamic(true)
- .build();
+ .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+ .name(propertyDescriptorName)
+ .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+ .dynamic(true)
+ .build();
}
@Override
@@ -259,123 +265,226 @@ public class PublishKafka_0_10 extends AbstractProcessor {
return KafkaProcessorUtils.validateCommonProperties(validationContext);
}
- private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
- PublisherPool pool = publisherPool;
- if (pool != null) {
- return pool;
- }
-
- return publisherPool = createPublisherPool(context);
- }
-
- protected PublisherPool createPublisherPool(final ProcessContext context) {
- final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
- final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
-
- final Map<String, Object> kafkaProperties = new HashMap<>();
- KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
- kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
- kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
- kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
-
- return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis);
- }
-
- @OnStopped
- public void closePool() {
- if (publisherPool != null) {
- publisherPool.close();
- }
-
- publisherPool = null;
- }
+ volatile KafkaPublisher kafkaPublisher;
+ /**
+ * This thread-safe operation will delegate to
+ * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first
+ * checking and, if necessary, creating the Kafka resource, which could be either
+ * a {@link KafkaPublisher} or a {@link KafkaConsumer}. It will also close and
+ * destroy the underlying Kafka resource upon catching an {@link Exception}
+ * raised by {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}.
+ * After the Kafka resource is destroyed it will be re-created upon the next
+ * invocation of this operation, essentially providing a self-healing
+ * mechanism to deal with a potentially corrupted resource.
+ *
+ * Keep in mind that upon catching an exception this processor will stop
+ * accepting new tasks until the Kafka resource has been reset. This means that
+ * in a multi-threaded situation currently executing tasks will be given a
+ * chance to complete while no new tasks will be accepted.
+ *
+ * @param context context
+ * @param sessionFactory factory
+ */
@Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final boolean useDemarcator = context.getProperty(MESSAGE_DEMARCATOR).isSet();
-
- final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 500));
- if (flowFiles.isEmpty()) {
- return;
- }
-
- final PublisherPool pool = getPublisherPool(context);
- if (pool == null) {
- context.yield();
- return;
- }
-
- final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
- final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
-
- final long startTime = System.nanoTime();
- try (final PublisherLease lease = pool.obtainPublisher()) {
- // Send each FlowFile to Kafka asynchronously.
- for (final FlowFile flowFile : flowFiles) {
- if (!isScheduled()) {
- // If stopped, re-queue FlowFile instead of sending it
- session.transfer(flowFile);
- continue;
- }
-
- final byte[] messageKey = getMessageKey(flowFile, context);
- final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
- final byte[] demarcatorBytes;
- if (useDemarcator) {
- demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
- } else {
- demarcatorBytes = null;
- }
-
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream rawIn) throws IOException {
- try (final InputStream in = new BufferedInputStream(rawIn)) {
- lease.publish(flowFile, in, messageKey, demarcatorBytes, topic);
- }
+ public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+ if (this.acceptTask) { // acts as a circuit breaker to allow existing tasks to wind down so 'kafkaPublisher' can be reset before new tasks are accepted.
+ this.taskCounter.incrementAndGet();
+ final ProcessSession session = sessionFactory.createSession();
+ try {
+ /*
+ * We can't use the double null check here since, as a pattern,
+ * it only works for lazy init but not for reset, which is what we
+ * are doing here. In fact the first null check is dangerous
+ * since 'kafkaPublisher' can become null right after its null
+ * check passes, causing a subsequent NPE.
+ */
+ synchronized (this) {
+ if (this.kafkaPublisher == null) {
+ this.kafkaPublisher = this.buildKafkaResource(context, session);
}
- });
- }
-
- // Complete the send
- final PublishResult publishResult = lease.complete();
-
- // Transfer any successful FlowFiles.
- final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
- for (FlowFile success : publishResult.getSuccessfulFlowFiles()) {
- final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
-
- final int msgCount = publishResult.getSuccessfulMessageCount(success);
- success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
- session.adjustCounter("Messages Sent", msgCount, true);
-
- final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
- session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
- session.transfer(success, REL_SUCCESS);
- }
-
- // Transfer any failures.
- for (final FlowFile failure : publishResult.getFailedFlowFiles()) {
- final int successCount = publishResult.getSuccessfulMessageCount(failure);
- if (successCount > 0) {
- getLogger().error("Failed to send some messages for {} to Kafka, but {} messages were acknowledged by Kafka. Routing to failure due to {}",
- new Object[] {failure, successCount, publishResult.getReasonForFailure(failure)});
- } else {
- getLogger().error("Failed to send all message for {} to Kafka; routing to failure due to {}",
- new Object[] {failure, publishResult.getReasonForFailure(failure)});
}
- session.transfer(failure, REL_FAILURE);
+ /*
+ * The 'processed' boolean flag does not imply any failure or success. It simply states that:
+ * - ConsumeKafka - some messages were received from Kafka and one or more FlowFiles were generated
+ * - PublishKafka0_10 - some messages were sent to Kafka based on the existence of the input FlowFile
+ */
+ boolean processed = this.rendezvousWithKafka(context, session);
+ session.commit();
+ if (!processed) {
+ context.yield();
+ }
+ } catch (Throwable e) {
+ this.acceptTask = false;
+ session.rollback(true);
+ this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, e});
+ } finally {
+ synchronized (this) {
+ if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) {
+ this.close();
+ this.acceptTask = true;
+ }
+ }
}
+ } else {
+ this.getLogger().debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
+ context.yield();
}
}
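The onTrigger above pairs a task counter with the volatile 'acceptTask' flag so that, after a failure, in-flight tasks drain before the shared Kafka resource is rebuilt and new tasks are accepted again. A minimal standalone sketch of that pattern (simplified names; not the processor code itself):

    private final AtomicInteger taskCounter = new AtomicInteger();
    private volatile boolean acceptTask = true;

    void trigger(Runnable work) {
        if (!acceptTask) {
            return; // circuit open: reject new tasks (the processor yields here)
        }
        taskCounter.incrementAndGet();
        try {
            work.run();
        } catch (Throwable t) {
            acceptTask = false; // open the circuit; tasks already running may finish
        } finally {
            synchronized (this) {
                if (taskCounter.decrementAndGet() == 0 && !acceptTask) {
                    // last task out: reset the shared resource, then close the circuit
                    acceptTask = true;
                }
            }
        }
    }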
+ /**
+ * Will call {@link Closeable#close()} on the target resource after which
+ * the target resource will be set to null. Should only be called when there
+ * are no more threads being executed on this processor or when it has been
+ * verified that only a single thread remains.
+ *
+ * @see KafkaPublisher
+ * @see KafkaConsumer
+ */
+ @OnStopped
+ public void close() {
+ try {
+ if (this.kafkaPublisher != null) {
+ try {
+ this.kafkaPublisher.close();
+ } catch (Exception e) {
+ this.getLogger().warn("Failed while closing " + this.kafkaPublisher, e);
+ }
+ }
+ } finally {
+ this.kafkaPublisher = null;
+ }
+ }
+
+ /**
+ * Will rendezvous with Kafka if the {@link ProcessSession} contains a
+ * {@link FlowFile}, producing a result {@link FlowFile}.
+ *
+ * A result {@link FlowFile} that is successful is transferred to
+ * {@link #REL_SUCCESS}.
+ *
+ * A result {@link FlowFile} that failed is transferred to
+ * {@link #REL_FAILURE}.
+ *
+ */
+ protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) {
+ FlowFile flowFile = session.get();
+ if (flowFile != null) {
+ long start = System.nanoTime();
+ flowFile = this.doRendezvousWithKafka(flowFile, context, session);
+ Relationship relationship = REL_SUCCESS;
+ if (!this.isFailedFlowFile(flowFile)) {
+ String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+ long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ String transitUri = KafkaProcessorUtils.buildTransitURI(context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(), this.brokers, topic);
+ session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", executionDuration);
+ this.getLogger().debug("Successfully sent {} to Kafka as {} message(s) in {} millis",
+ new Object[]{flowFile, flowFile.getAttribute(MSG_COUNT), executionDuration});
+ } else {
+ relationship = REL_FAILURE;
+ flowFile = session.penalize(flowFile);
+ }
+ session.transfer(flowFile, relationship);
+ }
+ return flowFile != null;
+ }
+
+ /**
+ * Builds an instance of {@link KafkaPublisher}.
+ */
+ protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) {
+ final Map<String, String> kafkaProps = new HashMap<>();
+ KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProps);
+ kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()));
+ this.brokers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+ final Properties props = new Properties();
+ props.putAll(kafkaProps);
+ KafkaPublisher publisher = new KafkaPublisher(props, this.getLogger());
+ return publisher;
+ }
+
+ /**
+ * Will rendezvous with {@link KafkaPublisher} after building
+ * {@link PublishingContext} and will produce the resulting
+ * {@link FlowFile}. The resulting FlowFile contains all the information
+ * required to determine whether the message publishing that originated from
+ * the provided FlowFile succeeded fully, partially, or failed completely
+ * (see {@link #isFailedFlowFile(FlowFile)}).
+ */
+ private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) {
+ final AtomicReference<KafkaPublisher.KafkaPublisherResult> publishResultRef = new AtomicReference<>();
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(InputStream contentStream) throws IOException {
+ PublishingContext publishingContext = PublishKafka_0_10.this.buildPublishingContext(flowFile, context, contentStream);
+ KafkaPublisher.KafkaPublisherResult result = PublishKafka_0_10.this.kafkaPublisher.publish(publishingContext);
+ publishResultRef.set(result);
+ }
+ });
+
+ FlowFile resultFile = publishResultRef.get().isAllAcked()
+ ? this.cleanUpFlowFileIfNecessary(flowFile, session)
+ : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context));
+
+ if (!this.isFailedFlowFile(resultFile)) {
+ resultFile = session.putAttribute(resultFile, MSG_COUNT, String.valueOf(publishResultRef.get().getMessagesSent()));
+ }
+ return resultFile;
+ }
+
+ /**
+ * Builds {@link PublishingContext} for message(s) to be sent to Kafka.
+ * {@link PublishingContext} contains all contextual information required by
+ * {@link KafkaPublisher} to publish to Kafka. Such information contains
+ * things like topic name, content stream, delimiter, key and last ACKed
+ * message for cases where provided FlowFile is being retried (failed in the
+ * past).
+ *
+ * For a clean FlowFile (one that is being sent for the first time), the
+ * PublishingContext will be built from the {@link ProcessContext} associated
+ * with this invocation.
+ *
+ * For a failed FlowFile, the {@link PublishingContext} will be built from
+ * attributes of that FlowFile, which by then will already contain the required
+ * information (e.g., topic, key, delimiter etc.). This is required to
+ * ensure the affinity of the retry in the event where the processor
+ * configuration has changed. However, keep in mind that a FlowFile is
+ * only considered a failed FlowFile if it is being re-processed by the same
+ * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
+ * {@link #isFailedFlowFile(FlowFile)}). If a failed FlowFile is sent to
+ * another PublishKafka0_10 processor it is treated as a fresh FlowFile,
+ * regardless of whether it has FAILED_* attributes set.
+ */
+ private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) {
+ final byte[] keyBytes = getMessageKey(flowFile, context);
+
+ final String topicName;
+ final byte[] delimiterBytes;
+ int lastAckedMessageIndex = -1;
+ if (this.isFailedFlowFile(flowFile)) {
+ lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
+ topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
+ delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
+ ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
+ } else {
+ topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+ delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
+ .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
+ }
+
+ PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex,
+ context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue());
+ publishingContext.setKeyBytes(keyBytes);
+ publishingContext.setDelimiterBytes(delimiterBytes);
+ return publishingContext;
+ }
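A hedged illustration of the retry semantics that buildPublishingContext enables (example values only; the real segmentation is performed by KafkaPublisher against the content stream):

    // a FlowFile that previously failed after Kafka acknowledged segments 0 and 1
    // carries FAILED_LAST_ACK_IDX = "1", so only the remaining segments are re-sent
    int lastAckedMessageIndex = 1;
    String[] segments = {"Hello Kafka1", "Hello Kafka2", "Hello Kafka3", "Hello Kafka4"};
    for (int i = 0; i < segments.length; i++) {
        if (i <= lastAckedMessageIndex) {
            continue; // already acknowledged on the previous attempt
        }
        // publish segments[i]
    }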
private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
- if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
- return null;
- }
-
final String uninterpretedKey;
if (context.getProperty(KEY).isSet()) {
uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
@@ -394,4 +503,51 @@ public class PublishKafka_0_10 extends AbstractProcessor {
return DatatypeConverter.parseHexBinary(uninterpretedKey);
}
+
+ /**
+ * Will remove the FAILED_* attributes if the FlowFile is no longer considered a
+ * failed FlowFile
+ *
+ * @see #isFailedFlowFile(FlowFile)
+ */
+ private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) {
+ if (this.isFailedFlowFile(flowFile)) {
+ Set<String> keysToRemove = new HashSet<>();
+ keysToRemove.add(FAILED_DELIMITER_ATTR);
+ keysToRemove.add(FAILED_KEY_ATTR);
+ keysToRemove.add(FAILED_TOPIC_ATTR);
+ keysToRemove.add(FAILED_PROC_ID_ATTR);
+ keysToRemove.add(FAILED_LAST_ACK_IDX);
+ flowFile = session.removeAllAttributes(flowFile, keysToRemove);
+ }
+ return flowFile;
+ }
+
+ /**
+ * Builds a {@link Map} of FAILED_* attributes
+ *
+ * @see #FAILED_PROC_ID_ATTR
+ * @see #FAILED_LAST_ACK_IDX
+ * @see #FAILED_TOPIC_ATTR
+ * @see #FAILED_KEY_ATTR
+ * @see #FAILED_DELIMITER_ATTR
+ */
+ private Map<String, String> buildFailedFlowFileAttributes(int lastAckedMessageIndex, FlowFile sourceFlowFile, ProcessContext context) {
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier());
+ attributes.put(FAILED_LAST_ACK_IDX, String.valueOf(lastAckedMessageIndex));
+ attributes.put(FAILED_TOPIC_ATTR, context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue());
+ attributes.put(FAILED_KEY_ATTR, context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue());
+ attributes.put(FAILED_DELIMITER_ATTR, context.getProperty(MESSAGE_DEMARCATOR).isSet()
+ ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(sourceFlowFile).getValue() : null);
+ return attributes;
+ }
+
+ /**
+ * Returns 'true' if provided FlowFile is a failed FlowFile. A failed
+ * FlowFile contains {@link #FAILED_PROC_ID_ATTR}.
+ */
+ private boolean isFailedFlowFile(FlowFile flowFile) {
+ return this.getIdentifier().equals(flowFile.getAttribute(FAILED_PROC_ID_ATTR));
+ }
}
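Taken together, cleanUpFlowFileIfNecessary, buildFailedFlowFileAttributes and isFailedFlowFile implement a small attribute-based retry protocol; the trace below is illustrative only (the constant values themselves are not shown in this patch):

    // 1. on failure, the FlowFile is stamped so the next attempt can resume:
    //      FAILED_PROC_ID_ATTR  -> this processor's identifier
    //      FAILED_LAST_ACK_IDX  -> index of the last segment Kafka acknowledged, e.g. "1"
    //      FAILED_TOPIC_ATTR / FAILED_KEY_ATTR / FAILED_DELIMITER_ATTR -> configuration snapshot
    // 2. on the next pass, isFailedFlowFile() compares FAILED_PROC_ID_ATTR with getIdentifier();
    //    if they match, topic/key/delimiter come from the attributes instead of the current
    //    configuration, preserving retry affinity even if the processor was reconfigured
    // 3. on full success, cleanUpFlowFileIfNecessary() strips all FAILED_* attributes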
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
deleted file mode 100644
index b68526501f..0000000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.nifi.flowfile.FlowFile;
-
-public interface PublishResult {
- Collection<FlowFile> getSuccessfulFlowFiles();
-
- Collection<FlowFile> getFailedFlowFiles();
-
- int getSuccessfulMessageCount(FlowFile flowFile);
-
- Exception getReasonForFailure(FlowFile flowFile);
-
-
- public static final PublishResult EMPTY = new PublishResult() {
- @Override
- public Collection<FlowFile> getSuccessfulFlowFiles() {
- return Collections.emptyList();
- }
-
- @Override
- public Collection<FlowFile> getFailedFlowFiles() {
- return Collections.emptyList();
- }
-
- @Override
- public int getSuccessfulMessageCount(FlowFile flowFile) {
- return 0;
- }
-
- @Override
- public Exception getReasonForFailure(FlowFile flowFile) {
- return null;
- }
- };
-}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
deleted file mode 100644
index b67e8a8614..0000000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.stream.io.exception.TokenTooLargeException;
-import org.apache.nifi.stream.io.util.StreamDemarcator;
-
-public class PublisherLease implements Closeable {
- private final ComponentLog logger;
- private final Producer<byte[], byte[]> producer;
- private final int maxMessageSize;
- private final long maxAckWaitMillis;
- private volatile boolean poisoned = false;
-
- private InFlightMessageTracker tracker;
-
- public PublisherLease(final Producer<byte[], byte[]> producer, final int maxMessageSize, final long maxAckWaitMillis, final ComponentLog logger) {
- this.producer = producer;
- this.maxMessageSize = maxMessageSize;
- this.logger = logger;
- this.maxAckWaitMillis = maxAckWaitMillis;
- }
-
- protected void poison() {
- this.poisoned = true;
- }
-
- public boolean isPoisoned() {
- return poisoned;
- }
-
- void publish(final FlowFile flowFile, final InputStream flowFileContent, final byte[] messageKey, final byte[] demarcatorBytes, final String topic) throws IOException {
- if (tracker == null) {
- tracker = new InFlightMessageTracker();
- }
-
- try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
- byte[] messageContent;
- try {
- while ((messageContent = demarcator.nextToken()) != null) {
- // We do not want to use any key if we have a demarcator because that would result in
- // the key being the same for multiple messages
- final byte[] keyToUse = demarcatorBytes == null ? messageKey : null;
- publish(flowFile, keyToUse, messageContent, topic, tracker);
-
- if (tracker.isFailed(flowFile)) {
- // If we have a failure, don't try to send anything else.
- return;
- }
- }
- } catch (final TokenTooLargeException ttle) {
- tracker.fail(flowFile, ttle);
- }
- } catch (final Exception e) {
- tracker.fail(flowFile, e);
- poison();
- throw e;
- }
- }
-
- private void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
- final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(final RecordMetadata metadata, final Exception exception) {
- if (exception == null) {
- tracker.incrementAcknowledgedCount(flowFile);
- } else {
- tracker.fail(flowFile, exception);
- poison();
- }
- }
- });
-
- tracker.incrementSentCount(flowFile);
- }
-
- public PublishResult complete() {
- if (tracker == null) {
- throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
- }
-
- producer.flush();
-
- try {
- tracker.awaitCompletion(maxAckWaitMillis);
- return tracker.createPublishResult();
- } catch (final InterruptedException e) {
- logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
- Thread.currentThread().interrupt();
- return tracker.failOutstanding(e);
- } catch (final TimeoutException e) {
- logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
- return tracker.failOutstanding(e);
- } finally {
- tracker = null;
- }
- }
-
- @Override
- public void close() {
- producer.close(maxAckWaitMillis, TimeUnit.MILLISECONDS);
- tracker = null;
- }
-}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
deleted file mode 100644
index 5902b038c6..0000000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.io.Closeable;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.nifi.logging.ComponentLog;
-
-public class PublisherPool implements Closeable {
- private final ComponentLog logger;
- private final BlockingQueue<PublisherLease> publisherQueue;
- private final Map<String, Object> kafkaProperties;
- private final int maxMessageSize;
- private final long maxAckWaitMillis;
-
- private volatile boolean closed = false;
-
- PublisherPool(final Map<String, Object> kafkaProperties, final ComponentLog logger, final int maxMessageSize, final long maxAckWaitMillis) {
- this.logger = logger;
- this.publisherQueue = new LinkedBlockingQueue<>();
- this.kafkaProperties = kafkaProperties;
- this.maxMessageSize = maxMessageSize;
- this.maxAckWaitMillis = maxAckWaitMillis;
- }
-
- public PublisherLease obtainPublisher() {
- if (isClosed()) {
- throw new IllegalStateException("Connection Pool is closed");
- }
-
- PublisherLease lease = publisherQueue.poll();
- if (lease != null) {
- return lease;
- }
-
- lease = createLease();
- return lease;
- }
-
- private PublisherLease createLease() {
- final Producer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProperties);
- final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger) {
- @Override
- public void close() {
- if (isPoisoned() || isClosed()) {
- super.close();
- } else {
- publisherQueue.offer(this);
- }
- }
- };
-
- return lease;
- }
-
- public synchronized boolean isClosed() {
- return closed;
- }
-
- @Override
- public synchronized void close() {
- closed = true;
-
- PublisherLease lease;
- while ((lease = publisherQueue.poll()) != null) {
- lease.close();
- }
- }
-
- /**
- * Returns the number of leases that are currently available
- *
- * @return the number of leases currently available
- */
- protected int available() {
- return publisherQueue.size();
- }
-}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
new file mode 100644
index 0000000000..1513481df1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Holder of the context information required by {@link KafkaPublisher} to
+ * publish messages to Kafka.
+ */
+class PublishingContext {
+
+ private final InputStream contentStream;
+
+ private final String topic;
+
+ private final int lastAckedMessageIndex;
+
+ private final int maxRequestSize;
+
+ private byte[] keyBytes;
+
+ private byte[] delimiterBytes;
+
+ PublishingContext(InputStream contentStream, String topic) {
+ this(contentStream, topic, -1);
+ }
+
+ PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) {
+ this(contentStream, topic, lastAckedMessageIndex, 1048576);
+ }
+
+ PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex, int maxRequestSize) {
+ this.validateInput(contentStream, topic, lastAckedMessageIndex);
+ this.contentStream = contentStream;
+ this.topic = topic;
+ this.lastAckedMessageIndex = lastAckedMessageIndex;
+ this.maxRequestSize = maxRequestSize;
+ }
+
+ @Override
+ public String toString() {
+ return "topic: '" + this.topic + "'; delimiter: '" + new String(this.delimiterBytes, StandardCharsets.UTF_8) + "'";
+ }
+
+ int getLastAckedMessageIndex() {
+ return this.lastAckedMessageIndex;
+ }
+
+ int getMaxRequestSize() {
+ return this.maxRequestSize;
+ }
+
+ byte[] getKeyBytes() {
+ return this.keyBytes;
+ }
+
+ byte[] getDelimiterBytes() {
+ return this.delimiterBytes;
+ }
+
+ InputStream getContentStream() {
+ return this.contentStream;
+ }
+
+ String getTopic() {
+ return this.topic;
+ }
+
+ void setKeyBytes(byte[] keyBytes) {
+ if (this.keyBytes == null) {
+ if (keyBytes != null) {
+ this.assertBytesValid(keyBytes);
+ this.keyBytes = keyBytes;
+ }
+ } else {
+ throw new IllegalArgumentException("'keyBytes' can only be set once per instance");
+ }
+ }
+
+ void setDelimiterBytes(byte[] delimiterBytes) {
+ if (this.delimiterBytes == null) {
+ if (delimiterBytes != null) {
+ this.assertBytesValid(delimiterBytes);
+ this.delimiterBytes = delimiterBytes;
+ }
+ } else {
+ throw new IllegalArgumentException("'delimiterBytes' can only be set once per instance");
+ }
+ }
+
+ private void assertBytesValid(byte[] bytes) {
+ if (bytes != null) {
+ if (bytes.length == 0) {
+ throw new IllegalArgumentException("'bytes' must not be empty");
+ }
+ }
+ }
+
+ private void validateInput(InputStream contentStream, String topic, int lastAckedMessageIndex) {
+ if (contentStream == null) {
+ throw new IllegalArgumentException("'contentStream' must not be null");
+ } else if (topic == null || topic.trim().length() == 0) {
+ throw new IllegalArgumentException("'topic' must not be null or empty");
+ } else if (lastAckedMessageIndex < -1) {
+ throw new IllegalArgumentException("'lastAckedMessageIndex' must be >= -1");
+ }
+ }
+}
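For reference, a minimal usage sketch of the class above, consistent with how the tests below construct it:

    InputStream content = new ByteArrayInputStream("a|b|c".getBytes(StandardCharsets.UTF_8));
    PublishingContext ctx = new PublishingContext(content, "my-topic"); // lastAckedMessageIndex defaults to -1
    ctx.setDelimiterBytes("|".getBytes(StandardCharsets.UTF_8));        // may only be set once
    ctx.setKeyBytes("key1".getBytes(StandardCharsets.UTF_8));           // may only be set once
    // ctx.getMaxRequestSize() == 1048576, the 1 MB default used when no explicit size is given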
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
new file mode 100644
index 0000000000..19c64af1af
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+public class KafkaPublisherTest {
+
+ private static EmbeddedKafka kafkaLocal;
+
+ private static EmbeddedKafkaProducerHelper producerHelper;
+
+ @BeforeClass
+ public static void beforeClass() {
+ kafkaLocal = new EmbeddedKafka();
+ kafkaLocal.start();
+ producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ producerHelper.close();
+ kafkaLocal.stop();
+ }
+
+ @Test
+ public void validateSuccessfulSendAsWhole() throws Exception {
+ InputStream contentStream = new ByteArrayInputStream("Hello Kafka".getBytes(StandardCharsets.UTF_8));
+ String topicName = "validateSuccessfulSendAsWhole";
+
+ Properties kafkaProperties = this.buildProducerProperties();
+ KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+
+ PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+ KafkaPublisherResult result = publisher.publish(publishingContext);
+
+ assertEquals(0, result.getLastMessageAcked());
+ assertEquals(1, result.getMessagesSent());
+ contentStream.close();
+ publisher.close();
+
+ ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+ assertNotNull(iter.next());
+ try {
+ iter.next();
+ } catch (ConsumerTimeoutException e) {
+ // that's OK since this is the Kafka mechanism to unblock
+ }
+ }
+
+ @Test
+ public void validateSuccessfulSendAsDelimited() throws Exception {
+ InputStream contentStream = new ByteArrayInputStream(
+ "Hello Kafka\nHello Kafka\nHello Kafka\nHello Kafka\n".getBytes(StandardCharsets.UTF_8));
+ String topicName = "validateSuccessfulSendAsDelimited";
+
+ Properties kafkaProperties = this.buildProducerProperties();
+ KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+
+ PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+ publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+ KafkaPublisherResult result = publisher.publish(publishingContext);
+
+ assertEquals(3, result.getLastMessageAcked());
+ assertEquals(4, result.getMessagesSent());
+ contentStream.close();
+ publisher.close();
+
+ ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+ assertNotNull(iter.next());
+ assertNotNull(iter.next());
+ assertNotNull(iter.next());
+ assertNotNull(iter.next());
+ try {
+ iter.next();
+ fail();
+ } catch (ConsumerTimeoutException e) {
+ // that's OK since this is the Kafka mechanism to unblock
+ }
+ }
+
+ /*
+ * This test simulates the condition where not all messages were ACKed by
+ * Kafka
+ */
+ @Test
+ public void validateRetries() throws Exception {
+ byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
+ InputStream contentStream = new ByteArrayInputStream(testValue);
+ String topicName = "validateSuccessfulReSendOfFailedSegments";
+
+ Properties kafkaProperties = this.buildProducerProperties();
+
+ KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+
+ // simulates the first re-try
+ int lastAckedMessageIndex = 1;
+ PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+ publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+
+ publisher.publish(publishingContext);
+
+ ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+ String m1 = new String(iter.next().message());
+ String m2 = new String(iter.next().message());
+ assertEquals("Hello Kafka3", m1);
+ assertEquals("Hello Kafka4", m2);
+ try {
+ iter.next();
+ fail();
+ } catch (ConsumerTimeoutException e) {
+ // that's OK since this is the Kafka mechanism to unblock
+ }
+
+ // simulates the second re-try
+ lastAckedMessageIndex = 2;
+ contentStream = new ByteArrayInputStream(testValue);
+ publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+ publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+ publisher.publish(publishingContext);
+
+ m1 = new String(iter.next().message());
+ assertEquals("Hello Kafka4", m1);
+
+ publisher.close();
+ }
+
+ /*
+ * Similar to the above test, but it sets the first retry index to the last
+ * possible message index and the second index to an out-of-bounds index. The
+ * expectation is that no messages will be sent to Kafka
+ */
+ @Test
+ public void validateRetriesWithWrongIndex() throws Exception {
+ byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
+ InputStream contentStream = new ByteArrayInputStream(testValue);
+ String topicName = "validateRetriesWithWrongIndex";
+
+ Properties kafkaProperties = this.buildProducerProperties();
+
+ KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+
+ // simulates the first re-try
+ int lastAckedMessageIndex = 3;
+ PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+ publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+
+ publisher.publish(publishingContext);
+
+ ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+ try {
+ iter.next();
+ fail();
+ } catch (ConsumerTimeoutException e) {
+ // that's OK since this is the Kafka mechanism to unblock
+ }
+
+ // simulates the second re-try
+ lastAckedMessageIndex = 6;
+ contentStream = new ByteArrayInputStream(testValue);
+ publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+ publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+ publisher.publish(publishingContext);
+ try {
+ iter.next();
+ fail();
+ } catch (ConsumerTimeoutException e) {
+ // that's OK since this is the Kafka mechanism to unblock
+ }
+
+ publisher.close();
+ }
+
+ @Test
+ public void validateWithMultiByteCharactersNoDelimiter() throws Exception {
+ String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
+ InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+ String topicName = "validateWithMultiByteCharacters";
+
+ Properties kafkaProperties = this.buildProducerProperties();
+
+ KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+ PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+
+ publisher.publish(publishingContext);
+ publisher.close();
+
+ ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+ String r = new String(iter.next().message(), StandardCharsets.UTF_8);
+ assertEquals(data, r);
+ }
+
+ @Test
+ public void validateWithNonDefaultPartitioner() throws Exception {
+ String data = "fooandbarandbaz";
+ InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+ String topicName = "validateWithNonDefaultPartitioner";
+
+ Properties kafkaProperties = this.buildProducerProperties();
+ kafkaProperties.setProperty("partitioner.class", TestPartitioner.class.getName());
+ KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+ PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+ publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8));
+
+ try {
+ publisher.publish(publishingContext);
+ // partitioner should be invoked 3 times
+ assertTrue(TestPartitioner.counter == 3);
+ publisher.close();
+ } finally {
+ TestPartitioner.counter = 0;
+ }
+ }
+
+ private Properties buildProducerProperties() {
+ Properties kafkaProperties = new Properties();
+ kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaLocal.getKafkaPort());
+ kafkaProperties.put("auto.create.topics.enable", "true");
+ return kafkaProperties;
+ }
+
+ private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
+ Properties props = new Properties();
+ props.put("zookeeper.connect", "localhost:" + kafkaLocal.getZookeeperPort());
+ props.put("group.id", "test");
+ props.put("consumer.timeout.ms", "500");
+ props.put("auto.offset.reset", "smallest");
+ ConsumerConfig consumerConfig = new ConsumerConfig(props);
+ ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+ Map<String, Integer> topicCountMap = new HashMap<>(1);
+ topicCountMap.put(topic, 1);
+ Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+ List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+ ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
+ return iter;
+ }
+
+ public static class TestPartitioner implements Partitioner {
+
+ static int counter;
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ // nothing to do, test
+ }
+
+ @Override
+ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
+ Cluster cluster) {
+ counter++;
+ return 0;
+ }
+
+ @Override
+ public void close() {
+ counter = 0;
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
new file mode 100644
index 0000000000..b3f1bd1e53
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.times;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+
+public class PublishKafkaTest {
+
+ @Test
+ public void validateCustomSerilaizerDeserializerSettings() throws Exception {
+ PublishKafka_0_10 publishKafka = new PublishKafka_0_10();
+ TestRunner runner = TestRunners.newTestRunner(publishKafka);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+ runner.setProperty(PublishKafka_0_10.TOPIC, "foo");
+ runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "3 sec");
+ runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ runner.assertValid();
+ runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Foo");
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void validatePropertiesValidation() throws Exception {
+ PublishKafka_0_10 publishKafka = new PublishKafka_0_10();
+ TestRunner runner = TestRunners.newTestRunner(publishKafka);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+ runner.setProperty(PublishKafka_0_10.TOPIC, "foo");
+ runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "foo");
+
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("'max.block.ms' validated against 'foo' is invalid"));
+ }
+ }
+
+ @Test
+ public void validateCustomValidation() {
+ String topicName = "validateCustomValidation";
+ PublishKafka_0_10 publishKafka = new PublishKafka_0_10();
+
+ /*
+ * Validates that a Kerberos principal is required if one of the SASL options
+ * is set for the security protocol
+ */
+ TestRunner runner = TestRunners.newTestRunner(publishKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+ try {
+ runner.run();
+ fail();
+ } catch (Throwable e) {
+ assertTrue(e.getMessage().contains("'Kerberos Service Name' is invalid because"));
+ }
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateSingleCharacterDemarcatedMessages() {
+ String topicName = "validateSingleCharacterDemarcatedMessages";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+
+ runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerA() {
+ String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "foo");
+
+ runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerB() {
+ String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
+ StubPublishKafka putKafka = new StubPublishKafka(1);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "foo");
+
+ runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateOnSendFailureAndThenResendSuccessA() throws Exception {
+ String topicName = "validateSendFailureAndThenResendSuccess";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+ runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "3000 millis");
+
+ final String text = "Hello World\nGoodbye\nfail\n2";
+ runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer producer = putKafka.getProducer();
+ verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ putKafka.destroy();
+ }
+
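+ // Same as the previous test but with StubPublishKafka(1) and a shorter metadata wait time;
+ // the failure/retry behavior and the expected send count are unchanged.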
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateOnSendFailureAndThenResendSuccessB() throws Exception {
+ String topicName = "validateSendFailureAndThenResendSuccess";
+ StubPublishKafka putKafka = new StubPublishKafka(1);
+
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+ runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis");
+
+ final String text = "Hello World\nGoodbye\nfail\n2";
+ runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer producer = putKafka.getProducer();
+ verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
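+ // The "futurefail" token is presumed to fail on Future.get() rather than on send(), so the
+ // FlowFile is routed to failure and must be re-enqueued by hand. Because the very first
+ // message is the one that fails, it is sent again on the retry, giving 5 sends in total.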
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception {
+ String topicName = "validateSendFailureAndThenResendSuccess";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+ runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis");
+
+ final String text = "futurefail\nHello World\nGoodbye\n2";
+ runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_FAILURE).get(0);
+ assertNotNull(ff);
+ runner.enqueue(ff);
+
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer producer = putKafka.getProducer();
+ // 5 sends due to duplication of the failed first message
+ verify(producer, times(5)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
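+ // As above, but the future-level failure occurs on the third message; the resend duplicates
+ // the unacknowledged messages, bringing the total to 6 sends.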
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception {
+ String topicName = "validateSendFailureAndThenResendSuccess";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+ runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis");
+
+ final String text = "Hello World\nGoodbye\nfuturefail\n2";
+ runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_FAILURE).get(0);
+ assertNotNull(ff);
+ runner.enqueue(ff);
+
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer producer = putKafka.getProducer();
+ // 6 sends due to duplication
+ verify(producer, times(6)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
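+ // Consecutive demarcators produce empty tokens, which should be skipped: only the four
+ // non-empty messages ("1", "2", "3", "4") are expected to reach the producer.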
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateDemarcationIntoEmptyMessages() {
+ String topicName = "validateDemarcationIntoEmptyMessages";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ final TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+
+ final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8);
+ runner.enqueue(bytes);
+ runner.run(1);
+ Producer producer = putKafka.getProducer();
+ verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
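+ // The stream ends with an incomplete demarcator ("僠<僠WILDSTUFF僠>" without the trailing 僠),
+ // so those bytes belong to the last token rather than acting as a separator: 3 sends expected.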
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateComplexRightPartialDemarcatedMessages() {
+ String topicName = "validateComplexRightPartialDemarcatedMessages";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "僠<僠WILDSTUFF僠>僠");
+
+ runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>".getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+
+ Producer producer = putKafka.getProducer();
+ verify(producer, times(3)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
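+ // Here the trailing "<僠WILDSTUFF僠>僠" fragment follows a complete demarcator but is not itself
+ // one, so it is emitted as a fourth message: 4 sends and a single FlowFile to success.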
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateComplexLeftPartialDemarcatedMessages() {
+ String topicName = "validateComplexLeftPartialDemarcatedMessages";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "僠<僠WILDSTUFF僠>僠");
+
+ runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>僠<僠WILDSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+
+ runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+ Producer producer = putKafka.getProducer();
+ verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
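+ // "僠<僠WILDBOOMSTUFF僠>僠" is a near-match that must not be treated as the demarcator, so the
+ // content splits into just two messages.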
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateComplexPartialMatchDemarcatedMessages() {
+ String topicName = "validateComplexPartialMatchDemarcatedMessages";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "僠<僠WILDSTUFF僠>僠");
+
+ runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDBOOMSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+
+ runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+ Producer producer = putKafka.getProducer();
+ verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
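+ // The key is supplied via Expression Language (${myKey}) and resolved from a FlowFile attribute.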
+ @Test
+ public void validateUtf8Key() {
+ String topicName = "validateUtf8Key";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
+
+ final Map<String, String> attributes = Collections.singletonMap("myKey", "key1");
+ runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+ runner.run(1);
+
+ runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+ final Map