diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index b7845cae220..57d6feb27c9 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -767,8 +767,8 @@ public class KafkaSupervisor implements Supervisor // as well as the case where the metadata store do not have an entry for the reset partitions boolean doReset = false; for (Entry resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions() - .getPartitionOffsetMap() - .entrySet()) { + .getPartitionOffsetMap() + .entrySet()) { final Long partitionOffsetInMetadataStore = currentMetadata == null ? null : currentMetadata.getKafkaPartitions() @@ -1034,13 +1034,13 @@ public class KafkaSupervisor implements Supervisor private void updatePartitionDataFromKafka() { - Map> topics; + List partitions; try { synchronized (consumerLock) { - topics = consumer.listTopics(); // updates the consumer's list of partitions from the brokers + partitions = consumer.partitionsFor(ioConfig.getTopic()); } } - catch (Exception e) { // calls to the consumer throw NPEs when the broker doesn't respond + catch (Exception e) { log.warn( e, "Unable to get partition data from Kafka for brokers [%s], are the brokers up?", @@ -1049,10 +1049,6 @@ public class KafkaSupervisor implements Supervisor return; } - List partitions = topics.get(ioConfig.getTopic()); - if (partitions == null) { - log.warn("No such topic [%s] found, list of discovered topics [%s]", ioConfig.getTopic(), topics.keySet()); - } int numPartitions = (partitions != null ? partitions.size() : 0); log.debug("Found [%d] Kafka partitions for topic [%s]", numPartitions, ioConfig.getTopic()); @@ -1101,7 +1097,7 @@ public class KafkaSupervisor implements Supervisor taskCount++; final KafkaIndexTask kafkaTask = (KafkaIndexTask) task; final String taskId = task.getId(); - + // Determine which task group this task belongs to based on one of the partitions handled by this task. If we // later determine that this task is actively reading, we will make sure that it matches our current partition // allocation (getTaskGroupIdForPartition(partition) should return the same value for every partition being read @@ -2263,16 +2259,17 @@ public class KafkaSupervisor implements Supervisor private void updateLatestOffsetsFromKafka() { synchronized (consumerLock) { - final Map> topics = consumer.listTopics(); + final List partitionInfoList = consumer.partitionsFor(ioConfig.getTopic()); - if (topics == null || !topics.containsKey(ioConfig.getTopic())) { + if (partitionInfoList == null || partitionInfoList.size() == 0) { throw new ISE("Could not retrieve partitions for topic [%s]", ioConfig.getTopic()); } - final Set topicPartitions = topics.get(ioConfig.getTopic()) - .stream() - .map(x -> new TopicPartition(x.topic(), x.partition())) - .collect(Collectors.toSet()); + final Set topicPartitions = partitionInfoList + .stream() + .map(x -> new TopicPartition(x.topic(), x.partition())) + .collect(Collectors.toSet()); + consumer.assign(topicPartitions); consumer.seekToEnd(topicPartitions); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index c4e24f185cc..084696d7ab9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -26,6 +26,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.utils.ZkUtils; import org.apache.curator.test.TestingCluster; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -75,6 +78,7 @@ import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.security.JaasUtils; import org.easymock.Capture; import org.easymock.CaptureType; import org.easymock.EasyMock; @@ -101,6 +105,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -133,6 +138,7 @@ public class KafkaSupervisorTest extends EasyMockSupport private static String kafkaHost; private static DataSchema dataSchema; private static int topicPostfix; + private static ZkUtils zkUtils; private final int numThreads; @@ -174,12 +180,19 @@ public class KafkaSupervisorTest extends EasyMockSupport zkServer.getConnectString(), null, 1, - ImmutableMap.of("num.partitions", String.valueOf(NUM_PARTITIONS)) + ImmutableMap.of( + "num.partitions", + String.valueOf(NUM_PARTITIONS), + "auto.create.topics.enable", + String.valueOf(false) + ) ); kafkaServer.start(); kafkaHost = StringUtils.format("localhost:%d", kafkaServer.getPort()); dataSchema = getDataSchema(DATASOURCE); + + zkUtils = ZkUtils.apply(zkServer.getConnectString(), 30000, 30000, JaasUtils.isZkSecurityEnabled()); } @Before @@ -238,6 +251,9 @@ public class KafkaSupervisorTest extends EasyMockSupport zkServer.stop(); zkServer = null; + + zkUtils.close(); + zkUtils = null; } @Test @@ -2200,7 +2216,8 @@ public class KafkaSupervisorTest extends EasyMockSupport Thread.sleep(100); } - Assert.assertTrue(serviceEmitter.getStackTrace().startsWith("org.apache.druid.java.util.common.ISE: WTH?! cannot find")); + Assert.assertTrue(serviceEmitter.getStackTrace() + .startsWith("org.apache.druid.java.util.common.ISE: WTH?! cannot find")); Assert.assertEquals( "WTH?! cannot find taskGroup [0] among all taskGroups [{}]", serviceEmitter.getExceptionMessage() @@ -2532,6 +2549,9 @@ public class KafkaSupervisorTest extends EasyMockSupport private void addSomeEvents(int numEventsPerPartition) throws Exception { + //create topic manually + AdminUtils.createTopic(zkUtils, topic, NUM_PARTITIONS, 1, new Properties(), RackAwareMode.Enforced$.MODULE$); + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { for (int i = 0; i < NUM_PARTITIONS; i++) { for (int j = 0; j < numEventsPerPartition; j++) {