NIFI-1728 Specifying charsets for messages sent to/received from Kafka in Kafka processor and related tests to remedy failures in Windows environments. Specifying EOF as an int instead of a byte.

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Aldrin Piri 2016-04-04 18:21:59 -04:00 committed by joewitt
parent 1c025dd4be
commit 0e9e9b36d9
3 changed files with 29 additions and 28 deletions

View File

@ -26,7 +26,7 @@ import java.nio.ByteBuffer;
*/
class StreamScanner {
private final static byte EOF = -1;
private final static int EOF = -1;
private final InputStream is;

View File

@ -179,7 +179,7 @@ public class KafkaPublisherTest {
@Test
public void validateWithMultiByteCharacters() throws Exception {
String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
InputStream fis = new ByteArrayInputStream(data.getBytes());
InputStream fis = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
String topicName = "validateWithMultiByteCharacters";
Properties kafkaProperties = this.buildProducerProperties();
@ -192,7 +192,7 @@ public class KafkaPublisherTest {
publisher.close();
ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
String r = new String(iter.next().message());
String r = new String(iter.next().message(), StandardCharsets.UTF_8);
assertEquals(data, r);
}

View File

@ -77,18 +77,18 @@ public class TestPutKafka {
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes());
runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8));
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
assertEquals("Hello World", new String(consumer.next().message()));
assertEquals("Goodbye", new String(consumer.next().message()));
assertEquals("1", new String(consumer.next().message()));
assertEquals("2", new String(consumer.next().message()));
assertEquals("3", new String(consumer.next().message()));
assertEquals("4", new String(consumer.next().message()));
assertEquals("5", new String(consumer.next().message()));
assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("1", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("2", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("3", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("4", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("5", new String(consumer.next().message(), StandardCharsets.UTF_8));
runner.shutdown();
}
@ -106,14 +106,14 @@ public class TestPutKafka {
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
final String text = "Hello World\nGoodbye\n1\n2";
runner.enqueue(text.getBytes());
runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
afterClass(); // kill Kafka right before send to ensure producer fails
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
MockFlowFile ff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
String failedSegmentsStr = ff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS);
BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes());
BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes(StandardCharsets.UTF_8));
assertTrue(fs.get(0));
assertTrue(fs.get(1));
assertTrue(fs.get(2));
@ -148,8 +148,8 @@ public class TestPutKafka {
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
assertEquals("Goodbye", new String(consumer.next().message()));
assertEquals("2", new String(consumer.next().message()));
assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("2", new String(consumer.next().message(), StandardCharsets.UTF_8));
try {
consumer.next();
fail();
@ -169,7 +169,7 @@ public class TestPutKafka {
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8);
runner.enqueue(bytes);
runner.run(1);
@ -198,14 +198,14 @@ public class TestPutKafka {
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠");
runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>".getBytes());
runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>".getBytes(StandardCharsets.UTF_8));
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
assertEquals("Hello World", new String(consumer.next().message()));
assertEquals("Goodbye", new String(consumer.next().message()));
assertEquals("I Mean IT!僠<僠WILDSTUFF僠>", new String(consumer.next().message()));
assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("I Mean IT!僠<僠WILDSTUFF僠>", new String(consumer.next().message(), StandardCharsets.UTF_8));
runner.shutdown();
}
@ -219,15 +219,16 @@ public class TestPutKafka {
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠");
runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>僠<僠WILDSTUFF僠>僠".getBytes());
runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>僠<僠WILDSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
assertEquals("Hello World", new String(consumer.next().message()));
assertEquals("Goodbye", new String(consumer.next().message()));
assertEquals("I Mean IT!", new String(consumer.next().message()));
assertEquals("<僠WILDSTUFF僠>僠", new String(consumer.next().message()));
byte[] message = consumer.next().message();
assertEquals("Hello World", new String(message, StandardCharsets.UTF_8));
assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("I Mean IT!", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("<僠WILDSTUFF僠>僠", new String(consumer.next().message(), StandardCharsets.UTF_8));
runner.shutdown();
}
@ -241,13 +242,13 @@ public class TestPutKafka {
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠");
runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDBOOMSTUFF僠>僠".getBytes());
runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDBOOMSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
assertEquals("Hello World", new String(consumer.next().message()));
assertEquals("Goodbye僠<僠WILDBOOMSTUFF僠>僠", new String(consumer.next().message()));
assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("Goodbye僠<僠WILDBOOMSTUFF僠>僠", new String(consumer.next().message(), StandardCharsets.UTF_8));
runner.shutdown();
}