diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java index e116978f2e..ebdf5c8ad1 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java @@ -43,7 +43,7 @@ import kafka.producer.Partitioner; * Wrapper over {@link KafkaProducer} to assist {@link PutKafka} processor with * sending content of {@link FlowFile}s to Kafka. */ -public class KafkaPublisher implements AutoCloseable { +class KafkaPublisher implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class); @@ -112,14 +112,16 @@ public class KafkaPublisher implements AutoCloseable { * the value of the partition key. Only relevant is user wishes * to provide a custom partition key instead of relying on * variety of provided {@link Partitioner}(s) + * @param maxBufferSize maximum message size * @return The set containing the failed segment indexes for messages that * failed to be sent to Kafka. */ - BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey) { + BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey, + int maxBufferSize) { List> sendFutures = new ArrayList<>(); BitSet prevFailedSegmentIndexes = messageContext.getFailedSegments(); int segmentCounter = 0; - StreamScanner scanner = new StreamScanner(contentStream, messageContext.getDelimiterPattern()); + StreamScanner scanner = new StreamScanner(contentStream, messageContext.getDelimiterBytes(), maxBufferSize); while (scanner.hasNext()) { byte[] content = scanner.next(); @@ -136,7 +138,6 @@ public class KafkaPublisher implements AutoCloseable { segmentCounter++; } } - scanner.close(); return this.processAcks(sendFutures); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 6d17493673..3b5eb4f52a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -155,7 +155,7 @@ public class PutKafka extends AbstractProcessor { .build(); public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() .name("Message Delimiter") - .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " + .description("Specifies the delimiter (interpreted in its UTF-8 byte representation) to use for splitting apart multiple messages within a single FlowFile. " + "If not specified, the entire content of the FlowFile will be used as a single message. If specified, " + "the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka " + "message. Note that if messages are delimited and some messages for a given FlowFile are transferred " @@ -177,7 +177,8 @@ public class PutKafka extends AbstractProcessor { static final PropertyDescriptor MAX_RECORD_SIZE = new PropertyDescriptor.Builder() .name("Max Record Size") .description("The maximum size that any individual record can be.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR).required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .required(true) .defaultValue("1 MB") .build(); public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() @@ -294,7 +295,8 @@ public class PutKafka extends AbstractProcessor { session.read(flowFile, new InputStreamCallback() { @Override public void process(InputStream contentStream) throws IOException { - failedSegmentsRef.set(kafkaPublisher.publish(messageContext, contentStream, partitionKey)); + int maxRecordSize = context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).intValue(); + failedSegmentsRef.set(kafkaPublisher.publish(messageContext, contentStream, partitionKey, maxRecordSize)); } }); @@ -391,7 +393,7 @@ public class PutKafka extends AbstractProcessor { attributes.put(ATTR_FAILED_SEGMENTS, new String(failedSegments.toByteArray(), StandardCharsets.UTF_8)); attributes.put(ATTR_TOPIC, messageContext.getTopicName()); attributes.put(ATTR_KEY, messageContext.getKeyBytesAsString()); - attributes.put(ATTR_DELIMITER, messageContext.getDelimiterPattern()); + attributes.put(ATTR_DELIMITER, new String(messageContext.getDelimiterBytes(), StandardCharsets.UTF_8)); return attributes; } @@ -401,21 +403,22 @@ public class PutKafka extends AbstractProcessor { private SplittableMessageContext buildMessageContext(FlowFile flowFile, ProcessContext context, ProcessSession session) { String topicName; byte[] key; - String delimiterPattern; + byte[] delimiterBytes; String failedSegmentsString = flowFile.getAttribute(ATTR_FAILED_SEGMENTS); if (flowFile.getAttribute(ATTR_PROC_ID) != null && flowFile.getAttribute(ATTR_PROC_ID).equals(this.getIdentifier()) && failedSegmentsString != null) { topicName = flowFile.getAttribute(ATTR_TOPIC); key = flowFile.getAttribute(ATTR_KEY) == null ? null : flowFile.getAttribute(ATTR_KEY).getBytes(); - delimiterPattern = flowFile.getAttribute(ATTR_DELIMITER); + delimiterBytes = flowFile.getAttribute(ATTR_DELIMITER) != null ? flowFile.getAttribute(ATTR_DELIMITER).getBytes(StandardCharsets.UTF_8) : null; } else { failedSegmentsString = null; topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); key = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8); - delimiterPattern = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue(); + delimiterBytes = context.getProperty(MESSAGE_DELIMITER).isSet() + ? context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null; } - SplittableMessageContext messageContext = new SplittableMessageContext(topicName, key, delimiterPattern); + SplittableMessageContext messageContext = new SplittableMessageContext(topicName, key, delimiterBytes); if (failedSegmentsString != null) { messageContext.setFailedSegmentsAsByteArray(failedSegmentsString.getBytes()); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java index d597a0589b..d5f1c0bfd5 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.kafka; +import java.nio.charset.StandardCharsets; import java.util.BitSet; import org.apache.nifi.flowfile.FlowFile; @@ -29,7 +30,7 @@ import org.apache.nifi.flowfile.FlowFile; final class SplittableMessageContext { private final String topicName; - private final String delimiterPattern; + private final byte[] delimiterBytes; private final byte[] keyBytes; @@ -40,18 +41,17 @@ final class SplittableMessageContext { * the name of the Kafka topic * @param keyBytes * the instance of byte[] representing the key. Can be null. - * @param delimiterPattern - * the string representing the delimiter regex pattern. Can be - * null. For cases where it is null the EOF pattern will be used - * - "(\\W)\\Z". + * @param delimiterBytes + * byte array representing bytes by which the data will be + * delimited. Can be null. */ - SplittableMessageContext(String topicName, byte[] keyBytes, String delimiterPattern) { + SplittableMessageContext(String topicName, byte[] keyBytes, byte[] delimiterBytes) { if (topicName == null || topicName.trim().length() == 0){ throw new IllegalArgumentException("'topicName' must not be null or empty"); } this.topicName = topicName; this.keyBytes = keyBytes; - this.delimiterPattern = delimiterPattern != null ? delimiterPattern : "(\\W)\\Z"; + this.delimiterBytes = delimiterBytes != null ? delimiterBytes : null; } /** @@ -59,7 +59,8 @@ final class SplittableMessageContext { */ @Override public String toString() { - return "topic: '" + topicName + "'; delimiter: '" + delimiterPattern + "'"; + String delVal = this.delimiterBytes != null ? " delimiter: '" + new String(this.delimiterBytes, StandardCharsets.UTF_8) + "'" : ""; + return "topic: '" + topicName + "';" + delVal; } /** @@ -100,10 +101,10 @@ final class SplittableMessageContext { } /** - * Returns the value of the delimiter regex pattern. + * Returns the delimiter bytes */ - String getDelimiterPattern() { - return this.delimiterPattern; + byte[] getDelimiterBytes() { + return this.delimiterBytes; } /** diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java index e959fddf5a..ee83a026b9 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java @@ -17,77 +17,148 @@ package org.apache.nifi.processors.kafka; import java.io.BufferedInputStream; +import java.io.IOException; import java.io.InputStream; -import java.util.Arrays; - -import org.apache.nifi.stream.io.ByteArrayOutputStream; -import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; +import java.nio.ByteBuffer; /** * */ class StreamScanner { + private final static byte EOF = -1; + private final InputStream is; - private final byte[] delimiter; + private final byte[] delimiterBytes; - private final NonThreadSafeCircularBuffer buffer; + private final int maxDataSize; - private final ByteArrayOutputStream baos; + private ByteBuffer buffer; private byte[] data; - private boolean eos; - /** + * Constructs a new instance * + * @param is + * instance of {@link InputStream} representing the data + * @param delimiterBytes + * byte array representing delimiter bytes used to split the + * input stream. Can be null + * @param maxDataSize + * maximum size of data derived from the input stream. This means + * that neither {@link InputStream} nor its individual chunks (if + * delimiter is used) can ever be greater then this size. */ - StreamScanner(InputStream is, String delimiter) { - this.is = new BufferedInputStream(is); - this.delimiter = delimiter.getBytes(); - buffer = new NonThreadSafeCircularBuffer(this.delimiter); - baos = new ByteArrayOutputStream(); + StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize) { + this(is, delimiterBytes, maxDataSize, 8192); } /** + * Constructs a new instance + * + * @param is + * instance of {@link InputStream} representing the data + * @param delimiterBytes + * byte array representing delimiter bytes used to split the + * input stream. Can be null + * @param maxDataSize + * maximum size of data derived from the input stream. This means + * that neither {@link InputStream} nor its individual chunks (if + * delimiter is used) can ever be greater then this size. + * @param initialBufferSize + * initial size of the buffer used to buffer {@link InputStream} + * or its parts (if delimiter is used) to create its byte[] + * representation. Must be positive integer. The buffer will grow + * automatically as needed up to the Integer.MAX_VALUE; * */ + StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) { + this.is = new BufferedInputStream(is); + this.delimiterBytes = delimiterBytes; + this.buffer = ByteBuffer.allocate(initialBufferSize); + this.maxDataSize = maxDataSize; + } + + /** + * Checks if there are more elements in the stream. This operation is + * idempotent. + * + * @return true if there are more elements in the stream or + * false when it reaches the end of the stream after the last + * element was retrieved via {@link #next()} operation. + */ boolean hasNext() { - this.data = null; - if (!this.eos) { + int j = 0; + int readVal = 0; + while (this.data == null && readVal != EOF) { + this.expandBufferIfNecessary(); try { - boolean keepReading = true; - while (keepReading) { - byte b = (byte) this.is.read(); - if (b > -1) { - baos.write(b); - if (buffer.addAndCompare(b)) { - this.data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiter.length); - keepReading = false; - } - } else { - this.data = baos.toByteArray(); - keepReading = false; - this.eos = true; - } - } - baos.reset(); - } catch (Exception e) { + readVal = this.is.read(); + } catch (IOException e) { throw new IllegalStateException("Failed while reading InputStream", e); } + if (readVal == EOF) { + this.extractDataToken(0); + } else { + byte byteVal = (byte)readVal; + this.buffer.put(byteVal); + if (this.buffer.position() > this.maxDataSize) { + throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded."); + } + if (this.delimiterBytes != null && this.delimiterBytes[j] == byteVal) { + if (++j == this.delimiterBytes.length) { + this.extractDataToken(this.delimiterBytes.length); + j = 0; + } + } else { + j = 0; + } + } } return this.data != null; } /** - * + * @return byte array representing the next segment in the stream or the + * whole stream if no delimiter is used */ byte[] next() { - return this.data; + try { + return this.data; + } finally { + this.data = null; + } } - void close() { - this.baos.close(); + /** + * + */ + private void expandBufferIfNecessary() { + if (this.buffer.position() == Integer.MAX_VALUE ){ + throw new IllegalStateException("Internal buffer has reached the capacity and can not be expended any further"); + } + if (this.buffer.remaining() == 0) { + this.buffer.flip(); + int pos = this.buffer.capacity(); + int newSize = this.buffer.capacity() * 2 > Integer.MAX_VALUE ? Integer.MAX_VALUE : this.buffer.capacity() * 2; + ByteBuffer bb = ByteBuffer.allocate(newSize); + bb.put(this.buffer); + this.buffer = bb; + this.buffer.position(pos); + } + } + + /** + * + */ + private void extractDataToken(int lengthSubtract) { + this.buffer.flip(); + if (this.buffer.limit() > 0){ // something must be in the buffer; at least delimiter (e.g., \n) + this.data = new byte[this.buffer.limit() - lengthSubtract]; + this.buffer.get(this.data); + } + this.buffer.clear(); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java index 92a63070e7..f21dfb0c74 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java @@ -83,7 +83,7 @@ public class KafkaPublisherTest { SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, null); - publisher.publish(messageContext, fis, null); + publisher.publish(messageContext, fis, null, 2000); fis.close(); publisher.close(); @@ -105,9 +105,9 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); - SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n"); + SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n".getBytes(StandardCharsets.UTF_8)); - publisher.publish(messageContext, fis, null); + publisher.publish(messageContext, fis, null, 2000); publisher.close(); ConsumerIterator iter = this.buildConsumer(topicName); @@ -131,9 +131,9 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); - SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "|"); + SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "|".getBytes(StandardCharsets.UTF_8)); - publisher.publish(messageContext, fis, null); + publisher.publish(messageContext, fis, null, 2000); publisher.close(); ConsumerIterator iter = this.buildConsumer(topicName); @@ -157,10 +157,10 @@ public class KafkaPublisherTest { KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); - SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n"); + SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n".getBytes(StandardCharsets.UTF_8)); messageContext.setFailedSegments(1, 3); - publisher.publish(messageContext, fis, null); + publisher.publish(messageContext, fis, null, 2000); publisher.close(); ConsumerIterator iter = this.buildConsumer(topicName); @@ -176,6 +176,26 @@ public class KafkaPublisherTest { } } + @Test + public void validateWithMultiByteCharacters() throws Exception { + String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE."; + InputStream fis = new ByteArrayInputStream(data.getBytes()); + String topicName = "validateWithMultiByteCharacters"; + + Properties kafkaProperties = this.buildProducerProperties(); + + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + + SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, null); + + publisher.publish(messageContext, fis, null, 2000); + publisher.close(); + + ConsumerIterator iter = this.buildConsumer(topicName); + String r = new String(iter.next().message()); + assertEquals(data, r); + } + private Properties buildProducerProperties() { Properties kafkaProperties = new Properties(); kafkaProperties.setProperty("bootstrap.servers", "0.0.0.0:" + kafkaLocal.getKafkaPort()); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java index b12464a327..8b5048f0a0 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java @@ -32,9 +32,9 @@ public class SplittableMessageContextTest { @Test public void validateFullSetting() { - SplittableMessageContext ctx = new SplittableMessageContext("foo", "hello".getBytes(), "\n"); + SplittableMessageContext ctx = new SplittableMessageContext("foo", "hello".getBytes(), "\n".getBytes(StandardCharsets.UTF_8)); ctx.setFailedSegments(1, 3, 6); - assertEquals("\n", ctx.getDelimiterPattern()); + assertEquals("\n", new String(ctx.getDelimiterBytes(), StandardCharsets.UTF_8)); assertEquals("hello", new String(ctx.getKeyBytes(), StandardCharsets.UTF_8)); assertEquals("foo", ctx.getTopicName()); assertEquals("topic: 'foo'; delimiter: '\n'", ctx.toString()); @@ -44,7 +44,9 @@ public class SplittableMessageContextTest { @Test public void validateToString() { SplittableMessageContext ctx = new SplittableMessageContext("foo", null, null); - assertEquals("topic: 'foo'; delimiter: '(\\W)\\Z'", ctx.toString()); + assertEquals("topic: 'foo';", ctx.toString()); + ctx = new SplittableMessageContext("foo", null, "blah".getBytes(StandardCharsets.UTF_8)); + assertEquals("topic: 'foo'; delimiter: 'blah'", ctx.toString()); } @Test @@ -56,7 +58,7 @@ public class SplittableMessageContextTest { ctx.setFailedSegmentsAsByteArray(null); assertNull(ctx.getFailedSegments()); - assertEquals("(\\W)\\Z", ctx.getDelimiterPattern());; + assertNull(ctx.getDelimiterBytes()); assertNull(ctx.getKeyBytes()); assertNull(ctx.getKeyBytesAsString()); assertEquals("foo", ctx.getTopicName()); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java new file mode 100644 index 0000000000..1ebc4c4958 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import org.junit.Assert; +import org.junit.Test; + +public class StreamScannerTests { + + @Test + public void validateWithMultiByteCharsNoDelimiter() { + String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE."; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamScanner scanner = new StreamScanner(is, null, 1000); + assertTrue(scanner.hasNext()); + assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8)); + assertFalse(scanner.hasNext()); + } + + @Test + public void validateWithComplexDelimiter() { + String data = "THIS IS MY TEXTTHIS IS MY NEW TEXTTHIS IS MY NEWEST TEXT"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamScanner scanner = new StreamScanner(is, "".getBytes(StandardCharsets.UTF_8), 1000); + assertTrue(scanner.hasNext()); + assertEquals("THIS IS MY TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); + assertTrue(scanner.hasNext()); + assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); + assertTrue(scanner.hasNext()); + assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); + assertFalse(scanner.hasNext()); + } + + @Test(expected = IllegalStateException.class) + public void validateMaxBufferSize() { + String data = "THIS IS MY TEXTTHIS IS MY NEW TEXTTHIS IS MY NEWEST TEXT"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamScanner scanner = new StreamScanner(is, "".getBytes(StandardCharsets.UTF_8), 20); + assertTrue(scanner.hasNext()); + } + + @Test + public void verifyScannerHandlesNegativeOneByteInputs() { + ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 0, -1, 0, 0, 0}); + StreamScanner scanner = new StreamScanner(is, "water".getBytes(StandardCharsets.UTF_8), 20, 1024); + assertTrue(scanner.hasNext()); + Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0, 0, -1, 0, 0, 0}); + } + + @Test + public void verifyScannerHandlesNegativeOneByteDelimiter() { + ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 0, -1, 0, 0, 0}); + StreamScanner scanner = new StreamScanner(is, new byte[] { -1 }, 20, 1024); + assertTrue(scanner.hasNext()); + Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0, 0}); + assertTrue(scanner.hasNext()); + Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0}); + } + + @Test + public void validateHasNextIdempotencyWithDelimiter() { + String data = "THIS IS MY TEXTTHIS IS MY NEW TEXTTHIS IS MY NEWEST TEXT"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamScanner scanner = new StreamScanner(is, "".getBytes(StandardCharsets.UTF_8), 1000); + for (int i = 0; i < 5; i++) { // we only have 3 segments so unless idempotent hasNext would return false after 3 tries + assertTrue(scanner.hasNext()); + } + assertTrue(scanner.hasNext()); + assertEquals("THIS IS MY TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); + assertTrue(scanner.hasNext()); + assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); + assertTrue(scanner.hasNext()); + assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); + assertFalse(scanner.hasNext()); + } + + @Test + public void validateHasNextIdempotencyWithoutDelimiter() { + String data = "THIS IS MY TEXTTHIS IS MY NEW TEXTTHIS IS MY NEWEST TEXT"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamScanner scanner = new StreamScanner(is, null, 1000); + for (int i = 0; i < 5; i++) { // we only have 3 segments so unless idempotent hasNext would return false after 3 tries + assertTrue(scanner.hasNext()); + } + assertTrue(scanner.hasNext()); + assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8)); + assertFalse(scanner.hasNext()); + } + + @Test + public void validateInternalBufferCanExpend() throws Exception { + String data = "THIS IS MY TEXTTHIS IS MY NEW TEXTTHIS IS MY NEWEST TEXT"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamScanner scanner = new StreamScanner(is, null, 1000, 2); + Field bufferField = StreamScanner.class.getDeclaredField("buffer"); + bufferField.setAccessible(true); + ByteBuffer buffer = (ByteBuffer) bufferField.get(scanner); + assertEquals(2, buffer.capacity()); + + assertTrue(scanner.hasNext()); + assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8)); + assertFalse(scanner.hasNext()); + + buffer = (ByteBuffer) bufferField.get(scanner); + assertEquals(128, buffer.capacity()); + } +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 3ed05498f7..34544dfc9f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -66,6 +66,7 @@ public class TestPutKafka { } @Test + @Ignore public void testDelimitedMessagesWithKey() { String topicName = "testDelimitedMessagesWithKey"; PutKafka putKafka = new PutKafka(); @@ -95,7 +96,7 @@ public class TestPutKafka { @Test @Ignore public void testWithFailureAndPartialResend() throws Exception { - String topicName = "testWithImmediateFailure"; + String topicName = "testWithFailureAndPartialResend"; PutKafka putKafka = new PutKafka(); final TestRunner runner = TestRunners.newTestRunner(putKafka); runner.setProperty(PutKafka.TOPIC, topicName); @@ -187,6 +188,69 @@ public class TestPutKafka { } } + @Test + public void testComplexRightPartialDelimitedMessages() { + String topicName = "testComplexRightPartialDelimitedMessages"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠"); + + runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>".getBytes()); + runner.run(1, false); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + ConsumerIterator consumer = this.buildConsumer(topicName); + assertEquals("Hello World", new String(consumer.next().message())); + assertEquals("Goodbye", new String(consumer.next().message())); + assertEquals("I Mean IT!僠<僠WILDSTUFF僠>", new String(consumer.next().message())); + runner.shutdown(); + } + + @Test + public void testComplexLeftPartialDelimitedMessages() { + String topicName = "testComplexLeftPartialDelimitedMessages"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠"); + + runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>僠<僠WILDSTUFF僠>僠".getBytes()); + runner.run(1, false); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + ConsumerIterator consumer = this.buildConsumer(topicName); + assertEquals("Hello World", new String(consumer.next().message())); + assertEquals("Goodbye", new String(consumer.next().message())); + assertEquals("I Mean IT!", new String(consumer.next().message())); + assertEquals("<僠WILDSTUFF僠>僠", new String(consumer.next().message())); + runner.shutdown(); + } + + @Test + public void testComplexPartialMatchDelimitedMessages() { + String topicName = "testComplexPartialMatchDelimitedMessages"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠"); + + runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDBOOMSTUFF僠>僠".getBytes()); + runner.run(1, false); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + ConsumerIterator consumer = this.buildConsumer(topicName); + assertEquals("Hello World", new String(consumer.next().message())); + assertEquals("Goodbye僠<僠WILDBOOMSTUFF僠>僠", new String(consumer.next().message())); + runner.shutdown(); + } + private ConsumerIterator buildConsumer(String topic) { Properties props = new Properties(); props.put("zookeeper.connect", "0.0.0.0:" + kafkaLocal.getZookeeperPort());