Remove consumer.listTopics() method in case when too many topics in kafka causes the FullGC in Overlord (#6455)

* remove consumer.listTopics() method

* add consumerLock and exception handling for consumer.partitionFor() and remove some useless checks

* add check in case consumer.partitionsFor() returns null

* fix CI failure

* fix failed UT

* Revert "fix CI failure"

This reverts commit f839d09e1e.

* revert unless commit and re-commit the useful part to fix failed UT
This commit is contained in:
elloooooo 2018-10-23 01:46:31 +08:00 committed by Slim Bouguerra
parent e83cc22996
commit 1e82b6291e
2 changed files with 35 additions and 18 deletions

View File

@ -767,8 +767,8 @@ public class KafkaSupervisor implements Supervisor
// as well as the case where the metadata store do not have an entry for the reset partitions
boolean doReset = false;
for (Entry<Integer, Long> resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions()
.getPartitionOffsetMap()
.entrySet()) {
.getPartitionOffsetMap()
.entrySet()) {
final Long partitionOffsetInMetadataStore = currentMetadata == null
? null
: currentMetadata.getKafkaPartitions()
@ -1034,13 +1034,13 @@ public class KafkaSupervisor implements Supervisor
private void updatePartitionDataFromKafka()
{
Map<String, List<PartitionInfo>> topics;
List<PartitionInfo> partitions;
try {
synchronized (consumerLock) {
topics = consumer.listTopics(); // updates the consumer's list of partitions from the brokers
partitions = consumer.partitionsFor(ioConfig.getTopic());
}
}
catch (Exception e) { // calls to the consumer throw NPEs when the broker doesn't respond
catch (Exception e) {
log.warn(
e,
"Unable to get partition data from Kafka for brokers [%s], are the brokers up?",
@ -1049,10 +1049,6 @@ public class KafkaSupervisor implements Supervisor
return;
}
List<PartitionInfo> partitions = topics.get(ioConfig.getTopic());
if (partitions == null) {
log.warn("No such topic [%s] found, list of discovered topics [%s]", ioConfig.getTopic(), topics.keySet());
}
int numPartitions = (partitions != null ? partitions.size() : 0);
log.debug("Found [%d] Kafka partitions for topic [%s]", numPartitions, ioConfig.getTopic());
@ -1101,7 +1097,7 @@ public class KafkaSupervisor implements Supervisor
taskCount++;
final KafkaIndexTask kafkaTask = (KafkaIndexTask) task;
final String taskId = task.getId();
// Determine which task group this task belongs to based on one of the partitions handled by this task. If we
// later determine that this task is actively reading, we will make sure that it matches our current partition
// allocation (getTaskGroupIdForPartition(partition) should return the same value for every partition being read
@ -2263,16 +2259,17 @@ public class KafkaSupervisor implements Supervisor
private void updateLatestOffsetsFromKafka()
{
synchronized (consumerLock) {
final Map<String, List<PartitionInfo>> topics = consumer.listTopics();
final List<PartitionInfo> partitionInfoList = consumer.partitionsFor(ioConfig.getTopic());
if (topics == null || !topics.containsKey(ioConfig.getTopic())) {
if (partitionInfoList == null || partitionInfoList.size() == 0) {
throw new ISE("Could not retrieve partitions for topic [%s]", ioConfig.getTopic());
}
final Set<TopicPartition> topicPartitions = topics.get(ioConfig.getTopic())
.stream()
.map(x -> new TopicPartition(x.topic(), x.partition()))
.collect(Collectors.toSet());
final Set<TopicPartition> topicPartitions = partitionInfoList
.stream()
.map(x -> new TopicPartition(x.topic(), x.partition()))
.collect(Collectors.toSet());
consumer.assign(topicPartitions);
consumer.seekToEnd(topicPartitions);

View File

@ -26,6 +26,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -75,6 +78,7 @@ import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.security.JaasUtils;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
@ -101,6 +105,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@ -133,6 +138,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
private static String kafkaHost;
private static DataSchema dataSchema;
private static int topicPostfix;
private static ZkUtils zkUtils;
private final int numThreads;
@ -174,12 +180,19 @@ public class KafkaSupervisorTest extends EasyMockSupport
zkServer.getConnectString(),
null,
1,
ImmutableMap.of("num.partitions", String.valueOf(NUM_PARTITIONS))
ImmutableMap.of(
"num.partitions",
String.valueOf(NUM_PARTITIONS),
"auto.create.topics.enable",
String.valueOf(false)
)
);
kafkaServer.start();
kafkaHost = StringUtils.format("localhost:%d", kafkaServer.getPort());
dataSchema = getDataSchema(DATASOURCE);
zkUtils = ZkUtils.apply(zkServer.getConnectString(), 30000, 30000, JaasUtils.isZkSecurityEnabled());
}
@Before
@ -238,6 +251,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
zkServer.stop();
zkServer = null;
zkUtils.close();
zkUtils = null;
}
@Test
@ -2200,7 +2216,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
Thread.sleep(100);
}
Assert.assertTrue(serviceEmitter.getStackTrace().startsWith("org.apache.druid.java.util.common.ISE: WTH?! cannot find"));
Assert.assertTrue(serviceEmitter.getStackTrace()
.startsWith("org.apache.druid.java.util.common.ISE: WTH?! cannot find"));
Assert.assertEquals(
"WTH?! cannot find taskGroup [0] among all taskGroups [{}]",
serviceEmitter.getExceptionMessage()
@ -2532,6 +2549,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
//create topic manually
AdminUtils.createTopic(zkUtils, topic, NUM_PARTITIONS, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (int i = 0; i < NUM_PARTITIONS; i++) {
for (int j = 0; j < numEventsPerPartition; j++) {