NIFI-8085 Use poll(Duration) in ConsumeKafka_2_x processors

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4725.
This commit is contained in:
Peter Turcsanyi 2020-12-12 13:54:56 +01:00 committed by Pierre Villard
parent ec3d5f89f0
commit 91f6b42985
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
4 changed files with 18 additions and 16 deletions

View File

@ -48,6 +48,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -178,7 +179,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
* This behavior has been fixed via Kafka KIP-62 and available from Kafka client 0.10.1.0.
*/
try {
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Duration.ofMillis(10));
lastPollEmpty = records.count() == 0;
processRecords(records);
} catch (final ProcessException pe) {

View File

@ -24,8 +24,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.kafka.pubsub.ConsumerLease;
import org.apache.nifi.processors.kafka.pubsub.ConsumerPool;
import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.junit.Before;
@ -33,6 +31,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -43,7 +42,7 @@ import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -112,7 +111,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolSimpleCreateClose() throws Exception {
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
}
@ -144,7 +143,7 @@ public class ConsumerPoolTest {
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
lease.commit();
@ -160,7 +159,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolSimpleBatchCreateClose() throws Exception {
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
for (int i = 0; i < 100; i++) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
for (int j = 0; j < 100; j++) {
@ -187,7 +186,7 @@ public class ConsumerPoolTest {
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
lease.commit();
@ -204,7 +203,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolConsumerFails() throws Exception {
when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
when(consumer.poll(any(Duration.class))).thenThrow(new KafkaException("oops"));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
try {
lease.poll();

View File

@ -48,6 +48,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -178,7 +179,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
* This behavior has been fixed via Kafka KIP-62 and available from Kafka client 0.10.1.0.
*/
try {
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Duration.ofMillis(10));
lastPollEmpty = records.count() == 0;
processRecords(records);
} catch (final ProcessException pe) {

View File

@ -31,6 +31,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -41,7 +42,7 @@ import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -110,7 +111,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolSimpleCreateClose() throws Exception {
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
}
@ -142,7 +143,7 @@ public class ConsumerPoolTest {
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
lease.commit();
@ -158,7 +159,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolSimpleBatchCreateClose() throws Exception {
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
for (int i = 0; i < 100; i++) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
for (int j = 0; j < 100; j++) {
@ -185,7 +186,7 @@ public class ConsumerPoolTest {
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
lease.commit();
@ -202,7 +203,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolConsumerFails() throws Exception {
when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
when(consumer.poll(any(Duration.class))).thenThrow(new KafkaException("oops"));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
try {
lease.poll();