NIFI-8357: Updated Kafka 2.6 processors to automatically handle recreating Consumer Lease objects when an existing one is poisoned, even if using statically assigned partitions

This closes #4926.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Mark Payne 2021-03-23 13:53:26 -04:00 committed by Peter Turcsanyi
parent 2f08d1f466
commit 74ea3840ac
3 changed files with 184 additions and 47 deletions

View File

@ -164,6 +164,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
}
/**
 * Returns the partitions that were explicitly assigned to this lease.
 * This base implementation always returns {@code null}; a subclass that keeps track of an
 * explicit assignment can override it to expose that assignment.
 *
 * @return the explicitly assigned partitions; always {@code null} in this base implementation
 */
public List<TopicPartition> getAssignedPartitions() {
return null;
}
/**
* Executes a poll on the underlying Kafka Consumer and creates any new
* flowfiles necessary or appends to existing ones if in demarcation mode.

View File

@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
@ -66,6 +67,7 @@ public class ConsumerPool implements Closeable {
private final AtomicLong consumerCreatedCountRef = new AtomicLong();
private final AtomicLong consumerClosedCountRef = new AtomicLong();
private final AtomicLong leasesObtainedCountRef = new AtomicLong();
private final Queue<List<TopicPartition>> availableTopicPartitions = new LinkedBlockingQueue<>();
/**
* Creates a pool of KafkaConsumer objects that will grow up to the maximum
@ -119,7 +121,7 @@ public class ConsumerPool implements Closeable {
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
this.partitionsToConsume = partitionsToConsume;
enqueueLeases(partitionsToConsume);
enqueueAssignedPartitions(partitionsToConsume);
}
public ConsumerPool(
@ -154,7 +156,7 @@ public class ConsumerPool implements Closeable {
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
this.partitionsToConsume = partitionsToConsume;
enqueueLeases(partitionsToConsume);
enqueueAssignedPartitions(partitionsToConsume);
}
public ConsumerPool(
@ -190,7 +192,7 @@ public class ConsumerPool implements Closeable {
this.separateByKey = separateByKey;
this.keyEncoding = keyEncoding;
this.partitionsToConsume = partitionsToConsume;
enqueueLeases(partitionsToConsume);
enqueueAssignedPartitions(partitionsToConsume);
}
public ConsumerPool(
@ -226,7 +228,7 @@ public class ConsumerPool implements Closeable {
this.separateByKey = separateByKey;
this.keyEncoding = keyEncoding;
this.partitionsToConsume = partitionsToConsume;
enqueueLeases(partitionsToConsume);
enqueueAssignedPartitions(partitionsToConsume);
}
public int getPartitionCount() {
@ -262,64 +264,95 @@ public class ConsumerPool implements Closeable {
* @return consumer to use or null if not available or necessary
*/
public ConsumerLease obtainConsumer(final ProcessSession session, final ProcessContext processContext) {
// If there are any partition assignments that do not have leases in our pool, create the leases and add them to the pool.
// This is not necessary for us to handle if using automatic subscriptions because the Kafka protocol will ensure that each consumer
// has the appropriate partitions. However, if we are using explicit assignment, it's important to create these leases and add them
// to our pool in order to avoid starvation. E.g., if we have only a single concurrent task and 5 partitions assigned, we cannot simply
// wait until pooledLeases.poll() returns null to create a new ConsumerLease, as doing so may result in constantly pulling from only a
// single partition (since we'd get a Lease for Partition 1, then use it, and put it back in the pool).
recreateAssignedConsumers();
SimpleConsumerLease lease = pooledLeases.poll();
if (lease == null) {
final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
consumerCreatedCountRef.incrementAndGet();
/**
* For now return a new consumer lease. But we could later elect to
* have this return null if we determine the broker indicates that
* the lag time on all topics being monitored is sufficiently low.
* For now we should encourage conservative use of threads because
* having too many means we'll have at best useless threads sitting
* around doing frequent network calls and at worst having consumers
* sitting idle which could prompt excessive rebalances.
*/
lease = new SimpleConsumerLease(consumer);
if (partitionsToConsume == null) {
// This subscription tightly couples the lease to the given
// consumer. They cannot be separated from then on.
if (topics != null) {
consumer.subscribe(topics, lease);
} else {
consumer.subscribe(topicPattern, lease);
}
} else {
logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease.");
lease = createConsumerLease();
if (lease == null) {
return null;
}
}
lease.setProcessSession(session, processContext);
leasesObtainedCountRef.incrementAndGet();
return lease;
}
private SimpleConsumerLease createConsumerLease(final int partition) {
/**
 * Drains every pending explicit partition assignment from {@code availableTopicPartitions},
 * builds a dedicated lease for each one and adds that lease to {@code pooledLeases}.
 * <p>
 * An assignment is removed from the queue before its lease is built. If building the lease fails,
 * the assignment is put back on the queue before the failure is propagated, so that a later call
 * retries it instead of the partitions silently never being consumed again.
 */
private void recreateAssignedConsumers() {
    List<TopicPartition> topicPartitions;
    while ((topicPartitions = availableTopicPartitions.poll()) != null) {
        final SimpleConsumerLease simpleConsumerLease;
        try {
            simpleConsumerLease = createConsumerLease(topicPartitions);
        } catch (final RuntimeException e) {
            // The assignment has already been dequeued; return it so it is not lost, then let the caller see the failure.
            availableTopicPartitions.offer(topicPartitions);
            throw e;
        }

        pooledLeases.add(simpleConsumerLease);
    }
}
/**
 * Builds a lease around a newly created Kafka consumer that is subscribed to the configured
 * topic list or, when no topic list is set, to the configured topic pattern.
 *
 * @return the new lease, or {@code null} when partitions are explicitly assigned
 */
private SimpleConsumerLease createConsumerLease() {
    final boolean explicitlyAssigned = partitionsToConsume != null;
    if (explicitlyAssigned) {
        logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease.");
        return null;
    }

    final Consumer<byte[], byte[]> kafkaConsumer = createKafkaConsumer();
    consumerCreatedCountRef.incrementAndGet();

    /*
     * A new lease is always handed out for now. Later this could return null
     * when the broker reports that lag on all monitored topics is low enough.
     * Thread use should stay conservative: too many consumers means, at best,
     * idle threads making frequent network calls and, at worst, idle consumers
     * that provoke excessive rebalances.
     */
    final SimpleConsumerLease newLease = new SimpleConsumerLease(kafkaConsumer, null);

    // Subscribing registers the lease with the consumer; from here on the two
    // are tightly coupled and cannot be separated.
    if (topics != null) {
        kafkaConsumer.subscribe(topics, newLease);
    } else {
        kafkaConsumer.subscribe(topicPattern, newLease);
    }

    return newLease;
}
/**
 * Builds a lease around a newly created Kafka consumer that is assigned exactly the given partitions.
 *
 * @param topicPartitions the partitions to assign to the new consumer; also recorded on the lease
 * @return the new lease
 */
private SimpleConsumerLease createConsumerLease(final List<TopicPartition> topicPartitions) {
    final Consumer<byte[], byte[]> kafkaConsumer = createKafkaConsumer();
    consumerCreatedCountRef.incrementAndGet();

    kafkaConsumer.assign(topicPartitions);
    return new SimpleConsumerLease(kafkaConsumer, topicPartitions);
}
/**
 * Queues one topic/partition list per configured partition number so that a lease can be
 * created for it later. Does nothing when no partitions are given.
 *
 * @param partitionsToConsume the partition numbers to consume from, or {@code null} if none are explicitly assigned
 */
private void enqueueAssignedPartitions(final int[] partitionsToConsume) {
    if (partitionsToConsume != null) {
        for (int index = 0; index < partitionsToConsume.length; index++) {
            availableTopicPartitions.offer(createTopicPartitions(partitionsToConsume[index]));
        }
    }
}
private List<TopicPartition> createTopicPartitions(final int partition) {
final List<TopicPartition> topicPartitions = new ArrayList<>();
for (final String topic : topics) {
final TopicPartition topicPartition = new TopicPartition(topic, partition);
topicPartitions.add(topicPartition);
}
final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
consumerCreatedCountRef.incrementAndGet();
consumer.assign(topicPartitions);
final SimpleConsumerLease lease = new SimpleConsumerLease(consumer);
return lease;
}
private void enqueueLeases(final int[] partitionsToConsume) {
if (partitionsToConsume == null) {
return;
}
for (final int partition : partitionsToConsume) {
final SimpleConsumerLease lease = createConsumerLease(partition);
pooledLeases.add(lease);
}
return topicPartitions;
}
/**
@ -371,16 +404,17 @@ public class ConsumerPool implements Closeable {
}
private class SimpleConsumerLease extends ConsumerLease {
private final Consumer<byte[], byte[]> consumer;
private final List<TopicPartition> assignedPartitions;
private volatile ProcessSession session;
private volatile ProcessContext processContext;
private volatile boolean closedConsumer;
private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer, final List<TopicPartition> assignedPartitions) {
super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers,
readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern, separateByKey);
this.consumer = consumer;
this.assignedPartitions = assignedPartitions;
}
void setProcessSession(final ProcessSession session, final ProcessContext context) {
@ -388,6 +422,11 @@ public class ConsumerPool implements Closeable {
this.processContext = context;
}
/**
 * Returns the partition list that was passed to this lease's constructor.
 *
 * @return the partitions given at construction; {@code null} when the lease was constructed without a partition list
 */
@Override
public List<TopicPartition> getAssignedPartitions() {
return assignedPartitions;
}
@Override
public void yield() {
if (processContext != null) {
@ -410,18 +449,27 @@ public class ConsumerPool implements Closeable {
if (closedConsumer) {
return;
}
super.close();
if (session != null) {
session.rollback();
setProcessSession(null, null);
}
if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
closedConsumer = true;
closeConsumer(consumer);
// If explicit topic/partition assignment is used, make the assignments for this Lease available again.
if (assignedPartitions != null) {
logger.debug("Adding partitions {} back to the pool", assignedPartitions);
availableTopicPartitions.offer(assignedPartitions);
}
}
}
}
static final class PoolStats {
final long consumerCreatedCount;

View File

@ -41,6 +41,9 @@ import java.util.UUID;
import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@ -157,6 +160,88 @@ public class ConsumerPoolTest {
assertEquals(1, stats.leasesObtainedCount);
}
/**
 * Verifies that, while one lease is still held, each further call to
 * {@code obtainConsumer} returns a real lease that is distinct from the held one.
 */
@Test
public void testConsumerCreatedOnDemand() {
    try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
        assertNotNull(lease);

        final List<ConsumerLease> created = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) {
                final ConsumerLease newLease = testPool.obtainConsumer(mockSession, mockContext);
                // assertNotSame alone would pass for a null lease, and a null entry would break the cleanup below.
                assertNotNull(newLease);
                created.add(newLease);
                assertNotSame(lease, newLease);
            }
        } finally {
            created.forEach(ConsumerLease::close);
        }
    }
}
/**
 * With partitions 1, 2 and 3 statically assigned, checks that the pool hands out one lease
 * per partition, returns {@code null} once all three are checked out, and hands out a lease for
 * partition 2 again after the previous partition 2 lease has been closed.
 */
