NIFI-13783 Removed commons-codec from Kafka Processors

This closes #9295

- Replaced Commons Codec Hex with Java HexFormat

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
exceptionfactory 2024-09-20 19:37:42 -05:00 committed by Joseph Witt
parent 426b8feaa3
commit a43135ce0c
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
5 changed files with 11 additions and 13 deletions

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.nifi.kafka.processors; package org.apache.nifi.kafka.processors;
import org.apache.commons.codec.binary.Hex;
import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Header;
import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy; import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset; import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
@ -33,6 +32,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collections; import java.util.Collections;
import java.util.HexFormat;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
@ -63,7 +63,7 @@ public class ConsumeKafkaKeyAttributeIT extends AbstractConsumeKafkaIT {
public static Stream<Arguments> permutations() { public static Stream<Arguments> permutations() {
return Stream.of( return Stream.of(
Arguments.arguments(KeyEncoding.UTF8, RECORD_KEY, RECORD_KEY), Arguments.arguments(KeyEncoding.UTF8, RECORD_KEY, RECORD_KEY),
Arguments.arguments(KeyEncoding.HEX, RECORD_KEY, Hex.encodeHexString(RECORD_KEY.getBytes(StandardCharsets.UTF_8))), Arguments.arguments(KeyEncoding.HEX, RECORD_KEY, HexFormat.of().formatHex(RECORD_KEY.getBytes(StandardCharsets.UTF_8))),
Arguments.arguments(KeyEncoding.DO_NOT_ADD, RECORD_KEY, null), Arguments.arguments(KeyEncoding.DO_NOT_ADD, RECORD_KEY, null),
Arguments.arguments(KeyEncoding.UTF8, null, null) Arguments.arguments(KeyEncoding.UTF8, null, null)
); );

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.nifi.kafka.processors; package org.apache.nifi.kafka.processors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Header;
@ -43,20 +42,23 @@ public class PublishKafkaDelimitedIT extends AbstractPublishKafkaIT {
private static final int TEST_RECORD_COUNT = 3; private static final int TEST_RECORD_COUNT = 3;
private static final String DEMARCATOR = "xx";
@Test @Test
public void test1ProduceOneFlowFile() throws InitializationException { public void test1ProduceOneFlowFile() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class); final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class);
runner.setValidateExpressionUsage(false); runner.setValidateExpressionUsage(false);
runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner)); runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner));
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "xx"); runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, DEMARCATOR);
runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*"); runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("a1", "valueA1"); attributes.put("a1", "valueA1");
attributes.put("b1", "valueB1"); attributes.put("b1", "valueB1");
runner.enqueue(StringUtils.repeat(TEST_RECORD_VALUE + "xx", TEST_RECORD_COUNT), attributes); final String value = TEST_RECORD_VALUE + DEMARCATOR + TEST_RECORD_VALUE + DEMARCATOR + TEST_RECORD_VALUE;
runner.enqueue(value, attributes);
runner.run(1); runner.run(1);
runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
} }

View File

@ -17,7 +17,6 @@
package org.apache.nifi.kafka.processors.publish.additional; package org.apache.nifi.kafka.processors.publish.additional;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -38,6 +37,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HexFormat;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -85,7 +85,7 @@ public class PublishKafkaWrapperX3IT extends AbstractPublishKafkaIT {
final List<Header> headers = Arrays.asList(record.headers().toArray()); final List<Header> headers = Arrays.asList(record.headers().toArray());
assertTrue(headers.isEmpty()); assertTrue(headers.isEmpty());
// kafka record key // kafka record key
assertEquals("411b0a140b39581341", Hex.encodeHexString(record.key().getBytes(StandardCharsets.UTF_8))); assertEquals("411b0a140b39581341", HexFormat.of().formatHex(record.key().getBytes(StandardCharsets.UTF_8)));
// kafka record value // kafka record value
final ObjectNode kafkaValue = (ObjectNode) objectMapper.readTree(record.value()); final ObjectNode kafkaValue = (ObjectNode) objectMapper.readTree(record.value());
assertNotNull(kafkaValue); assertNotNull(kafkaValue);

View File

@ -53,9 +53,5 @@
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
</dependency> </dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.nifi.kafka.processors.common; package org.apache.nifi.kafka.processors.common;
import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.kafka.service.api.header.RecordHeader; import org.apache.nifi.kafka.service.api.header.RecordHeader;
import org.apache.nifi.kafka.service.api.record.ByteRecord; import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
@ -26,6 +25,7 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HexFormat;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -62,7 +62,7 @@ public class KafkaUtils {
if (key == null) { if (key == null) {
keyAttributeValue = null; keyAttributeValue = null;
} else if (KeyEncoding.HEX.equals(keyEncoding)) { } else if (KeyEncoding.HEX.equals(keyEncoding)) {
keyAttributeValue = Hex.encodeHexString(key); keyAttributeValue = HexFormat.of().formatHex(key);
} else if (KeyEncoding.UTF8.equals(keyEncoding)) { } else if (KeyEncoding.UTF8.equals(keyEncoding)) {
keyAttributeValue = new String(key, StandardCharsets.UTF_8); keyAttributeValue = new String(key, StandardCharsets.UTF_8);
} else { } else {