From 0e9e9b36d94b5a69f1a38d2838117eba79b79487 Mon Sep 17 00:00:00 2001 From: Aldrin Piri Date: Mon, 4 Apr 2016 18:21:59 -0400 Subject: [PATCH] NIFI-1728 Specifying charsets for messages sent to/received from Kafka in Kafka processor and related tests to remedy failures in Windows environments. Specifying EOF as an int instead of a byte. Signed-off-by: joewitt --- .../nifi/processors/kafka/StreamScanner.java | 2 +- .../processors/kafka/KafkaPublisherTest.java | 4 +- .../nifi/processors/kafka/TestPutKafka.java | 51 ++++++++++--------- 3 files changed, 29 insertions(+), 28 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java index ee83a026b9..57bbbcf9a3 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java @@ -26,7 +26,7 @@ import java.nio.ByteBuffer; */ class StreamScanner { - private final static byte EOF = -1; + private final static int EOF = -1; private final InputStream is; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java index f21dfb0c74..2abb51a9bb 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java @@ -179,7 +179,7 @@ public class KafkaPublisherTest { @Test public void validateWithMultiByteCharacters() throws Exception { String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE."; - InputStream fis = new ByteArrayInputStream(data.getBytes()); + InputStream fis = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); String topicName = "validateWithMultiByteCharacters"; Properties kafkaProperties = this.buildProducerProperties(); @@ -192,7 +192,7 @@ public class KafkaPublisherTest { publisher.close(); ConsumerIterator iter = this.buildConsumer(topicName); - String r = new String(iter.next().message()); + String r = new String(iter.next().message(), StandardCharsets.UTF_8); assertEquals(data, r); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 34544dfc9f..5d75d54f46 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -77,18 +77,18 @@ public class TestPutKafka { runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes()); + runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8)); runner.run(1, false); runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); ConsumerIterator consumer = this.buildConsumer(topicName); - assertEquals("Hello World", new String(consumer.next().message())); - assertEquals("Goodbye", new String(consumer.next().message())); - assertEquals("1", new String(consumer.next().message())); - assertEquals("2", new String(consumer.next().message())); - assertEquals("3", new String(consumer.next().message())); - assertEquals("4", new String(consumer.next().message())); - assertEquals("5", new String(consumer.next().message())); + assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("1", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("2", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("3", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("4", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("5", new String(consumer.next().message(), StandardCharsets.UTF_8)); runner.shutdown(); } @@ -106,14 +106,14 @@ public class TestPutKafka { runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); final String text = "Hello World\nGoodbye\n1\n2"; - runner.enqueue(text.getBytes()); + runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); afterClass(); // kill Kafka right before send to ensure producer fails runner.run(1, false); runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1); MockFlowFile ff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); String failedSegmentsStr = ff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS); - BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes()); + BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes(StandardCharsets.UTF_8)); assertTrue(fs.get(0)); assertTrue(fs.get(1)); assertTrue(fs.get(2)); @@ -148,8 +148,8 @@ public class TestPutKafka { ConsumerIterator consumer = this.buildConsumer(topicName); - assertEquals("Goodbye", new String(consumer.next().message())); - assertEquals("2", new String(consumer.next().message())); + assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("2", new String(consumer.next().message(), StandardCharsets.UTF_8)); try { consumer.next(); fail(); @@ -169,7 +169,7 @@ public class TestPutKafka { runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); + final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8); runner.enqueue(bytes); runner.run(1); @@ -198,14 +198,14 @@ public class TestPutKafka { runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠"); - runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>".getBytes()); + runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>".getBytes(StandardCharsets.UTF_8)); runner.run(1, false); runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); ConsumerIterator consumer = this.buildConsumer(topicName); - assertEquals("Hello World", new String(consumer.next().message())); - assertEquals("Goodbye", new String(consumer.next().message())); - assertEquals("I Mean IT!僠<僠WILDSTUFF僠>", new String(consumer.next().message())); + assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("I Mean IT!僠<僠WILDSTUFF僠>", new String(consumer.next().message(), StandardCharsets.UTF_8)); runner.shutdown(); } @@ -219,15 +219,16 @@ public class TestPutKafka { runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠"); - runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>僠<僠WILDSTUFF僠>僠".getBytes()); + runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>僠<僠WILDSTUFF僠>僠".getBytes(StandardCharsets.UTF_8)); runner.run(1, false); runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); ConsumerIterator consumer = this.buildConsumer(topicName); - assertEquals("Hello World", new String(consumer.next().message())); - assertEquals("Goodbye", new String(consumer.next().message())); - assertEquals("I Mean IT!", new String(consumer.next().message())); - assertEquals("<僠WILDSTUFF僠>僠", new String(consumer.next().message())); + byte[] message = consumer.next().message(); + assertEquals("Hello World", new String(message, StandardCharsets.UTF_8)); + assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("I Mean IT!", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("<僠WILDSTUFF僠>僠", new String(consumer.next().message(), StandardCharsets.UTF_8)); runner.shutdown(); } @@ -241,13 +242,13 @@ public class TestPutKafka { runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠"); - runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDBOOMSTUFF僠>僠".getBytes()); + runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDBOOMSTUFF僠>僠".getBytes(StandardCharsets.UTF_8)); runner.run(1, false); runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); ConsumerIterator consumer = this.buildConsumer(topicName); - assertEquals("Hello World", new String(consumer.next().message())); - assertEquals("Goodbye僠<僠WILDBOOMSTUFF僠>僠", new String(consumer.next().message())); + assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("Goodbye僠<僠WILDBOOMSTUFF僠>僠", new String(consumer.next().message(), StandardCharsets.UTF_8)); runner.shutdown(); }