druid/docs/ingestion/kafka-ingestion.md

21 KiB

id title sidebar_label description
kafka-ingestion Apache Kafka ingestion Apache Kafka ingestion Overview of the Kafka indexing service for Druid. Includes example supervisor specs to help you get started.

:::info To use the Kafka indexing service, you must be on Apache Kafka version 0.11.x or higher. If you are using an older version, refer to the Apache Kafka upgrade guide. :::

When you enable the Kafka indexing service, you can configure supervisors on the Overlord to manage the creation and lifetime of Kafka indexing tasks. Kafka indexing tasks read events using Kafka partition and offset mechanism to guarantee exactly-once ingestion. The supervisor oversees the state of the indexing tasks to coordinate handoffs, manage failures, and ensure that scalability and replication requirements are maintained.

This topic contains configuration information for the Kafka indexing service supervisor for Apache Druid.

Setup

To use the Kafka indexing service, you must first load the druid-kafka-indexing-service extension on both the Overlord and the Middle Manager. See Loading extensions for more information.

Supervisor spec configuration

This section outlines the configuration properties that are specific to the Apache Kafka streaming ingestion method. For configuration properties shared across all streaming ingestion methods supported by Druid, see Supervisor spec.

The following example shows a supervisor spec for the Kafka indexing service:

Click to view the example
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "metrics-kafka",
      "timestampSpec": {
        "column": "timestamp",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [],
        "dimensionExclusions": [
          "timestamp",
          "value"
        ]
      },
      "metricsSpec": [
        {
          "name": "count",
          "type": "count"
        },
        {
          "name": "value_sum",
          "fieldName": "value",
          "type": "doubleSum"
        },
        {
          "name": "value_min",
          "fieldName": "value",
          "type": "doubleMin"
        },
        {
          "name": "value_max",
          "fieldName": "value",
          "type": "doubleMax"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "NONE"
      }
    },
    "ioConfig": {
      "topic": "metrics",
      "inputFormat": {
        "type": "json"
      },
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "taskCount": 1,
      "replicas": 1,
      "taskDuration": "PT1H"
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsPerSegment": 5000000
    }
  }
}

I/O configuration

The following table outlines the ioConfig configuration properties specific to Kafka. For configuration properties shared across all streaming ingestion methods, refer to Supervisor I/O configuration.

Property Type Description Required Default
topic String The Kafka topic to read from. To ingest data from multiple topic, use topicPattern. Yes if topicPattern isn't set.
topicPattern String Multiple Kafka topics to read from, passed as a regex pattern. See Ingest from multiple topics for more information. Yes if topic isn't set.
consumerProperties String, Object A map of properties to pass to the Kafka consumer. See Consumer properties for details. Yes. At the minimum, you must set the bootstrap.servers property to establish the initial connection to the Kafka cluster.
pollTimeout Long The length of time to wait for the Kafka consumer to poll records, in milliseconds. No 100
useEarliestOffset Boolean If a supervisor is managing a datasource for the first time, it obtains a set of starting offsets from Kafka. This flag determines whether the supervisor retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks start from where the previous segments ended so this flag is only used on the first run. No false
idleConfig Object Defines how and when the Kafka supervisor can become idle. See Idle configuration for more details. No null

Ingest from multiple topics

:::info If you enable multi-topic ingestion for a datasource, downgrading to a version older than 28.0.0 will cause the ingestion for that datasource to fail. :::

You can ingest data from one or multiple topics. When ingesting data from multiple topics, Druid assigns partitions based on the hashcode of the topic name and the ID of the partition within that topic. The partition assignment might not be uniform across all the tasks. Druid assumes that partitions across individual topics have similar load. If you want to ingest from both high and low load topics in the same supervisor, it is recommended that you have a higher number of partitions for a high load topic and a lower number of partitions for a low load topic.

To ingest data from multiple topics, use the topicPattern property instead of topic. You pass multiple topics as a regex pattern. For example, to ingest data from clicks and impressions, set topicPattern to clicks|impressions. Similarly, you can use metrics-.* as the value for topicPattern if you want to ingest from all the topics that start with metrics-. If you add a new topic that matches the regex to the cluster, Druid automatically starts ingesting from the new topic. Topic names that match partially, such as my-metrics-12, are not included for ingestion.

