NIFI-13801: Fixed counter for ConsumeKafka to correctly indicate number of messages received using using a demarcator

This closes #9311.

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2024-09-24 15:24:29 -04:00 committed by Joseph Witt
parent 69b99390e6
commit 6b4daa2ec2
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
5 changed files with 24 additions and 26 deletions

View File

@ -192,7 +192,8 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable {
consumerRecord.timestamp(),
recordHeaders,
consumerRecord.key(),
consumerRecord.value()
consumerRecord.value(),
1
);
}
}

View File

@ -16,11 +16,10 @@
*/
package org.apache.nifi.kafka.processors.consumer.bundle;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.nifi.kafka.service.api.common.TopicPartitionSummary;
import org.apache.nifi.kafka.service.api.header.RecordHeader;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -69,25 +68,21 @@ public class BundleKey {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final BundleKey bundleKey = (BundleKey) o;
return new EqualsBuilder()
.append(topicPartition, bundleKey.topicPartition)
.append(headersFiltered, bundleKey.headersFiltered)
.append(messageKey, bundleKey.messageKey)
.isEquals();
return Objects.equals(topicPartition, bundleKey.topicPartition)
&& Objects.equals(headersFiltered, bundleKey.headersFiltered)
&& Arrays.equals(messageKey, bundleKey.messageKey);
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(topicPartition)
.append(headersFiltered)
.append(messageKey)
.toHashCode();
return Objects.hash(topicPartition, headersFiltered, Arrays.hashCode(messageKey));
}
}

View File

@ -66,7 +66,7 @@ public class ByteRecordBundler {
key.headers.add(new RecordHeader(KafkaFlowFileAttribute.KAFKA_MAX_OFFSET, Long.toString(value.getLastOffset()).getBytes(StandardCharsets.UTF_8)));
key.headers.add(new RecordHeader(KafkaFlowFileAttribute.KAFKA_COUNT, Long.toString(value.getCount()).getBytes(StandardCharsets.UTF_8)));
return new ByteRecord(topicPartition.getTopic(), topicPartition.getPartition(),
value.getFirstOffset(), key.getTimestamp(), key.getHeaders(), key.getMessageKey(), value.getData());
value.getFirstOffset(), key.getTimestamp(), key.getHeaders(), key.getMessageKey(), value.getData(), value.getCount());
}
private void update(final Map<BundleKey, BundleValue> bundles, final ByteRecord byteRecord) {

View File

@ -70,7 +70,7 @@ public class FlowFileStreamKafkaMessageConverter implements KafkaMessageConverte
final String transitUri = String.format(TRANSIT_URI_FORMAT, consumerRecord.getTopic(), consumerRecord.getPartition());
provenanceReporter.receive(flowFile, transitUri);
session.adjustCounter("Records Received from " + consumerRecord.getTopic(), 1, false);
session.adjustCounter("Records Received from " + consumerRecord.getTopic(), consumerRecord.getBundledCount(), false);
session.transfer(flowFile, ConsumeKafka.SUCCESS);
offsetTracker.update(consumerRecord);

View File

@ -26,19 +26,15 @@ import java.util.Optional;
* Byte Record translation of Kafka Record with byte arrays for key and value properties
*/
public class ByteRecord {
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final List<RecordHeader> headers;
private final byte[] key;
private final byte[] value;
private final long bundledCount;
public ByteRecord(
final String topic,
@ -47,7 +43,8 @@ public class ByteRecord {
final long timestamp,
final List<RecordHeader> headers,
final byte[] key,
final byte[] value
final byte[] value,
final long bundledCount
) {
this.topic = Objects.requireNonNull(topic, "Topic required");
this.partition = partition;
@ -56,6 +53,7 @@ public class ByteRecord {
this.headers = Objects.requireNonNull(headers, "Headers required");
this.key = key;
this.value = Objects.requireNonNull(value, "Value required");
this.bundledCount = bundledCount;
}
public String getTopic() {
@ -85,4 +83,8 @@ public class ByteRecord {
public List<RecordHeader> getHeaders() {
return headers;
}
public long getBundledCount() {
return bundledCount;
}
}