@Test
public void testConsumerNotCreatedOnDemandWhenUsingStaticAssignment() {
    final ConsumerPool pool = new ConsumerPool(
        1,
        null,
        false,
        Collections.emptyMap(),
        Collections.singletonList("nifi"),
        100L,
        "utf-8",
        "ssl",
        "localhost",
        logger,
        true,
        StandardCharsets.UTF_8,
        null,
        new int[] {1, 2, 3}) {
        @Override
        protected Consumer<byte[], byte[]> createKafkaConsumer() {
            return consumer;
        }
    };

    try (final ConsumerLease firstLease = pool.obtainConsumer(mockSession, mockContext)) {
        ConsumerLease secondLease = null;
        ConsumerLease thirdLease = null;

        try {
            // Second lease obtained: expected to cover partition 2 only.
            secondLease = pool.obtainConsumer(mockSession, mockContext);
            assertNotSame(firstLease, secondLease);
            assertEquals(1, secondLease.getAssignedPartitions().size());
            assertEquals(2, secondLease.getAssignedPartitions().get(0).partition());

            // Third lease obtained: expected to cover partition 3 only.
            thirdLease = pool.obtainConsumer(mockSession, mockContext);
            assertNotSame(firstLease, thirdLease);
            assertNotSame(secondLease, thirdLease);
            assertEquals(1, thirdLease.getAssignedPartitions().size());
            assertEquals(3, thirdLease.getAssignedPartitions().get(0).partition());

            // All three partitions are checked out, so no further lease is available.
            assertNull(pool.obtainConsumer(mockSession, mockContext));

            // Close the lease for Partition 2. We should now be able to get another Lease for Partition 2.
            secondLease.close();
            secondLease = pool.obtainConsumer(mockSession, mockContext);
            assertNotNull(secondLease);
            assertEquals(1, secondLease.getAssignedPartitions().size());
            assertEquals(2, secondLease.getAssignedPartitions().get(0).partition());

            // Everything is checked out again.
            assertNull(pool.obtainConsumer(mockSession, mockContext));
        } finally {
            closeLeases(secondLease, thirdLease);
        }
    }
}
/**
 * Closes each of the given leases in order, ignoring {@code null} entries.
 *
 * @param leases the leases to close; individual entries may be {@code null}
 */
private void closeLeases(final ConsumerLease... leases) {
    for (int index = 0; index < leases.length; index++) {
        final ConsumerLease candidate = leases[index];
        if (candidate == null) {
            continue;
        }
        candidate.close();
    }
}
@Test
public void validatePoolSimpleBatchCreateClose() throws Exception {
when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));