diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java index 3e582a6f23..46507d2c5f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java @@ -43,13 +43,6 @@ import kafka.javaapi.consumer.ConsumerConnector; public class KafkaPublisherTest { - private static final String sampleData = "The true sign of intelligence is not knowledge but imagination.\n" - + "It's not that I'm so smart, it's just that I stay with problems longer.\n" - + "The only source of knowledge is experience.\n" - + "Only two things are infinite, the universe and human stupidity, and I'm not sure about the former.\n"; - - private static final String sampleData2 = "foo|bar|baz"; - private static EmbeddedKafka kafkaLocal; private static EmbeddedKafkaProducerHelper producerHelper; @@ -69,7 +62,7 @@ public class KafkaPublisherTest { @Test public void validateSuccessfulSendAsWhole() throws Exception { - InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8)); + InputStream fis = new ByteArrayInputStream("Hello Kafka".getBytes(StandardCharsets.UTF_8)); String topicName = "validateSuccessfulSendAsWhole"; Properties kafkaProperties = this.buildProducerProperties(); @@ -93,7 +86,8 @@ public class KafkaPublisherTest { @Test public void validateSuccessfulSendAsDelimited() throws Exception { - InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8)); + InputStream fis = new ByteArrayInputStream( + "Hello Kafka 1\nHello Kafka 2\nHello Kafka 3\nHello Kafka 4\n".getBytes(StandardCharsets.UTF_8)); String topicName = "validateSuccessfulSendAsDelimited"; Properties kafkaProperties = this.buildProducerProperties(); @@ -117,34 +111,10 @@ public class KafkaPublisherTest { } } - @Test - public void validateSuccessfulSendAsDelimited2() throws Exception { - InputStream fis = new ByteArrayInputStream(sampleData2.getBytes(StandardCharsets.UTF_8)); - String topicName = "validateSuccessfulSendAsDelimited2"; - - Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); - - SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "|".getBytes(StandardCharsets.UTF_8)); - - publisher.publish(messageContext, fis, null, 2000); - publisher.close(); - - ConsumerIterator iter = this.buildConsumer(topicName); - assertNotNull(iter.next()); - assertNotNull(iter.next()); - assertNotNull(iter.next()); - try { - iter.next(); - fail(); - } catch (ConsumerTimeoutException e) { - // that's OK since this is the Kafka mechanism to unblock - } - } - @Test public void validateSuccessfulReSendOfFailedSegments() throws Exception { - InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8)); + InputStream fis = new ByteArrayInputStream( + "Hello Kafka 1\nHello Kafka 2\nHello Kafka 3\nHello Kafka 4\n".getBytes(StandardCharsets.UTF_8)); String topicName = "validateSuccessfulReSendOfFailedSegments"; Properties kafkaProperties = this.buildProducerProperties(); @@ -160,8 +130,8 @@ public class KafkaPublisherTest { ConsumerIterator iter = this.buildConsumer(topicName); String m1 = new String(iter.next().message()); String m2 = new String(iter.next().message()); - assertEquals("It's not that I'm so smart, it's just that I stay with problems longer.", m1); - assertEquals("Only two things are infinite, the universe and human stupidity, and I'm not sure about the former.", m2); + assertEquals("Hello Kafka 2", m1); + assertEquals("Hello Kafka 4", m2); try { iter.next(); fail();