Consumer properties

Consumer properties control how a supervisor reads and processes event messages from a Kafka stream. For more information about consumer configuration and advanced use cases, refer to the Kafka documentation.

You must include bootstrap.servers in consumer properties with a list of Kafka brokers in the format <BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,.... In some cases, you may need to retrieve consumer properties at runtime. For example, when bootstrap.servers is unknown or not static.

The isolation.level property in consumerProperties determines how Druid reads messages written transactionally. If you use older versions of Kafka servers without transaction support or you don't want Druid to consume only committed transactions, set isolation.level to read_uncommitted. With read_uncommitted, which is the default setting, Druid reads all messages, including aborted transactional messages. Make sure offsets are sequential, since there is no offset gap check in Druid. For Druid to consume only committed transactional messages, set isolation.level to read_committed.

If your Kafka cluster enables consumer group ACLs, you can set group.id in consumerProperties to override the default auto generated group ID.

To enable SSL connections, you must provide passwords for keystore, truststore, and key confidentially. You can specify these settings in the jaas.conf login configuration file or in consumerProperties with sasl.jaas.config. To protect sensitive information, use the environment variable dynamic config provider to store credentials in system environment variables instead of plain text. Although you can also use the password provider interface to specify SSL configuration for Kafka ingestion, consider using the dynamic config provider as this feature is deprecated.

For example, when using SASL and SSL with Kafka, set the following environment variables for the Druid user on machines running the Overlord and Peon services. Replace the values to match your environment configurations.

export KAFKA_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='accesskey' password='secret key';"
export SSL_KEY_PASSWORD=mysecretkeypassword
export SSL_KEYSTORE_PASSWORD=mysecretkeystorepassword
export SSL_TRUSTSTORE_PASSWORD=mysecrettruststorepassword

When you define the consumer properties in the supervisor spec, use the dynamic config provider to refer to the environment variables:

"consumerProperties": {
  "bootstrap.servers": "localhost:9092",
  "security.protocol": "SASL_SSL", 
  "sasl.mechanism": "PLAIN", 
  "ssl.keystore.location": "/opt/kafka/config/kafka01.keystore.jks",
  "ssl.truststore.location": "/opt/kafka/config/kafka.truststore.jks",
  "druid.dynamic.config.provider": {
    "type": "environment",
    "variables": {
      "sasl.jaas.config": "KAFKA_JAAS_CONFIG",
      "ssl.key.password": "SSL_KEY_PASSWORD",
      "ssl.keystore.password": "SSL_KEYSTORE_PASSWORD",
      "ssl.truststore.password": "SSL_TRUSTSTORE_PASSWORD"
    }
  }
}

When connecting to Kafka, Druid replaces the environment variables with their corresponding values.

Idle configuration

:::info Idle state transitioning is currently designated as experimental. :::

When the supervisor enters the idle state, no new tasks are launched subsequent to the completion of the currently executing tasks. This strategy may lead to reduced costs for cluster operators while using topics that get sporadic data.

The following table outlines the configuration options for idleConfig:

Property Description Required
enabled If true, the supervisor becomes idle if there is no data on input stream or topic for some time. No
inactiveAfterMillis The supervisor becomes idle if all existing data has been read from input topic and no new data has been published for inactiveAfterMillis milliseconds. No

The following example shows a supervisor spec with idle configuration enabled:

