From a43135ce0cf7a21ed4186ac530e0912e9ca9196e Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Fri, 20 Sep 2024 19:37:42 -0500 Subject: [PATCH] NIFI-13783 Removed commons-codec from Kafka Processors This closes #9295 - Replaced Commons Codec Hex with Java HexFormat Signed-off-by: Joseph Witt --- .../nifi/kafka/processors/ConsumeKafkaKeyAttributeIT.java | 4 ++-- .../nifi/kafka/processors/PublishKafkaDelimitedIT.java | 8 +++++--- .../publish/additional/PublishKafkaWrapperX3IT.java | 4 ++-- .../nifi-kafka-bundle/nifi-kafka-processors/pom.xml | 4 ---- .../apache/nifi/kafka/processors/common/KafkaUtils.java | 4 ++-- 5 files changed, 11 insertions(+), 13 deletions(-) diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaKeyAttributeIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaKeyAttributeIT.java index da31d19b09..2250425e72 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaKeyAttributeIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaKeyAttributeIT.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.kafka.processors; -import org.apache.commons.codec.binary.Hex; import org.apache.kafka.common.header.Header; import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy; import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset; @@ -33,6 +32,7 @@ import org.junit.jupiter.params.provider.MethodSource; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.HexFormat; import java.util.Iterator; import java.util.List; import java.util.UUID; @@ -63,7 +63,7 @@ public class ConsumeKafkaKeyAttributeIT extends AbstractConsumeKafkaIT { public static Stream permutations() { return Stream.of( Arguments.arguments(KeyEncoding.UTF8, RECORD_KEY, RECORD_KEY), - Arguments.arguments(KeyEncoding.HEX, RECORD_KEY, Hex.encodeHexString(RECORD_KEY.getBytes(StandardCharsets.UTF_8))), + Arguments.arguments(KeyEncoding.HEX, RECORD_KEY, HexFormat.of().formatHex(RECORD_KEY.getBytes(StandardCharsets.UTF_8))), Arguments.arguments(KeyEncoding.DO_NOT_ADD, RECORD_KEY, null), Arguments.arguments(KeyEncoding.UTF8, null, null) ); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaDelimitedIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaDelimitedIT.java index 80de86053d..a2eee00904 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaDelimitedIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaDelimitedIT.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.kafka.processors; -import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.header.Header; @@ -43,20 +42,23 @@ public class PublishKafkaDelimitedIT extends AbstractPublishKafkaIT { private static final int TEST_RECORD_COUNT = 3; + private static final String DEMARCATOR = "xx"; + @Test public void test1ProduceOneFlowFile() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class); runner.setValidateExpressionUsage(false); runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner)); runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); - runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "xx"); + runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, DEMARCATOR); runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*"); final Map attributes = new HashMap<>(); attributes.put("a1", "valueA1"); attributes.put("b1", "valueB1"); - runner.enqueue(StringUtils.repeat(TEST_RECORD_VALUE + "xx", TEST_RECORD_COUNT), attributes); + final String value = TEST_RECORD_VALUE + DEMARCATOR + TEST_RECORD_VALUE + DEMARCATOR + TEST_RECORD_VALUE; + runner.enqueue(value, attributes); runner.run(1); runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1); } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX3IT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX3IT.java index 00fa62341e..406451f3ef 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX3IT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX3IT.java @@ -17,7 +17,6 @@ package org.apache.nifi.kafka.processors.publish.additional; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.commons.codec.binary.Hex; import org.apache.commons.io.IOUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -38,6 +37,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HexFormat; import java.util.List; import java.util.Map; import java.util.Objects; @@ -85,7 +85,7 @@ public class PublishKafkaWrapperX3IT extends AbstractPublishKafkaIT { final List
headers = Arrays.asList(record.headers().toArray()); assertTrue(headers.isEmpty()); // kafka record key - assertEquals("411b0a140b39581341", Hex.encodeHexString(record.key().getBytes(StandardCharsets.UTF_8))); + assertEquals("411b0a140b39581341", HexFormat.of().formatHex(record.key().getBytes(StandardCharsets.UTF_8))); // kafka record value final ObjectNode kafkaValue = (ObjectNode) objectMapper.readTree(record.value()); assertNotNull(kafkaValue); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml index 3ad9bcff7b..b726072130 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml @@ -53,9 +53,5 @@ org.apache.commons commons-lang3 - - commons-codec - commons-codec - diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java index d5e008ff29..0f90bb1063 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.kafka.processors.common; -import org.apache.commons.codec.binary.Hex; import org.apache.nifi.kafka.service.api.header.RecordHeader; import org.apache.nifi.kafka.service.api.record.ByteRecord; import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; @@ -26,6 +25,7 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; +import java.util.HexFormat; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -62,7 +62,7 @@ public class KafkaUtils { if (key == null) { keyAttributeValue = null; } else if (KeyEncoding.HEX.equals(keyEncoding)) { - keyAttributeValue = Hex.encodeHexString(key); + keyAttributeValue = HexFormat.of().formatHex(key); } else if (KeyEncoding.UTF8.equals(keyEncoding)) { keyAttributeValue = new String(key, StandardCharsets.UTF_8); } else {