make kafka poll timeout can be configured (#6773)

* make kafka poll timeout can be configured

* add doc

* rename DEFAULT_POLL_TIMEOUT to DEFAULT_POLL_TIMEOUT_MILLIS
This commit is contained in:
Mingming Qiu 2019-01-03 12:16:02 +08:00 committed by Benedict Jin
parent e8ddd9942d
commit 6761663509
10 changed files with 62 additions and 4 deletions

View File

@ -193,6 +193,7 @@ For Roaring bitmaps:
|-----|----|-----------|--------|
|`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.html) or String password.|yes|
|`pollTimeout`|Long|The length of time to wait for the kafka consumer to poll records, in milliseconds|no (default == 100)|
|`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against node failure.|no (default == 1)|
|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See 'Capacity Planning' below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
|`taskDuration`|ISO8601 Period|The length of time before tasks stop reading and begin publishing their segment.|no (default == PT1H)|

View File

@ -100,7 +100,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
List<OrderedPartitionableRecord<Integer, Long>> records = new ArrayList<>();
try {
records = recordSupplier.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
records = recordSupplier.poll(task.getIOConfig().getPollTimeout());
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());

View File

@ -41,13 +41,11 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
{
private static final String TYPE = "index_kafka";
static final long POLL_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(100);
private final KafkaIndexTaskIOConfig ioConfig;
private final ObjectMapper configMapper;

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
import org.joda.time.DateTime;
@ -32,6 +33,7 @@ import java.util.Map;
public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Integer, Long>
{
private final Map<String, Object> consumerProperties;
private final long pollTimeout;
@JsonCreator
public KafkaIndexTaskIOConfig(
@ -40,6 +42,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
@JsonProperty("startPartitions") SeekableStreamPartitions<Integer, Long> startPartitions,
@JsonProperty("endPartitions") SeekableStreamPartitions<Integer, Long> endPartitions,
@JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
@JsonProperty("pollTimeout") Long pollTimeout,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@ -59,6 +62,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
);
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
this.pollTimeout = pollTimeout != null ? pollTimeout : KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
for (int partition : endPartitions.getPartitionSequenceNumberMap().keySet()) {
Preconditions.checkArgument(
@ -77,6 +81,12 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
return consumerProperties;
}
@JsonProperty
public long getPollTimeout()
{
return pollTimeout;
}
@Override
public String toString()
{
@ -86,6 +96,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
", startPartitions=" + getStartPartitions() +
", endPartitions=" + getEndPartitions() +
", consumerProperties=" + consumerProperties +
", pollTimeout=" + pollTimeout +
", useTransaction=" + isUseTransaction() +
", minimumMessageTime=" + getMinimumMessageTime() +
", maximumMessageTime=" + getMaximumMessageTime() +

View File

@ -393,7 +393,7 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
records = consumer.poll(task.getIOConfig().getPollTimeout());
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());

View File

@ -209,6 +209,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
new SeekableStreamPartitions<>(kafkaIoConfig.getTopic(), startPartitions),
new SeekableStreamPartitions<>(kafkaIoConfig.getTopic(), endPartitions),
kafkaIoConfig.getConsumerProperties(),
kafkaIoConfig.getPollTimeout(),
true,
minimumMessageTime,
maximumMessageTime,

View File

@ -34,8 +34,10 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
public static final String TRUST_STORE_PASSWORD_KEY = "ssl.truststore.password";
public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password";
public static final String KEY_PASSWORD_KEY = "ssl.key.password";
public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 100;
private final Map<String, Object> consumerProperties;
private final long pollTimeout;
private final boolean skipOffsetGaps;
@JsonCreator
@ -45,6 +47,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@JsonProperty("taskCount") Integer taskCount,
@JsonProperty("taskDuration") Period taskDuration,
@JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
@JsonProperty("pollTimeout") Long pollTimeout,
@JsonProperty("startDelay") Period startDelay,
@JsonProperty("period") Period period,
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
@ -72,6 +75,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
consumerProperties.get(BOOTSTRAP_SERVERS_KEY),
StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY)
);
this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : false;
}
@ -87,6 +91,12 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
return consumerProperties;
}
@JsonProperty
public long getPollTimeout()
{
return pollTimeout;
}
@JsonProperty
public boolean isUseEarliestOffset()
{
@ -108,6 +118,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
", taskCount=" + getTaskCount() +
", taskDuration=" + getTaskDuration() +
", consumerProperties=" + consumerProperties +
", pollTimeout=" + pollTimeout +
", startDelay=" + getStartDelay() +
", period=" + getPeriod() +
", useEarliestOffset=" + isUseEarliestOffset() +

View File

@ -69,6 +69,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@ -386,6 +387,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -428,6 +430,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -536,6 +539,7 @@ public class KafkaIndexTaskTest
startPartitions,
endPartitions,
consumerProps,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -663,6 +667,7 @@ public class KafkaIndexTaskTest
startPartitions,
endPartitions,
consumerProps,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -803,6 +808,7 @@ public class KafkaIndexTaskTest
startPartitions,
endPartitions,
consumerProps,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -905,6 +911,7 @@ public class KafkaIndexTaskTest
startPartitions,
endPartitions,
consumerProps,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -941,6 +948,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
DateTimes.of("2010"),
null,
@ -995,6 +1003,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
DateTimes.of("2010"),
@ -1059,6 +1068,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1119,6 +1129,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1160,6 +1171,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1212,6 +1224,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1267,6 +1280,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 7L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1311,6 +1325,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 13L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1393,6 +1408,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1453,6 +1469,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1467,6 +1484,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1521,6 +1539,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1535,6 +1554,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 3L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1590,6 +1610,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
false,
null,
null,
@ -1604,6 +1625,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 3L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
false,
null,
null,
@ -1664,6 +1686,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L, 1, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L, 1, 2L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1729,6 +1752,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1743,6 +1767,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(1, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(1, 1L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1799,6 +1824,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1836,6 +1862,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1888,6 +1915,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -1972,6 +2000,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -2011,6 +2040,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 200L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 500L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
@ -2065,6 +2095,7 @@ public class KafkaIndexTaskTest
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,

View File

@ -71,6 +71,7 @@ public class KafkaSupervisorIOConfigTest
Assert.assertEquals(1, (int) config.getTaskCount());
Assert.assertEquals(Duration.standardMinutes(60), config.getTaskDuration());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
Assert.assertEquals(100, config.getPollTimeout());
Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod());
Assert.assertEquals(false, config.isUseEarliestOffset());
@ -90,6 +91,7 @@ public class KafkaSupervisorIOConfigTest
+ " \"taskCount\": 9,\n"
+ " \"taskDuration\": \"PT30M\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"pollTimeout\": 1000,\n"
+ " \"startDelay\": \"PT1M\",\n"
+ " \"period\": \"PT10S\",\n"
+ " \"useEarliestOffset\": true,\n"
@ -113,6 +115,7 @@ public class KafkaSupervisorIOConfigTest
Assert.assertEquals(9, (int) config.getTaskCount());
Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
Assert.assertEquals(1000, config.getPollTimeout());
Assert.assertEquals(Duration.standardMinutes(1), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(10), config.getPeriod());
Assert.assertEquals(true, config.isUseEarliestOffset());

View File

@ -2793,6 +2793,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskCount,
new Period(duration),
consumerProperties,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
useEarliestOffset,
@ -2904,6 +2905,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
startPartitions,
endPartitions,
ImmutableMap.of("bootstrap.servers", kafkaHost),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
minimumMessageTime,
maximumMessageTime,