mirror of https://github.com/apache/druid.git
document DynamicConfigProvider for kafka consumer properties (#10658)
* document DynamicConfigProvider for kafka consumer properties * Update docs/development/extensions-core/kafka-ingestion.md Co-authored-by: Jihoon Son <jihoonson@apache.org> * Update docs/development/extensions-core/kafka-ingestion.md * fix doc build Co-authored-by: Jihoon Son <jihoonson@apache.org>
This commit is contained in:
parent
abcf624a2e
commit
be019760bb
|
@ -134,7 +134,7 @@ A sample supervisor spec is shown below:
|
||||||
|-----|----|-----------|--------|
|
|-----|----|-----------|--------|
|
||||||
|`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
|
|`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
|
||||||
|`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes|
|
|`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes|
|
||||||
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. Users can set `isolation.level` `read_uncommitted` here if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes|
|
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. See [next section](#more-on-consumerproperties) for more information.|yes|
|
||||||
|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)|
|
|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)|
|
||||||
|`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)|
|
|`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)|
|
||||||
|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See [Capacity Planning](#capacity-planning) below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
|
|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See [Capacity Planning](#capacity-planning) below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
|
||||||
|
@ -147,6 +147,18 @@ A sample supervisor spec is shown below:
|
||||||
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|
||||||
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|
||||||
|
|
||||||
|
#### More on consumerProperties
|
||||||
|
|
||||||
|
This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`.
|
||||||
|
By default, `isolation.level` is set to `read_committed`. It should be set to `read_uncommitted` if you don't want Druid to consume only committed transactions or working with older versions of Kafka servers with no Transactions support.
|
||||||
|
|
||||||
|
There are few cases that require fetching few/all of consumer properties at runtime e.g. when `bootstrap.servers` is not known upfront or not static, to enable SSL connections users might have to provide passwords for `keystore`, `truststore` and `key` secretly.
|
||||||
|
For such consumer properties, user can implement a [DynamicConfigProvider](../../operations/dynamic-config-provider.md) to supply them at runtime, by adding
|
||||||
|
`druid.dynamic.config.provider`=`{"type": "<registered_dynamic_config_provider_name>", ...}`
|
||||||
|
in consumerProperties map.
|
||||||
|
|
||||||
|
Note: In 0.20.0 or older Druid versions, for SSL connections, the `keystore`, `truststore` and `key` passwords can also be provided as a [Password Provider](../../operations/password-provider.md). This is deprecated.
|
||||||
|
|
||||||
#### Specifying data format
|
#### Specifying data format
|
||||||
|
|
||||||
Kafka indexing service supports both [`inputFormat`](../../ingestion/data-formats.md#input-format) and [`parser`](../../ingestion/data-formats.md#parser) to specify the data format.
|
Kafka indexing service supports both [`inputFormat`](../../ingestion/data-formats.md#input-format) and [`parser`](../../ingestion/data-formats.md#parser) to specify the data format.
|
||||||
|
|
|
@ -46,6 +46,7 @@ Druid's extensions leverage Guice in order to add things at runtime. Basically,
|
||||||
1. Add new Jersey resources by calling `Jerseys.addResource(binder, clazz)`.
|
1. Add new Jersey resources by calling `Jerseys.addResource(binder, clazz)`.
|
||||||
1. Add new Jetty filters by extending `org.apache.druid.server.initialization.jetty.ServletFilterHolder`.
|
1. Add new Jetty filters by extending `org.apache.druid.server.initialization.jetty.ServletFilterHolder`.
|
||||||
1. Add new secret providers by extending `org.apache.druid.metadata.PasswordProvider`.
|
1. Add new secret providers by extending `org.apache.druid.metadata.PasswordProvider`.
|
||||||
|
1. Add new dynamic configuration providers by extending `org.apache.druid.metadata.DynamicConfigProvider`.
|
||||||
1. Add new ingest transform by implementing the `org.apache.druid.segment.transform.Transform` interface from the `druid-processing` package.
|
1. Add new ingest transform by implementing the `org.apache.druid.segment.transform.Transform` interface from the `druid-processing` package.
|
||||||
1. Bundle your extension with all the other Druid extensions
|
1. Bundle your extension with all the other Druid extensions
|
||||||
|
|
||||||
|
@ -240,6 +241,23 @@ In your implementation of `org.apache.druid.initialization.DruidModule`, `getJac
|
||||||
|
|
||||||
where `SomePasswordProvider` is the implementation of `PasswordProvider` interface, you can have a look at `org.apache.druid.metadata.EnvironmentVariablePasswordProvider` for example.
|
where `SomePasswordProvider` is the implementation of `PasswordProvider` interface, you can have a look at `org.apache.druid.metadata.EnvironmentVariablePasswordProvider` for example.
|
||||||
|
|
||||||
|
### Adding a new DynamicConfigProvider implementation
|
||||||
|
|
||||||
|
You will need to implement `org.apache.druid.metadata.DynamicConfigProvider` interface. For every place where Druid uses DynamicConfigProvider, a new instance of the implementation will be created,
|
||||||
|
thus make sure all the necessary information required for fetching all information is supplied during object instantiation.
|
||||||
|
In your implementation of `org.apache.druid.initialization.DruidModule`, `getJacksonModules` should look something like this -
|
||||||
|
|
||||||
|
``` java
|
||||||
|
return ImmutableList.of(
|
||||||
|
new SimpleModule("SomeDynamicConfigProviderModule")
|
||||||
|
.registerSubtypes(
|
||||||
|
new NamedType(SomeDynamicConfigProvider.class, "some")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
```
|
||||||
|
|
||||||
|
where `SomeDynamicConfigProvider` is the implementation of `DynamicConfigProvider` interface, you can have a look at `org.apache.druid.metadata.MapStringDynamicConfigProvider` for example.
|
||||||
|
|
||||||
### Adding a Transform Extension
|
### Adding a Transform Extension
|
||||||
|
|
||||||
To create a transform extension implement the `org.apache.druid.segment.transform.Transform` interface. You'll need to install the `druid-processing` package to import `org.apache.druid.segment.transform`.
|
To create a transform extension implement the `org.apache.druid.segment.transform.Transform` interface. You'll need to install the `druid-processing` package to import `org.apache.druid.segment.transform`.
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
---
|
||||||
|
id: dynamic-config-provider
|
||||||
|
title: "Dynamic Config Providers"
|
||||||
|
---
|
||||||
|
|
||||||
|
<!--
|
||||||
|
~ Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
~ or more contributor license agreements. See the NOTICE file
|
||||||
|
~ distributed with this work for additional information
|
||||||
|
~ regarding copyright ownership. The ASF licenses this file
|
||||||
|
~ to you under the Apache License, Version 2.0 (the
|
||||||
|
~ "License"); you may not use this file except in compliance
|
||||||
|
~ with the License. You may obtain a copy of the License at
|
||||||
|
~
|
||||||
|
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
~
|
||||||
|
~ Unless required by applicable law or agreed to in writing,
|
||||||
|
~ software distributed under the License is distributed on an
|
||||||
|
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
~ KIND, either express or implied. See the License for the
|
||||||
|
~ specific language governing permissions and limitations
|
||||||
|
~ under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
Druid's core mechanism of supplying multiple related set of credentials/secrets/configurations via Druid extension mechanism. Currently, it is only supported for providing Kafka Consumer configuration in [Kafka Ingestion](../development/extensions-core/kafka-ingestion.md).
|
||||||
|
|
||||||
|
Eventually this will replace [PasswordProvider](./password-provider.md)
|
||||||
|
|
||||||
|
|
||||||
|
Users can create custom extension of the `DynamicConfigProvider` interface that is registered at Druid process startup.
|
||||||
|
|
||||||
|
For more information, see [Adding a new DynamicConfigProvider implementation](../development/modules.md#adding-a-new-dynamicconfigprovider-implementation).
|
||||||
|
|
|
@ -63,6 +63,7 @@ Dropwizard
|
||||||
dropwizard
|
dropwizard
|
||||||
DruidInputSource
|
DruidInputSource
|
||||||
DruidSQL
|
DruidSQL
|
||||||
|
DynamicConfigProvider
|
||||||
EC2
|
EC2
|
||||||
EC2ContainerCredentialsProviderWrapper
|
EC2ContainerCredentialsProviderWrapper
|
||||||
ECS
|
ECS
|
||||||
|
@ -217,6 +218,7 @@ colocation
|
||||||
compactable
|
compactable
|
||||||
config
|
config
|
||||||
configs
|
configs
|
||||||
|
consumerProperties
|
||||||
cron
|
cron
|
||||||
csv
|
csv
|
||||||
customizable
|
customizable
|
||||||
|
|
Loading…
Reference in New Issue