NIFI-7522 bumping from Apache Kafka 2.5 to Apache Kafka 2.6

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4462.
Authored by Joe Witt on 2020-08-07 09:56:25 -07:00, committed by Pierre Villard
parent 0f18c5966e
commit 58e324e9f1
40 changed files with 165 additions and 165 deletions


@ -213,7 +213,7 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kafka-2-5-nar</artifactId>
<artifactId>nifi-kafka-2-6-nar</artifactId>
<version>1.12.0-SNAPSHOT</version>
<type>nar</type>
</dependency>


@ -19,7 +19,7 @@
<artifactId>nifi-kafka-bundle</artifactId>
<version>1.12.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-kafka-2-5-nar</artifactId>
<artifactId>nifi-kafka-2-6-nar</artifactId>
<packaging>nar</packaging>
<description>NiFi NAR for interacting with Apache Kafka 2.5</description>
<properties>
@ -29,7 +29,7 @@
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kafka-2-5-processors</artifactId>
<artifactId>nifi-kafka-2-6-processors</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>


@ -19,7 +19,7 @@
<version>1.12.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-kafka-2-5-processors</artifactId>
<artifactId>nifi-kafka-2-6-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
@ -61,12 +61,12 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka2.5.version}</version>
<version>${kafka2.6.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka2.5.version}</version>
<artifactId>kafka_2.13</artifactId>
<version>${kafka2.6.version}</version>
<scope>test</scope>
<exclusions>
<!-- Transitive dependencies excluded because they are located
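
For orientation, this hunk bumps the client library the processors compile against to the 2.6 line and moves the test-scoped broker artifact from kafka_2.12 to kafka_2.13. A minimal sketch, assuming the 2.6 kafka-clients jar is on the classpath, that prints the version the library reports about itself:

import org.apache.kafka.common.utils.AppInfoParser;

public class ClientVersionSketch {
    public static void main(String[] args) {
        // AppInfoParser exposes the version and commit id baked into the kafka-clients jar;
        // after this change it should report 2.6.0, matching ${kafka2.6.version}.
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
        System.out.println("kafka-clients commit:  " + AppInfoParser.getCommitId());
    }
}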


@ -57,14 +57,14 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.5 Consumer API. "
+ "The complementary NiFi processor for sending messages is PublishKafkaRecord_2_5. Please note that, at this time, the Processor assumes that "
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.6 Consumer API. "
+ "The complementary NiFi processor for sending messages is PublishKafkaRecord_2_6. Please note that, at this time, the Processor assumes that "
+ "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the "
+ "configured Record Reader or Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
+ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual messages within the single FlowFile. "
+ "A 'record.count' attribute is added to indicate how many messages are contained in the FlowFile. No two Kafka messages will be placed into the same FlowFile if they "
+ "have different schemas, or if they have different values for a message header that is included by the <Headers to Add as Attributes> property.")
@Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "2.5"})
@Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "2.6"})
@WritesAttributes({
@WritesAttribute(attribute = "record.count", description = "The number of records received"),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided by the configured Record Writer"),
@ -78,8 +78,8 @@ import java.util.regex.Pattern;
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@SeeAlso({ConsumeKafka_2_5.class, PublishKafka_2_5.class, PublishKafkaRecord_2_5.class})
public class ConsumeKafkaRecord_2_5 extends AbstractProcessor {
@SeeAlso({ConsumeKafka_2_6.class, PublishKafka_2_6.class, PublishKafkaRecord_2_6.class})
public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
@ -310,8 +310,8 @@ public class ConsumeKafkaRecord_2_5 extends AbstractProcessor {
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
final String topicListing = context.getProperty(ConsumeKafkaRecord_2_5.TOPICS).evaluateAttributeExpressions().getValue();
final String topicType = context.getProperty(ConsumeKafkaRecord_2_5.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
final String topicListing = context.getProperty(ConsumeKafkaRecord_2_6.TOPICS).evaluateAttributeExpressions().getValue();
final String topicType = context.getProperty(ConsumeKafkaRecord_2_6.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>();
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
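
To make the consumer-side wiring concrete: the properties assembled above feed a standard KafkaConsumer from the 2.6 client. A simplified sketch of an equivalent stand-alone consumer (broker address, group id and topic are placeholders, and this is not the processor's actual ConsumerPool):

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumeSketch {
    public static void main(String[] args) {
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "nifi-consumer-sketch");      // placeholder
        // Same settings the processor applies: manual offset handling, raw byte payloads.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));          // placeholder topic
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println("partition=" + record.partition()
                        + " offset=" + record.offset()
                        + " bytes=" + (record.value() == null ? 0 : record.value().length));
            }
            consumer.commitSync(); // committed explicitly since auto-commit is disabled
        }
    }
}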


@ -58,9 +58,9 @@ import java.util.regex.Pattern;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.5 Consumer API. "
+ "The complementary NiFi processor for sending messages is PublishKafka_2_5.")
@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "2.5"})
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.6 Consumer API. "
+ "The complementary NiFi processor for sending messages is PublishKafka_2_6.")
@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "2.6"})
@WritesAttributes({
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
@ -76,7 +76,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
public class ConsumeKafka_2_5 extends AbstractProcessor {
public class ConsumeKafka_2_6 extends AbstractProcessor {
static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
@ -290,8 +290,8 @@ public class ConsumeKafka_2_5 extends AbstractProcessor {
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
final int maxLeases = context.getMaxConcurrentTasks();
final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final byte[] demarcator = context.getProperty(ConsumeKafka_2_5.MESSAGE_DEMARCATOR).isSet()
? context.getProperty(ConsumeKafka_2_5.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
final byte[] demarcator = context.getProperty(ConsumeKafka_2_6.MESSAGE_DEMARCATOR).isSet()
? context.getProperty(ConsumeKafka_2_6.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
: null;
final Map<String, Object> props = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
@ -299,8 +299,8 @@ public class ConsumeKafka_2_5 extends AbstractProcessor {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
final String topicListing = context.getProperty(ConsumeKafka_2_5.TOPICS).evaluateAttributeExpressions().getValue();
final String topicType = context.getProperty(ConsumeKafka_2_5.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
final String topicListing = context.getProperty(ConsumeKafka_2_6.TOPICS).evaluateAttributeExpressions().getValue();
final String topicType = context.getProperty(ConsumeKafka_2_6.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>();
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
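
The Message Demarcator read above determines how consumed messages are packed into FlowFiles: with a demarcator set, many messages are concatenated into one FlowFile with the demarcator bytes between them; with none, each message becomes its own FlowFile. A simplified sketch of that bundling idea, outside of NiFi and with made-up payloads:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class DemarcatorSketch {

    // Joins message payloads with the demarcator bytes between them.
    static byte[] bundle(final List<byte[]> messages, final byte[] demarcator) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < messages.size(); i++) {
            if (i > 0 && demarcator != null) {
                out.write(demarcator);
            }
            out.write(messages.get(i));
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        final byte[] demarcator = "\n".getBytes(StandardCharsets.UTF_8);
        final List<byte[]> messages = Arrays.asList(
                "first message".getBytes(StandardCharsets.UTF_8),
                "second message".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(bundle(messages, demarcator), StandardCharsets.UTF_8));
    }
}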


@ -58,8 +58,8 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_5.REL_PARSE_FAILURE;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_5.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6.REL_PARSE_FAILURE;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;


@ -83,7 +83,7 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGIST
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "2.5"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 2.5 Producer API. "
+ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
+ "The complementary NiFi processor for fetching messages is ConsumeKafkaRecord_2_5.")
+ "The complementary NiFi processor for fetching messages is ConsumeKafkaRecord_2_6.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
@ -92,8 +92,8 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGIST
expressionLanguageScope = VARIABLE_REGISTRY)
@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
+ "FlowFiles that are routed to success.")
@SeeAlso({PublishKafka_2_5.class, ConsumeKafka_2_5.class, ConsumeKafkaRecord_2_5.class})
public class PublishKafkaRecord_2_5 extends AbstractProcessor {
@SeeAlso({PublishKafka_2_6.class, ConsumeKafka_2_6.class, ConsumeKafkaRecord_2_6.class})
public class PublishKafkaRecord_2_6 extends AbstractProcessor {
protected static final String MSG_COUNT = "msg.count";
static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
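
As described in the capability text above, each record produced by the configured Record Writer becomes an individual Kafka message. A stripped-down sketch of that idea against the plain 2.6 producer API (the serialized records, topic and broker address are placeholders, not the processor's internals):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class PublishRecordsSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        // Stand-ins for records serialized by the configured Record Writer.
        final List<byte[]> serializedRecords = Arrays.asList(
                "{\"name\":\"John Doe\",\"age\":48}".getBytes(StandardCharsets.UTF_8),
                "{\"name\":\"Jane Doe\",\"age\":48}".getBytes(StandardCharsets.UTF_8));

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            for (final byte[] value : serializedRecords) {
                producer.send(new ProducerRecord<>("my-topic", value)); // one message per record
            }
            producer.flush();
        }
    }
}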


@ -69,7 +69,7 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIB
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 2.5 Producer API."
+ "The messages to send may be individual FlowFiles or may be delimited, using a "
+ "user-specified delimiter, such as a new-line. "
+ "The complementary NiFi processor for fetching messages is ConsumeKafka_2_5.")
+ "The complementary NiFi processor for fetching messages is ConsumeKafka_2_6.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
@ -79,7 +79,7 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIB
@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
+ "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
+ "be greater than 1.")
public class PublishKafka_2_5 extends AbstractProcessor {
public class PublishKafka_2_6 extends AbstractProcessor {
protected static final String MSG_COUNT = "msg.count";
static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
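
The "Guarantee Replicated Delivery" option above carries the literal value "all", which is also the strictest setting for the Kafka producer's acks configuration. A minimal sketch of setting that on the raw 2.6 client (broker address is a placeholder):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class DeliveryGuaranteeSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // acks=all: a send only succeeds once the message is replicated to the in-sync replicas.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        System.out.println(ProducerConfig.ACKS_CONFIG + "=" + props.getProperty(ProducerConfig.ACKS_CONFIG));
    }
}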


@ -69,8 +69,8 @@ import java.util.concurrent.TimeoutException;
@Tags({"kafka", "record", "sink"})
@CapabilityDescription("Provides a service to write records to a Kafka 2.x topic.")
public class KafkaRecordSink_2_5 extends AbstractControllerService implements RecordSinkService {
@CapabilityDescription("Provides a service to write records to a Kafka 2.6+ topic.")
public class KafkaRecordSink_2_6 extends AbstractControllerService implements RecordSinkService {
static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
"Records are considered 'transmitted unsuccessfully' unless the message is replicated to the appropriate "


@ -12,4 +12,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.record.sink.kafka.KafkaRecordSink_2_5
org.apache.nifi.record.sink.kafka.KafkaRecordSink_2_6


@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_5
org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_5
org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_5
org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_5
org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_6
org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6
org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6
org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6
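
These service registration files are how the framework discovers the renamed components: each line names a concrete class to load from the NAR. A simplified illustration of the underlying java.util.ServiceLoader mechanism (assuming the processors jar and the NiFi API are on the classpath; NiFi itself performs this per NAR):

import java.util.ServiceLoader;
import org.apache.nifi.processor.Processor;

public class DiscoverySketch {
    public static void main(String[] args) {
        // Every class listed in META-INF/services/org.apache.nifi.processor.Processor
        // is instantiated reflectively, so the *_2_6 processors appear under their new names.
        for (final Processor processor : ServiceLoader.load(Processor.class)) {
            System.out.println(processor.getClass().getName());
        }
    }
}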


@ -24,7 +24,7 @@
<h2>Description</h2>
<p>
This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
for data using KafkaConsumer API available with Kafka 2.5. When a message is received
for data using KafkaConsumer API available with Kafka 2.6. When a message is received
from Kafka, the message will be deserialized using the configured Record Reader, and then
written to a FlowFile by serializing the message with the configured Record Writer.
</p>


@ -24,7 +24,7 @@
<h2>Description</h2>
<p>
This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
for data using KafkaConsumer API available with Kafka 2.5. When a message is received
for data using KafkaConsumer API available with Kafka 2.6. When a message is received
from Kafka, this Processor emits a FlowFile where the content of the FlowFile is the value
of the Kafka message.
</p>


@ -25,7 +25,7 @@
<p>
This Processor puts the contents of a FlowFile to a Topic in
<a href="http://kafka.apache.org/">Apache Kafka</a> using KafkaProducer API available
with Kafka 2.5 API. The contents of the incoming FlowFile will be read using the
with Kafka 2.6 API. The contents of the incoming FlowFile will be read using the
configured Record Reader. Each record will then be serialized using the configured
Record Writer, and this serialized form will be the content of a Kafka message.
This message is optionally assigned a key by using the &lt;Kafka Key&gt; Property.


@ -25,7 +25,7 @@
<p>
This Processor puts the contents of a FlowFile to a Topic in
<a href="http://kafka.apache.org/">Apache Kafka</a> using KafkaProducer API available
with Kafka 2.5 API. The content of a FlowFile becomes the contents of a Kafka message.
with Kafka 2.6 API. The content of a FlowFile becomes the contents of a Kafka message.
This message is optionally assigned a key by using the &lt;Kafka Key&gt; Property.
</p>
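
The Kafka Key property mentioned above attaches a key to each published message; with the default partitioner, messages sharing a key land on the same partition. A small sketch of a keyed send with the 2.6 producer (key, topic and broker address are placeholders):

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class KeyedPublishSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            final byte[] key = "customer-42".getBytes(StandardCharsets.UTF_8);       // stand-in for the Kafka Key value
            final byte[] value = "FlowFile content".getBytes(StandardCharsets.UTF_8);
            producer.send(new ProducerRecord<>("my-topic", key, value));
            producer.flush();
        }
    }
}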


@ -30,7 +30,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class ITConsumeKafka_2_5 {
public class ITConsumeKafka_2_6 {
ConsumerLease mockLease = null;
ConsumerPool mockConsumerPool = null;
@ -49,7 +49,7 @@ public class ITConsumeKafka_2_5 {
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE);
ConsumeKafka_2_5 proc = new ConsumeKafka_2_5() {
ConsumeKafka_2_6 proc = new ConsumeKafka_2_6() {
@Override
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return mockConsumerPool;
@ -57,9 +57,9 @@ public class ITConsumeKafka_2_5 {
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_2_5.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_2_5.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_2_5.AUTO_OFFSET_RESET, ConsumeKafka_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka_2_6.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_2_6.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_2_6.AUTO_OFFSET_RESET, ConsumeKafka_2_6.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(any(), any());
@ -79,7 +79,7 @@ public class ITConsumeKafka_2_5 {
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE);
ConsumeKafka_2_5 proc = new ConsumeKafka_2_5() {
ConsumeKafka_2_6 proc = new ConsumeKafka_2_6() {
@Override
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return mockConsumerPool;
@ -87,10 +87,10 @@ public class ITConsumeKafka_2_5 {
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_2_5.TOPICS, "(fo.*)|(ba)");
runner.setProperty(ConsumeKafka_2_5.TOPIC_TYPE, "pattern");
runner.setProperty(ConsumeKafka_2_5.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_2_5.AUTO_OFFSET_RESET, ConsumeKafka_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka_2_6.TOPICS, "(fo.*)|(ba)");
runner.setProperty(ConsumeKafka_2_6.TOPIC_TYPE, "pattern");
runner.setProperty(ConsumeKafka_2_6.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_2_6.AUTO_OFFSET_RESET, ConsumeKafka_2_6.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(any(), any());
@ -110,7 +110,7 @@ public class ITConsumeKafka_2_5 {
when(mockLease.continuePolling()).thenReturn(true, false);
when(mockLease.commit()).thenReturn(Boolean.FALSE);
ConsumeKafka_2_5 proc = new ConsumeKafka_2_5() {
ConsumeKafka_2_6 proc = new ConsumeKafka_2_6() {
@Override
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return mockConsumerPool;
@ -118,9 +118,9 @@ public class ITConsumeKafka_2_5 {
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_2_5.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_2_5.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_2_5.AUTO_OFFSET_RESET, ConsumeKafka_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka_2_6.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_2_6.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_2_6.AUTO_OFFSET_RESET, ConsumeKafka_2_6.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(any(), any());


@ -39,7 +39,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class TestConsumeKafkaRecord_2_5 {
public class TestConsumeKafkaRecord_2_6 {
private ConsumerLease mockLease = null;
private ConsumerPool mockConsumerPool = null;
@ -50,7 +50,7 @@ public class TestConsumeKafkaRecord_2_5 {
mockLease = mock(ConsumerLease.class);
mockConsumerPool = mock(ConsumerPool.class);
ConsumeKafkaRecord_2_5 proc = new ConsumeKafkaRecord_2_5() {
ConsumeKafkaRecord_2_6 proc = new ConsumeKafkaRecord_2_6() {
@Override
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return mockConsumerPool;
@ -72,15 +72,15 @@ public class TestConsumeKafkaRecord_2_5 {
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
runner.setProperty(ConsumeKafkaRecord_2_5.RECORD_READER, readerId);
runner.setProperty(ConsumeKafkaRecord_2_5.RECORD_WRITER, writerId);
runner.setProperty(ConsumeKafkaRecord_2_6.RECORD_READER, readerId);
runner.setProperty(ConsumeKafkaRecord_2_6.RECORD_WRITER, writerId);
}
@Test
public void validateCustomValidatorSettings() throws Exception {
runner.setProperty(ConsumeKafkaRecord_2_5.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_6.OFFSET_EARLIEST);
runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
runner.assertValid();
runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
@ -95,11 +95,11 @@ public class TestConsumeKafkaRecord_2_5 {
@Test
public void validatePropertiesValidation() throws Exception {
runner.setProperty(ConsumeKafkaRecord_2_5.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_6.OFFSET_EARLIEST);
runner.removeProperty(ConsumeKafkaRecord_2_5.GROUP_ID);
runner.removeProperty(ConsumeKafkaRecord_2_6.GROUP_ID);
try {
runner.assertValid();
fail();
@ -107,7 +107,7 @@ public class TestConsumeKafkaRecord_2_5 {
assertTrue(e.getMessage().contains("invalid because Group ID is required"));
}
runner.setProperty(ConsumeKafkaRecord_2_5.GROUP_ID, "");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, "");
try {
runner.assertValid();
fail();
@ -115,7 +115,7 @@ public class TestConsumeKafkaRecord_2_5 {
assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
}
runner.setProperty(ConsumeKafkaRecord_2_5.GROUP_ID, " ");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, " ");
try {
runner.assertValid();
fail();
@ -132,9 +132,9 @@ public class TestConsumeKafkaRecord_2_5 {
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE);
runner.setProperty(ConsumeKafkaRecord_2_5.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafkaRecord_2_5.GROUP_ID, groupName);
runner.setProperty(ConsumeKafkaRecord_2_5.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, groupName);
runner.setProperty(ConsumeKafkaRecord_2_6.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_6.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(any(), any());
@ -154,10 +154,10 @@ public class TestConsumeKafkaRecord_2_5 {
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE);
runner.setProperty(ConsumeKafkaRecord_2_5.TOPICS, "(fo.*)|(ba)");
runner.setProperty(ConsumeKafkaRecord_2_5.TOPIC_TYPE, "pattern");
runner.setProperty(ConsumeKafkaRecord_2_5.GROUP_ID, groupName);
runner.setProperty(ConsumeKafkaRecord_2_5.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "(fo.*)|(ba)");
runner.setProperty(ConsumeKafkaRecord_2_6.TOPIC_TYPE, "pattern");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, groupName);
runner.setProperty(ConsumeKafkaRecord_2_6.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_6.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(any(), any());
@ -177,9 +177,9 @@ public class TestConsumeKafkaRecord_2_5 {
when(mockLease.continuePolling()).thenReturn(true, false);
when(mockLease.commit()).thenReturn(Boolean.FALSE);
runner.setProperty(ConsumeKafkaRecord_2_5.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafkaRecord_2_5.GROUP_ID, groupName);
runner.setProperty(ConsumeKafkaRecord_2_5.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, groupName);
runner.setProperty(ConsumeKafkaRecord_2_6.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_6.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(any(), any());
@ -193,9 +193,9 @@ public class TestConsumeKafkaRecord_2_5 {
@Test
public void testJaasConfigurationWithDefaultMechanism() {
runner.setProperty(ConsumeKafkaRecord_2_5.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_6.OFFSET_EARLIEST);
runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
runner.assertNotValid();
@ -215,9 +215,9 @@ public class TestConsumeKafkaRecord_2_5 {
@Test
public void testJaasConfigurationWithPlainMechanism() {
runner.setProperty(ConsumeKafkaRecord_2_5.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_6.OFFSET_EARLIEST);
runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
runner.assertNotValid();
@ -237,9 +237,9 @@ public class TestConsumeKafkaRecord_2_5 {
@Test
public void testJaasConfigurationWithScram256Mechanism() {
runner.setProperty(ConsumeKafkaRecord_2_5.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_6.OFFSET_EARLIEST);
runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
runner.assertNotValid();
@ -259,9 +259,9 @@ public class TestConsumeKafkaRecord_2_5 {
@Test
public void testJaasConfigurationWithScram512Mechanism() {
runner.setProperty(ConsumeKafkaRecord_2_5.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_6.OFFSET_EARLIEST);
runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
runner.assertNotValid();
@ -281,9 +281,9 @@ public class TestConsumeKafkaRecord_2_5 {
@Test
public void testNonSaslSecurityProtocol() {
runner.setProperty(ConsumeKafkaRecord_2_5.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_5.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_6.OFFSET_EARLIEST);
runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_PLAINTEXT);
runner.assertValid();
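
The JAAS-related tests above exercise validation of the SASL security settings (the default mechanism plus PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512). On the raw 2.6 client the equivalent configuration looks roughly like this; the credentials are made up and this is not the processor's own JAAS handling:

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;

public class SaslConfigSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); // or SCRAM-SHA-256 / SCRAM-SHA-512 / GSSAPI
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"nifi\" password=\"nifi-secret\";");                // made-up credentials
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}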


@ -27,7 +27,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
public class TestConsumeKafka_2_5 {
public class TestConsumeKafka_2_6 {
ConsumerLease mockLease = null;
ConsumerPool mockConsumerPool = null;
@ -40,12 +40,12 @@ public class TestConsumeKafka_2_5 {
@Test
public void validateCustomValidatorSettings() throws Exception {
ConsumeKafka_2_5 consumeKafka = new ConsumeKafka_2_5();
ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6();
TestRunner runner = TestRunners.newTestRunner(consumeKafka);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
runner.setProperty(ConsumeKafka_2_5.TOPICS, "foo");
runner.setProperty(ConsumeKafka_2_5.GROUP_ID, "foo");
runner.setProperty(ConsumeKafka_2_5.AUTO_OFFSET_RESET, ConsumeKafka_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafka_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafka_2_6.AUTO_OFFSET_RESET, ConsumeKafka_2_6.OFFSET_EARLIEST);
runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
runner.assertValid();
runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
@ -60,14 +60,14 @@ public class TestConsumeKafka_2_5 {
@Test
public void validatePropertiesValidation() throws Exception {
ConsumeKafka_2_5 consumeKafka = new ConsumeKafka_2_5();
ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6();
TestRunner runner = TestRunners.newTestRunner(consumeKafka);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
runner.setProperty(ConsumeKafka_2_5.TOPICS, "foo");
runner.setProperty(ConsumeKafka_2_5.GROUP_ID, "foo");
runner.setProperty(ConsumeKafka_2_5.AUTO_OFFSET_RESET, ConsumeKafka_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafka_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafka_2_6.AUTO_OFFSET_RESET, ConsumeKafka_2_6.OFFSET_EARLIEST);
runner.removeProperty(ConsumeKafka_2_5.GROUP_ID);
runner.removeProperty(ConsumeKafka_2_6.GROUP_ID);
try {
runner.assertValid();
fail();
@ -75,7 +75,7 @@ public class TestConsumeKafka_2_5 {
assertTrue(e.getMessage().contains("invalid because Group ID is required"));
}
runner.setProperty(ConsumeKafka_2_5.GROUP_ID, "");
runner.setProperty(ConsumeKafka_2_6.GROUP_ID, "");
try {
runner.assertValid();
fail();
@ -83,7 +83,7 @@ public class TestConsumeKafka_2_5 {
assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
}
runner.setProperty(ConsumeKafka_2_5.GROUP_ID, " ");
runner.setProperty(ConsumeKafka_2_6.GROUP_ID, " ");
try {
runner.assertValid();
fail();
@ -94,12 +94,12 @@ public class TestConsumeKafka_2_5 {
@Test
public void testJaasConfiguration() throws Exception {
ConsumeKafka_2_5 consumeKafka = new ConsumeKafka_2_5();
ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6();
TestRunner runner = TestRunners.newTestRunner(consumeKafka);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
runner.setProperty(ConsumeKafka_2_5.TOPICS, "foo");
runner.setProperty(ConsumeKafka_2_5.GROUP_ID, "foo");
runner.setProperty(ConsumeKafka_2_5.AUTO_OFFSET_RESET, ConsumeKafka_2_5.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafka_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafka_2_6.AUTO_OFFSET_RESET, ConsumeKafka_2_6.OFFSET_EARLIEST);
runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
runner.assertNotValid();


@ -61,7 +61,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestPublishKafkaRecord_2_5 {
public class TestPublishKafkaRecord_2_6 {
private static final String TOPIC_NAME = "unit-test";
@ -78,14 +78,14 @@ public class TestPublishKafkaRecord_2_5 {
when(mockPool.obtainPublisher()).thenReturn(mockLease);
runner = TestRunners.newTestRunner(new PublishKafkaRecord_2_5() {
runner = TestRunners.newTestRunner(new PublishKafkaRecord_2_6() {
@Override
protected PublisherPool createPublisherPool(final ProcessContext context) {
return mockPool;
}
});
runner.setProperty(PublishKafkaRecord_2_5.TOPIC, TOPIC_NAME);
runner.setProperty(PublishKafkaRecord_2_6.TOPIC, TOPIC_NAME);
final String readerId = "record-reader";
final MockRecordParser readerService = new MockRecordParser();
@ -99,9 +99,9 @@ public class TestPublishKafkaRecord_2_5 {
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
runner.setProperty(PublishKafkaRecord_2_5.RECORD_READER, readerId);
runner.setProperty(PublishKafkaRecord_2_5.RECORD_WRITER, writerId);
runner.setProperty(PublishKafka_2_5.DELIVERY_GUARANTEE, PublishKafka_2_5.DELIVERY_REPLICATED);
runner.setProperty(PublishKafkaRecord_2_6.RECORD_READER, readerId);
runner.setProperty(PublishKafkaRecord_2_6.RECORD_WRITER, writerId);
runner.setProperty(PublishKafka_2_6.DELIVERY_GUARANTEE, PublishKafka_2_6.DELIVERY_REPLICATED);
}
@Test
@ -111,7 +111,7 @@ public class TestPublishKafkaRecord_2_5 {
when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_5.REL_SUCCESS, 1);
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
@ -130,7 +130,7 @@ public class TestPublishKafkaRecord_2_5 {
when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_5.REL_SUCCESS, 3);
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
@ -146,7 +146,7 @@ public class TestPublishKafkaRecord_2_5 {
when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_5.REL_FAILURE, 1);
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_FAILURE, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
@ -166,7 +166,7 @@ public class TestPublishKafkaRecord_2_5 {
}).when(mockLease).beginTransaction();
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_5.REL_FAILURE, 1);
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_FAILURE, 1);
verify(mockLease, times(1)).poison();
verify(mockLease, times(1)).close();
@ -182,7 +182,7 @@ public class TestPublishKafkaRecord_2_5 {
when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_5.REL_FAILURE, 3);
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_FAILURE, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
@ -205,7 +205,7 @@ public class TestPublishKafkaRecord_2_5 {
when(mockLease.complete()).thenReturn(result);
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_5.REL_SUCCESS, 2);
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 2);
verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
@ -216,10 +216,10 @@ public class TestPublishKafkaRecord_2_5 {
verify(mockLease, times(1)).close();
runner.assertAllFlowFilesContainAttribute("msg.count");
assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_2_5.REL_SUCCESS).stream()
assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_2_6.REL_SUCCESS).stream()
.filter(ff -> ff.getAttribute("msg.count").equals("10"))
.count());
assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_2_5.REL_SUCCESS).stream()
assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_2_6.REL_SUCCESS).stream()
.filter(ff -> ff.getAttribute("msg.count").equals("20"))
.count());
}
@ -237,7 +237,7 @@ public class TestPublishKafkaRecord_2_5 {
when(mockLease.complete()).thenReturn(result);
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_5.REL_SUCCESS, 1);
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
@ -245,14 +245,14 @@ public class TestPublishKafkaRecord_2_5 {
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
final MockFlowFile mff = runner.getFlowFilesForRelationship(PublishKafkaRecord_2_5.REL_SUCCESS).get(0);
final MockFlowFile mff = runner.getFlowFilesForRelationship(PublishKafkaRecord_2_6.REL_SUCCESS).get(0);
mff.assertAttributeEquals("msg.count", "0");
}
@Test
public void testRecordPathPartition() throws IOException {
runner.setProperty(PublishKafkaRecord_2_5.PARTITION_CLASS, PublishKafkaRecord_2_5.RECORD_PATH_PARTITIONING);
runner.setProperty(PublishKafkaRecord_2_5.PARTITION, "/age");
runner.setProperty(PublishKafkaRecord_2_6.PARTITION_CLASS, PublishKafkaRecord_2_6.RECORD_PATH_PARTITIONING);
runner.setProperty(PublishKafkaRecord_2_6.PARTITION, "/age");
final List<FlowFile> flowFiles = new ArrayList<>();
flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 48\nJim Doe, 13"));
@ -289,7 +289,7 @@ public class TestPublishKafkaRecord_2_5 {
nullable(RecordSchema.class), nullable(String.class), any(String.class), nullable(Function.class));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_5.REL_SUCCESS, 1);
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
nullable(RecordSchema.class), nullable(String.class), any(String.class), nullable(Function.class));
@ -326,15 +326,15 @@ public class TestPublishKafkaRecord_2_5 {
when(mockLease.complete()).thenReturn(result);
runner.run();
runner.assertTransferCount(PublishKafkaRecord_2_5.REL_SUCCESS, 0);
runner.assertTransferCount(PublishKafkaRecord_2_5.REL_FAILURE, 4);
runner.assertTransferCount(PublishKafkaRecord_2_6.REL_SUCCESS, 0);
runner.assertTransferCount(PublishKafkaRecord_2_6.REL_FAILURE, 4);
verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
assertTrue(runner.getFlowFilesForRelationship(PublishKafkaRecord_2_5.REL_FAILURE).stream()
assertTrue(runner.getFlowFilesForRelationship(PublishKafkaRecord_2_6.REL_FAILURE).stream()
.noneMatch(ff -> ff.getAttribute("msg.count") != null));
}


@ -50,7 +50,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestPublishKafka_2_5 {
public class TestPublishKafka_2_6 {
private static final String TOPIC_NAME = "unit-test";
private PublisherPool mockPool;
@ -64,15 +64,15 @@ public class TestPublishKafka_2_5 {
when(mockPool.obtainPublisher()).thenReturn(mockLease);
runner = TestRunners.newTestRunner(new PublishKafka_2_5() {
runner = TestRunners.newTestRunner(new PublishKafka_2_6() {
@Override
protected PublisherPool createPublisherPool(final ProcessContext context) {
return mockPool;
}
});
runner.setProperty(PublishKafka_2_5.TOPIC, TOPIC_NAME);
runner.setProperty(PublishKafka_2_5.DELIVERY_GUARANTEE, PublishKafka_2_5.DELIVERY_REPLICATED);
runner.setProperty(PublishKafka_2_6.TOPIC, TOPIC_NAME);
runner.setProperty(PublishKafka_2_6.DELIVERY_GUARANTEE, PublishKafka_2_6.DELIVERY_REPLICATED);
}
@Test
@ -82,7 +82,7 @@ public class TestPublishKafka_2_5 {
when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_2_5.REL_SUCCESS, 1);
runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
verify(mockLease, times(1)).complete();
@ -100,7 +100,7 @@ public class TestPublishKafka_2_5 {
when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_2_5.REL_SUCCESS, 3);
runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_SUCCESS, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
verify(mockLease, times(1)).complete();
@ -121,7 +121,7 @@ public class TestPublishKafka_2_5 {
}).when(mockLease).beginTransaction();
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_2_5.REL_FAILURE, 2);
runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_FAILURE, 2);
verify(mockLease, times(1)).poison();
verify(mockLease, times(1)).close();
@ -134,7 +134,7 @@ public class TestPublishKafka_2_5 {
when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_2_5.REL_FAILURE, 1);
runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_FAILURE, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
verify(mockLease, times(1)).complete();
@ -151,7 +151,7 @@ public class TestPublishKafka_2_5 {
when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_2_5.REL_FAILURE, 3);
runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_FAILURE, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
verify(mockLease, times(1)).complete();
@ -173,7 +173,7 @@ public class TestPublishKafka_2_5 {
when(mockLease.complete()).thenReturn(result);
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_2_5.REL_SUCCESS, 2);
runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_SUCCESS, 2);
verify(mockLease, times(2)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
verify(mockLease, times(1)).complete();
@ -181,10 +181,10 @@ public class TestPublishKafka_2_5 {
verify(mockLease, times(1)).close();
runner.assertAllFlowFilesContainAttribute("msg.count");
assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_2_5.REL_SUCCESS).stream()
assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_2_6.REL_SUCCESS).stream()
.filter(ff -> ff.getAttribute("msg.count").equals("10"))
.count());
assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_2_5.REL_SUCCESS).stream()
assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_2_6.REL_SUCCESS).stream()
.filter(ff -> ff.getAttribute("msg.count").equals("20"))
.count());
}
@ -211,14 +211,14 @@ public class TestPublishKafka_2_5 {
when(mockLease.complete()).thenReturn(result);
runner.run();
runner.assertTransferCount(PublishKafka_2_5.REL_SUCCESS, 0);
runner.assertTransferCount(PublishKafka_2_5.REL_FAILURE, 4);
runner.assertTransferCount(PublishKafka_2_6.REL_SUCCESS, 0);
runner.assertTransferCount(PublishKafka_2_6.REL_FAILURE, 4);
verify(mockLease, times(4)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
assertTrue(runner.getFlowFilesForRelationship(PublishKafka_2_5.REL_FAILURE).stream()
assertTrue(runner.getFlowFilesForRelationship(PublishKafka_2_6.REL_FAILURE).stream()
.noneMatch(ff -> ff.getAttribute("msg.count") != null));
}


@ -67,13 +67,13 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestKafkaRecordSink_2_5 {
public class TestKafkaRecordSink_2_6 {
private static final String TOPIC_NAME = "unit-test";
@Test
public void testRecordFormat() throws IOException, InitializationException {
MockKafkaRecordSink_2_5 task = initTask();
MockKafkaRecordSink_2_6 task = initTask();
List<RecordField> recordFields = Arrays.asList(
new RecordField("field1", RecordFieldType.INT.getDataType()),
@ -108,34 +108,34 @@ public class TestKafkaRecordSink_2_5 {
assertEquals("World!", data[1]);
}
private MockKafkaRecordSink_2_5 initTask() throws InitializationException {
private MockKafkaRecordSink_2_6 initTask() throws InitializationException {
final ComponentLog logger = mock(ComponentLog.class);
final MockKafkaRecordSink_2_5 task = new MockKafkaRecordSink_2_5();
final MockKafkaRecordSink_2_6 task = new MockKafkaRecordSink_2_6();
ConfigurationContext context = mock(ConfigurationContext.class);
final StateManager stateManager = new MockStateManager(task);
final PropertyValue topicValue = Mockito.mock(StandardPropertyValue.class);
when(topicValue.evaluateAttributeExpressions()).thenReturn(topicValue);
when(topicValue.getValue()).thenReturn(TOPIC_NAME);
when(context.getProperty(KafkaRecordSink_2_5.TOPIC)).thenReturn(topicValue);
when(context.getProperty(KafkaRecordSink_2_6.TOPIC)).thenReturn(topicValue);
final PropertyValue deliveryValue = Mockito.mock(StandardPropertyValue.class);
when(deliveryValue.getValue()).thenReturn(KafkaRecordSink_2_5.DELIVERY_REPLICATED.getValue());
when(context.getProperty(KafkaRecordSink_2_5.DELIVERY_GUARANTEE)).thenReturn(deliveryValue);
when(deliveryValue.getValue()).thenReturn(KafkaRecordSink_2_6.DELIVERY_REPLICATED.getValue());
when(context.getProperty(KafkaRecordSink_2_6.DELIVERY_GUARANTEE)).thenReturn(deliveryValue);
final PropertyValue maxSizeValue = Mockito.mock(StandardPropertyValue.class);
when(maxSizeValue.asDataSize(DataUnit.B)).thenReturn(1024.0);
when(context.getProperty(KafkaRecordSink_2_5.MAX_REQUEST_SIZE)).thenReturn(maxSizeValue);
when(context.getProperty(KafkaRecordSink_2_6.MAX_REQUEST_SIZE)).thenReturn(maxSizeValue);
final PropertyValue maxAckWaitValue = Mockito.mock(StandardPropertyValue.class);
when(maxAckWaitValue.asTimePeriod(TimeUnit.MILLISECONDS)).thenReturn(5000L);
when(context.getProperty(KafkaRecordSink_2_5.ACK_WAIT_TIME)).thenReturn(maxAckWaitValue);
when(context.getProperty(KafkaRecordSink_2_6.ACK_WAIT_TIME)).thenReturn(maxAckWaitValue);
final PropertyValue charEncodingValue = Mockito.mock(StandardPropertyValue.class);
when(charEncodingValue.evaluateAttributeExpressions()).thenReturn(charEncodingValue);
when(charEncodingValue.getValue()).thenReturn("UTF-8");
when(context.getProperty(KafkaRecordSink_2_5.MESSAGE_HEADER_ENCODING)).thenReturn(charEncodingValue);
when(context.getProperty(KafkaRecordSink_2_6.MESSAGE_HEADER_ENCODING)).thenReturn(charEncodingValue);
final PropertyValue securityValue = Mockito.mock(StandardPropertyValue.class);
when(securityValue.getValue()).thenReturn(KafkaProcessorUtils.SEC_SASL_PLAINTEXT.getValue());
@ -147,11 +147,11 @@ public class TestKafkaRecordSink_2_5 {
when(context.getProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME)).thenReturn(jaasValue);
Map<PropertyDescriptor, String> propertyMap = new HashMap<>();
propertyMap.put(KafkaRecordSink_2_5.TOPIC, KafkaRecordSink_2_5.TOPIC.getName());
propertyMap.put(KafkaRecordSink_2_5.DELIVERY_GUARANTEE, KafkaRecordSink_2_5.DELIVERY_GUARANTEE.getName());
propertyMap.put(KafkaRecordSink_2_5.MAX_REQUEST_SIZE, KafkaRecordSink_2_5.MAX_REQUEST_SIZE.getName());
propertyMap.put(KafkaRecordSink_2_5.ACK_WAIT_TIME, KafkaRecordSink_2_5.ACK_WAIT_TIME.getName());
propertyMap.put(KafkaRecordSink_2_5.MESSAGE_HEADER_ENCODING, KafkaRecordSink_2_5.MESSAGE_HEADER_ENCODING.getName());
propertyMap.put(KafkaRecordSink_2_6.TOPIC, KafkaRecordSink_2_6.TOPIC.getName());
propertyMap.put(KafkaRecordSink_2_6.DELIVERY_GUARANTEE, KafkaRecordSink_2_6.DELIVERY_GUARANTEE.getName());
propertyMap.put(KafkaRecordSink_2_6.MAX_REQUEST_SIZE, KafkaRecordSink_2_6.MAX_REQUEST_SIZE.getName());
propertyMap.put(KafkaRecordSink_2_6.ACK_WAIT_TIME, KafkaRecordSink_2_6.ACK_WAIT_TIME.getName());
propertyMap.put(KafkaRecordSink_2_6.MESSAGE_HEADER_ENCODING, KafkaRecordSink_2_6.MESSAGE_HEADER_ENCODING.getName());
when(context.getProperties()).thenReturn(propertyMap);
@ -171,7 +171,7 @@ public class TestKafkaRecordSink_2_5 {
return task;
}
private static class MockKafkaRecordSink_2_5 extends KafkaRecordSink_2_5 {
private static class MockKafkaRecordSink_2_6 extends KafkaRecordSink_2_6 {
final List<byte[]> dataSent = new ArrayList<>();
@SuppressWarnings("unchecked")


@ -29,7 +29,7 @@
<kafka11.version>0.11.0.3</kafka11.version>
<kafka1.0.version>1.0.2</kafka1.0.version>
<kafka2.0.version>2.0.0</kafka2.0.version>
<kafka2.5.version>2.5.0</kafka2.5.version>
<kafka2.6.version>2.6.0</kafka2.6.version>
</properties>
<modules>
@ -39,14 +39,14 @@
<module>nifi-kafka-0-11-processors</module>
<module>nifi-kafka-1-0-processors</module>
<module>nifi-kafka-2-0-processors</module>
<module>nifi-kafka-2-5-processors</module>
<module>nifi-kafka-2-6-processors</module>
<module>nifi-kafka-0-8-nar</module>
<module>nifi-kafka-0-9-nar</module>
<module>nifi-kafka-0-10-nar</module>
<module>nifi-kafka-0-11-nar</module>
<module>nifi-kafka-1-0-nar</module>
<module>nifi-kafka-2-0-nar</module>
<module>nifi-kafka-2-5-nar</module>
<module>nifi-kafka-2-6-nar</module>
</modules>
<dependencyManagement>
<dependencies>
@ -82,7 +82,7 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kafka-2-5-processors</artifactId>
<artifactId>nifi-kafka-2-6-processors</artifactId>
<version>1.12.0-SNAPSHOT</version>
</dependency>
<dependency>