diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java index f935270cea..c0cde3c730 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java @@ -192,7 +192,8 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable { consumerRecord.timestamp(), recordHeaders, consumerRecord.key(), - consumerRecord.value() + consumerRecord.value(), + 1 ); } } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/bundle/BundleKey.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/bundle/BundleKey.java index d8cdf8ee5b..55df77ca3e 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/bundle/BundleKey.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/bundle/BundleKey.java @@ -16,11 +16,10 @@ */ package org.apache.nifi.kafka.processors.consumer.bundle; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.nifi.kafka.service.api.common.TopicPartitionSummary; import org.apache.nifi.kafka.service.api.header.RecordHeader; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; @@ -69,25 +68,21 @@ public class BundleKey { @Override public boolean equals(Object o) { - if (this == o) return true; - - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } final BundleKey bundleKey = (BundleKey) o; - - return new EqualsBuilder() - .append(topicPartition, bundleKey.topicPartition) - .append(headersFiltered, bundleKey.headersFiltered) - .append(messageKey, bundleKey.messageKey) - .isEquals(); + return Objects.equals(topicPartition, bundleKey.topicPartition) + && Objects.equals(headersFiltered, bundleKey.headersFiltered) + && Arrays.equals(messageKey, bundleKey.messageKey); } @Override public int hashCode() { - return new HashCodeBuilder(17, 37) - .append(topicPartition) - .append(headersFiltered) - .append(messageKey) - .toHashCode(); + return Objects.hash(topicPartition, headersFiltered, Arrays.hashCode(messageKey)); } } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/bundle/ByteRecordBundler.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/bundle/ByteRecordBundler.java index b8b3f35333..a0144aa48e 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/bundle/ByteRecordBundler.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/bundle/ByteRecordBundler.java @@ -66,7 +66,7 @@ public class ByteRecordBundler { key.headers.add(new RecordHeader(KafkaFlowFileAttribute.KAFKA_MAX_OFFSET, Long.toString(value.getLastOffset()).getBytes(StandardCharsets.UTF_8))); key.headers.add(new RecordHeader(KafkaFlowFileAttribute.KAFKA_COUNT, Long.toString(value.getCount()).getBytes(StandardCharsets.UTF_8))); return new ByteRecord(topicPartition.getTopic(), topicPartition.getPartition(), - value.getFirstOffset(), key.getTimestamp(), key.getHeaders(), key.getMessageKey(), value.getData()); + value.getFirstOffset(), key.getTimestamp(), key.getHeaders(), key.getMessageKey(), value.getData(), value.getCount()); } private void update(final Map bundles, final ByteRecord byteRecord) { diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/FlowFileStreamKafkaMessageConverter.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/FlowFileStreamKafkaMessageConverter.java index ac3e279bc0..9d0d8125af 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/FlowFileStreamKafkaMessageConverter.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/FlowFileStreamKafkaMessageConverter.java @@ -70,7 +70,7 @@ public class FlowFileStreamKafkaMessageConverter implements KafkaMessageConverte final String transitUri = String.format(TRANSIT_URI_FORMAT, consumerRecord.getTopic(), consumerRecord.getPartition()); provenanceReporter.receive(flowFile, transitUri); - session.adjustCounter("Records Received from " + consumerRecord.getTopic(), 1, false); + session.adjustCounter("Records Received from " + consumerRecord.getTopic(), consumerRecord.getBundledCount(), false); session.transfer(flowFile, ConsumeKafka.SUCCESS); offsetTracker.update(consumerRecord); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/record/ByteRecord.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/record/ByteRecord.java index 8c4f915b5d..4d57d66151 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/record/ByteRecord.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/record/ByteRecord.java @@ -26,19 +26,15 @@ import java.util.Optional; * Byte Record translation of Kafka Record with byte arrays for key and value properties */ public class ByteRecord { + private final String topic; - private final int partition; - private final long offset; - private final long timestamp; - private final List headers; - private final byte[] key; - private final byte[] value; + private final long bundledCount; public ByteRecord( final String topic, @@ -47,7 +43,8 @@ public class ByteRecord { final long timestamp, final List headers, final byte[] key, - final byte[] value + final byte[] value, + final long bundledCount ) { this.topic = Objects.requireNonNull(topic, "Topic required"); this.partition = partition; @@ -56,6 +53,7 @@ public class ByteRecord { this.headers = Objects.requireNonNull(headers, "Headers required"); this.key = key; this.value = Objects.requireNonNull(value, "Value required"); + this.bundledCount = bundledCount; } public String getTopic() { @@ -85,4 +83,8 @@ public class ByteRecord { public List getHeaders() { return headers; } + + public long getBundledCount() { + return bundledCount; + } }