Click to view the example
{
  "type": "kafka",
  "spec": {
    "dataSchema": {...},
    "ioConfig": {
      "topic": "metrics",
      "inputFormat": {
        "type": "json"
      },
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "autoScalerConfig": {
        "enableTaskAutoScaler": true,
        "taskCountMax": 6,
        "taskCountMin": 2,
        "minTriggerScaleActionFrequencyMillis": 600000,
        "autoScalerStrategy": "lagBased",
        "lagCollectionIntervalMillis": 30000,
        "lagCollectionRangeMillis": 600000,
        "scaleOutThreshold": 6000000,
        "triggerScaleOutFractionThreshold": 0.3,
        "scaleInThreshold": 1000000,
        "triggerScaleInFractionThreshold": 0.9,
        "scaleActionStartDelayMillis": 300000,
        "scaleActionPeriodMillis": 60000,
        "scaleInStep": 1,
        "scaleOutStep": 2
      },
      "taskCount": 1,
      "replicas": 1,
      "taskDuration": "PT1H",
      "idleConfig": {
        "enabled": true,
        "inactiveAfterMillis": 600000
      }
    },
    "tuningConfig": {...}
  }
}

Data format

The Kafka indexing service supports both inputFormat and parser to specify the data format. Use the inputFormat to specify the data format for the Kafka indexing service unless you need a format only supported by the legacy parser. For more information, see Source input formats.

The Kinesis indexing service supports the following values for inputFormat:

  • csv
  • tvs
  • json
  • kafka
  • avro_stream
  • protobuf

You can use parser to read thrift formats.

Kafka input format supervisor spec example

The kafka input format lets you parse the Kafka metadata fields in addition to the Kafka payload value contents.

The kafka input format wraps around the payload parsing input format and augments the data it outputs with the Kafka event timestamp, the Kafka topic name, the Kafka event headers, and the key field that itself can be parsed using any available input format.

For example, consider the following structure for a Kafka message that represents a wiki edit in a development environment:

  • Kafka timestamp: 1680795276351
  • Kafka topic: wiki-edits
  • Kafka headers:
    • env=development
    • zone=z1
  • Kafka key: wiki-edit
  • Kafka payload value: {"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo Toraut","delta":31,"namespace":"Main"}

Using { "type": "json" } as the input format only parses the payload value. To parse the Kafka metadata in addition to the payload, use the kafka input format.

You configure it as follows:

  • valueFormat: Define how to parse the payload value. Set this to the payload parsing input format ({ "type": "json" }).
  • timestampColumnName: Supply a custom name for the Kafka timestamp in the Druid schema to avoid conflicts with columns from the payload. The default is kafka.timestamp.
  • topicColumnName: Supply a custom name for the Kafka topic in the Druid schema to avoid conflicts with columns from the payload. The default is kafka.topic. This field is useful when ingesting data from multiple topics into the same datasource.
  • headerFormat: The default value string decodes strings in UTF-8 encoding from the Kafka header.
    Other supported encoding formats include the following:
    • ISO-8859-1: ISO Latin Alphabet No. 1, that is, ISO-LATIN-1.
    • US-ASCII: Seven-bit ASCII. Also known as ISO646-US. The Basic Latin block of the Unicode character set.
    • UTF-16: Sixteen-bit UCS Transformation Format, byte order identified by an optional byte-order mark.
    • UTF-16BE: Sixteen-bit UCS Transformation Format, big-endian byte order.
    • UTF-16LE: Sixteen-bit UCS Transformation Format, little-endian byte order.
  • headerColumnPrefix: Supply a prefix to the Kafka headers to avoid any conflicts with columns from the payload. The default is kafka.header.. Considering the header from the example, Druid maps the headers to the following columns: kafka.header.env, kafka.header.zone.
  • keyFormat: Supply an input format to parse the key. Only the first value is used. If, as in the example, your key values are simple strings, then you can use the tsv format to parse them.
    {
      "type": "tsv",
      "findColumnsFromHeader": false,
      "columns": ["x"]
    } 
    
    Note that for tsv,csv, and regex formats, you need to provide a columns array to make a valid input format. Only the first one is used, and its name will be ignored in favor of keyColumnName.
  • keyColumnName: Supply the name for the Kafka key column to avoid conflicts with columns from the payload. The default is kafka.key.

The following input format uses default values for timestampColumnName, topicColumnName, headerColumnPrefix, and keyColumnName:

{
  "type": "kafka",
  "valueFormat": {
    "type": "json"
  },
  "headerFormat": {
    "type": "string"
  },
  "keyFormat": {
    "type": "tsv",
    "findColumnsFromHeader": false,
    "columns": ["x"]
  }
}

