Remove hard limitation that Druid (after 0.15.0) can only consume from Kafka version 0.11.x or later (#10551)

* remove built-in Kafka consumer config

* modify druid docs of kafka indexing service

* yuezhang

* modify doc

* modify docs

* fix KafkaIndexTaskTest.java

* revert unnecessary change

* add more logs and modify docs

* revert jdk version

* modify docs

* modify-kafka-version v2

* modify docs

* modify docs

* modify docs

* modify docs

* modify docs

* done

* remove useless import

* change code and add UT

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
zhangyue19921010 2020-12-04 09:37:59 +08:00 committed by GitHub
parent ae6c43de71
commit 229b5f359f
5 changed files with 47 additions and 3 deletions


@@ -33,10 +33,12 @@ manage failures, and ensure that the scalability and replication requirements ar
 This service is provided in the `druid-kafka-indexing-service` core Apache Druid extension (see
 [Including Extensions](../../development/extensions.md#loading-extensions)).
-> The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. These changes make the
+> The Kafka indexing service supports transactional topics, which were introduced in Kafka 0.11.x. This is the default behavior of Druid, and it makes the
 > Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or
 > better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade)
 > if you are using older version of Kafka brokers.
+> In addition, users can set `isolation.level` to `read_uncommitted` in `consumerProperties` if they don't need Druid to consume transactional topics or need Druid to consume from older versions of Kafka.
+> Make sure offsets are sequential, since there is no offset gap check in Druid anymore.
 ## Tutorial
@@ -132,7 +134,7 @@ A sample supervisor spec is shown below:
 |-----|----|-----------|--------|
 |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
 |`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes|
-|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes|
+|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. Users can set `isolation.level` to `read_uncommitted` here if they don't need Druid to consume transactional topics or need Druid to consume from older versions of Kafka. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes|
 |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)|
 |`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)|
 |`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See [Capacity Planning](#capacity-planning) below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
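
For illustration, here is a minimal, standalone sketch of what this new `consumerProperties` option means at the Kafka client level (plain `kafka-clients` API, not Druid code; the broker address and topic name are made up): with `isolation.level` set to `read_uncommitted`, the consumer returns records in offset order regardless of transaction state, including records from aborted or still-open transactions, instead of waiting for commit markers as `read_committed` does.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Standalone sketch, not Druid code: a consumer configured the way Druid's consumer
// ends up when `isolation.level` is set to `read_uncommitted` in consumerProperties.
public class ReadUncommittedConsumerSketch
{
  public static void main(String[] args)
  {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // made-up broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-supervisor-example");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    // The override discussed above: read everything, including records from aborted
    // or still-open transactions, instead of the transactional read_committed default.
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("example-topic")); // made-up topic
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord<byte[], byte[]> record : records) {
        System.out.printf("offset=%d value=%d bytes%n", record.offset(), record.value().length);
      }
    }
  }
}
```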


@@ -38,7 +38,6 @@ public class KafkaConsumerConfigs
     props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId()));
     props.put("auto.offset.reset", "none");
     props.put("enable.auto.commit", "false");
-    props.put("isolation.level", "read_committed");
     return props;
   }
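
The hardcoded entry cannot simply stay here as a default because the call sites (shown in the next two hunks) apply this shared map with `putAll` after the user's `consumerProperties` have already been merged in, so a hardcoded `isolation.level` would silently overwrite a user-supplied `read_uncommitted`. A minimal sketch of that interaction using plain `java.util` types (not Druid code):

```java
import java.util.Map;
import java.util.Properties;

// Minimal sketch of why the hardcoded default had to move out of the shared config map.
public class DefaultPrecedenceSketch
{
  public static void main(String[] args)
  {
    Map<String, String> userProps = Map.of("isolation.level", "read_uncommitted");
    Map<String, String> sharedDefaults = Map.of("isolation.level", "read_committed");

    // Old behavior: shared defaults applied last with putAll clobber the user's choice.
    Properties old = new Properties();
    old.putAll(userProps);
    old.putAll(sharedDefaults);
    System.out.println(old.get("isolation.level")); // read_committed -- user override lost

    // New behavior: the default is applied per call site with putIfAbsent, so it only
    // fills the gap when the user did not set the key.
    Properties now = new Properties();
    now.putAll(userProps);
    now.putIfAbsent("isolation.level", "read_committed");
    System.out.println(now.get("isolation.level")); // read_uncommitted -- user override preserved
  }
}
```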


@@ -97,6 +97,7 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
         configMapper,
         ioConfig.getConsumerProperties()
     );
+    props.putIfAbsent("isolation.level", "read_committed");
     props.putAll(consumerConfigs);
     return new KafkaConsumer<>(props);


@@ -252,6 +252,7 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
     final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
     final Properties props = new Properties();
     addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    props.putIfAbsent("isolation.level", "read_committed");
     props.putAll(consumerConfigs);
     ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
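
Both `KafkaIndexTask` and `KafkaRecordSupplier` now follow the same merge order: user-supplied `consumerProperties` first, then the transactional default via `putIfAbsent`, then the shared Druid defaults (which, after the change above, no longer contain `isolation.level`). A hedged sketch of that order as a standalone helper; the class and method names below are hypothetical, not Druid APIs:

```java
import java.util.Map;
import java.util.Properties;

// Illustrative helper only; it mirrors the merge order both call sites use after this change.
public class ConsumerPropsMergeSketch
{
  static Properties mergeConsumerProps(Map<String, ?> userConsumerProperties, Map<String, ?> druidSharedDefaults)
  {
    final Properties props = new Properties();
    props.putAll(userConsumerProperties);                    // 1. user-supplied consumerProperties from the spec
    props.putIfAbsent("isolation.level", "read_committed");  // 2. transactional default, only if the user left it unset
    props.putAll(druidSharedDefaults);                       // 3. shared Druid defaults, which no longer include isolation.level
    return props;
  }

  public static void main(String[] args)
  {
    Properties effective = mergeConsumerProps(
        Map.of("bootstrap.servers", "localhost:9092", "isolation.level", "read_uncommitted"),
        Map.of("enable.auto.commit", "false")
    );
    System.out.println(effective.get("isolation.level")); // read_uncommitted
  }
}
```

The final `putAll` of the shared defaults is safe only because `isolation.level` was dropped from them; otherwise step 3 would undo steps 1 and 2.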


@@ -2329,6 +2329,47 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
     );
   }
 
+  @Test(timeout = 60_000L)
+  public void testRunUnTransactionMode() throws Exception
+  {
+    Map<String, Object> configs = kafkaServer.consumerProperties();
+    configs.put("isolation.level", "read_uncommitted");
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
+            new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 13L)),
+            configs,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            INPUT_FORMAT
+        )
+    );
+
+    final ListenableFuture<TaskStatus> future = runTask(task);
+
+    // Insert 2 records inside a transaction that is then aborted; with read_uncommitted the task still consumes them.
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 2)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.flush();
+      kafkaProducer.abortTransaction();
+    }
+
+    while (countEvents(task) != 2) {
+      Thread.sleep(25);
+    }
+    Assert.assertEquals(2, countEvents(task));
+  }
+
   @Test(timeout = 60_000L)
   public void testCanStartFromLaterThanEarliestOffset() throws Exception
   {