diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java index 9967404483..d597a0589b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java @@ -46,6 +46,9 @@ final class SplittableMessageContext { * - "(\\W)\\Z". */ SplittableMessageContext(String topicName, byte[] keyBytes, String delimiterPattern) { + if (topicName == null || topicName.trim().length() == 0){ + throw new IllegalArgumentException("'topicName' must not be null or empty"); + } this.topicName = topicName; this.keyBytes = keyBytes; this.delimiterPattern = delimiterPattern != null ? delimiterPattern : "(\\W)\\Z"; @@ -60,20 +63,25 @@ final class SplittableMessageContext { } /** - * + * Will set failed segments from an array of integers */ void setFailedSegments(int... failedSegments) { - this.failedSegments = new BitSet(); - for (int failedSegment : failedSegments) { - this.failedSegments.set(failedSegment); + if (failedSegments != null) { + this.failedSegments = new BitSet(); + for (int failedSegment : failedSegments) { + this.failedSegments.set(failedSegment); + } } } /** - * + * Will set failed segments from an array of bytes that will be used to + * construct the final {@link BitSet} representing failed segments */ void setFailedSegmentsAsByteArray(byte[] failedSegments) { - this.failedSegments = BitSet.valueOf(failedSegments); + if (failedSegments != null) { + this.failedSegments = BitSet.valueOf(failedSegments); + } } /** @@ -102,7 +110,7 @@ final class SplittableMessageContext { * Returns the key bytes as String */ String getKeyBytesAsString() { - return new String(this.keyBytes); + return this.keyBytes != null ? new String(this.keyBytes) : null; } /** diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java new file mode 100644 index 0000000000..b12464a327 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.nio.charset.StandardCharsets; + +import org.junit.Test; + +public class SplittableMessageContextTest { + + @Test(expected = IllegalArgumentException.class) + public void failNullEmptyTopic() { + new SplittableMessageContext(null, null, null); + } + + @Test + public void validateFullSetting() { + SplittableMessageContext ctx = new SplittableMessageContext("foo", "hello".getBytes(), "\n"); + ctx.setFailedSegments(1, 3, 6); + assertEquals("\n", ctx.getDelimiterPattern()); + assertEquals("hello", new String(ctx.getKeyBytes(), StandardCharsets.UTF_8)); + assertEquals("foo", ctx.getTopicName()); + assertEquals("topic: 'foo'; delimiter: '\n'", ctx.toString()); + } + + + @Test + public void validateToString() { + SplittableMessageContext ctx = new SplittableMessageContext("foo", null, null); + assertEquals("topic: 'foo'; delimiter: '(\\W)\\Z'", ctx.toString()); + } + + @Test + public void validateNoNPEandNoSideffectsOnSetsGets() { + SplittableMessageContext ctx = new SplittableMessageContext("foo", null, null); + ctx.setFailedSegments(null); + assertNull(ctx.getFailedSegments()); + + ctx.setFailedSegmentsAsByteArray(null); + assertNull(ctx.getFailedSegments()); + + assertEquals("(\\W)\\Z", ctx.getDelimiterPattern());; + assertNull(ctx.getKeyBytes()); + assertNull(ctx.getKeyBytesAsString()); + assertEquals("foo", ctx.getTopicName()); + } +}