It parses the example message as follows:

{
  "channel": "#sv.wikipedia",
  "timestamp": "2016-06-27T00:00:11.080Z",
  "page": "Salo Toraut",
  "delta": 31,
  "namespace": "Main",
  "kafka.timestamp": 1680795276351,
  "kafka.topic": "wiki-edits",
  "kafka.header.env": "development",
  "kafka.header.zone": "z1",
  "kafka.key": "wiki-edit"
}

Finally, add these Kafka metadata columns to the dimensionsSpec or set your dimensionsSpec to auto-detect columns.

The following supervisor spec demonstrates how to ingest the Kafka header, key, timestamp, and topic into Druid dimensions:

Click to view the example
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "topic": "wiki-edits",
      "inputFormat": {
        "type": "kafka",
        "valueFormat": {
          "type": "json"
        },
        "headerFormat": {
          "type": "string"
        },
        "keyFormat": {
          "type": "tsv",
          "findColumnsFromHeader": false,
          "columns": ["x"]
        }
      },
      "useEarliestOffset": true
    },
    "dataSchema": {
      "dataSource": "wikiticker",
      "timestampSpec": {
        "column": "timestamp",
        "format": "posix"
      },
      "dimensionsSpec":  "dimensionsSpec": {
        "useSchemaDiscovery": true,
        "includeAllDimensions": true
      },
      "granularitySpec": {
        "queryGranularity": "none",
        "rollup": false,
        "segmentGranularity": "day"
      }
    },
    "tuningConfig": {
      "type": "kafka"
    }
  }
}

After Druid ingests the data, you can query the Kafka metadata columns as follows:

SELECT
  "kafka.header.env",
  "kafka.key",
  "kafka.timestamp",
  "kafka.topic"
FROM "wikiticker"

This query returns:

kafka.header.env kafka.key kafka.timestamp kafka.topic
development wiki-edit 1680795276351 wiki-edits

Tuning configuration

The following table outlines the tuningConfig configuration properties specific to Kafka. For configuration properties shared across all streaming ingestion methods, refer to Supervisor tuning configuration.

Property Type Description Required Default
numPersistThreads Integer The number of threads to use to create and persist incremental segments on the disk. Higher ingestion data throughput results in a larger number of incremental segments, causing significant CPU time to be spent on the creation of the incremental segments on the disk. For datasources with number of columns running into hundreds or thousands, creation of the incremental segments may take up significant time, in the order of multiple seconds. In both of these scenarios, ingestion can stall or pause frequently, causing it to fall behind. You can use additional threads to parallelize the segment creation without blocking ingestion as long as there are sufficient CPU resources available. No 1

Deployment notes on Kafka partitions and Druid segments

Druid assigns Kafka partitions to each Kafka indexing task. A task writes the events it consumes from Kafka into a single segment for the segment granularity interval until it reaches one of the following limits: maxRowsPerSegment, maxTotalRows, or intermediateHandoffPeriod. At this point, the task creates a new partition for this segment granularity to contain subsequent events.

The Kafka indexing task also does incremental hand-offs. Therefore, segments become available as they are ready and you don't have to wait for all segments until the end of the task duration. When the task reaches one of maxRowsPerSegment, maxTotalRows, or intermediateHandoffPeriod, it hands off all the segments and creates a new set of segments for further events. This allows the task to run for longer durations without accumulating old segments locally on Middle Manager services.

The Kafka indexing service may still produce some small segments. For example, consider the following scenario:

  • Task duration is 4 hours.
  • Segment granularity is set to an HOUR.
  • The supervisor was started at 9:10. After 4 hours at 13:10, Druid starts a new set of tasks. The events for the interval 13:00 - 14:00 may be split across existing tasks and the new set of tasks which could result in small segments. To merge them together into new segments of an ideal size (in the range of ~500-700 MB per segment), you can schedule re-indexing tasks, optionally with a different segment granularity.

For information on how to optimize the segment size, see Segment size optimization.

Learn more

See the following topics for more information: