Apache Druid can ingest denormalized data in JSON, CSV, a delimited form such as TSV, or any custom format. While most examples in the documentation use data in JSON format, it is not difficult to configure Druid to ingest any other delimited data.
All forms of Druid ingestion require some form of schema object. The format of the data to be ingested is specified using the `inputFormat` entry in your [`ioConfig`](ingestion-spec.md#ioconfig).
| featureSpec | JSON Object | [JSON parser features](https://github.com/FasterXML/jackson-core/wiki/JsonParser-Features) supported by Jackson, a JSON processor for Java. The features control parsing of the input JSON data. To enable a feature, map the feature name to a Boolean value of "true". For example: `"featureSpec": {"ALLOW_SINGLE_QUOTES": true, "ALLOW_UNQUOTED_FIELD_NAMES": true}` | no |
The following properties apply only when the JSON `inputFormat` is used in streaming ingestion; they control how parsing exceptions are handled. In streaming ingestion, multi-line JSON events can be ingested (that is, a single JSON event may span multiple lines). However, if a parsing exception occurs, all JSON events present in the same streaming record are discarded.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| assumeNewlineDelimited | Boolean | If the input is known to be newline delimited JSON (each individual JSON event is contained in a single line, separated by newlines), setting this option to true allows for more flexible parsing exception handling. Only the lines with invalid JSON syntax will be discarded, while lines containing valid JSON events will still be ingested. | no (Default false) |
| useJsonNodeReader | Boolean | When ingesting multi-line JSON events, enabling this option uses a JSON parser that retains any valid JSON events encountered within a streaming record before a parsing exception occurs. | no (Default false) |
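For example, a JSON `inputFormat` that enables the Jackson features above and treats each streaming record as newline-delimited JSON might be configured as in the following sketch (the combination of options is illustrative):

```json
"inputFormat": {
  "type": "json",
  "featureSpec": {
    "ALLOW_SINGLE_QUOTES": true,
    "ALLOW_UNQUOTED_FIELD_NAMES": true
  },
  "assumeNewlineDelimited": true
}
```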
| columns | JSON array | Specifies the columns of the data. The columns should be in the same order as the columns of your data. | yes if `findColumnsFromHeader` is false or missing |
| findColumnsFromHeader | Boolean | If this is set, the task will find the column names from the header row. Note that `skipHeaderRows` will be applied before finding column names from the header. For example, if you set `skipHeaderRows` to 2 and `findColumnsFromHeader` to true, the task will skip the first two lines and then extract column information from the third line. `columns` will be ignored if this is set to true. | no (default = false if `columns` is set; otherwise null) |
| skipHeaderRows | Integer | If this is set, the task will skip the first `skipHeaderRows` rows. | no (default = 0) |
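For example, a CSV `inputFormat` that reads column names from a header row might look like the following sketch:

```json
"inputFormat": {
  "type": "csv",
  "findColumnsFromHeader": true
}
```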
| columns | JSON array | Specifies the columns of the data. The columns should be in the same order as the columns of your data. | yes if `findColumnsFromHeader` is false or missing |
| findColumnsFromHeader | Boolean | If this is set, the task will find the column names from the header row. Note that `skipHeaderRows` will be applied before finding column names from the header. For example, if you set `skipHeaderRows` to 2 and `findColumnsFromHeader` to true, the task will skip the first two lines and then extract column information from the third line. `columns` will be ignored if this is set to true. | no (default = false if `columns` is set; otherwise null) |
| skipHeaderRows | Integer | If this is set, the task will skip the first `skipHeaderRows` rows. | no (default = 0) |
Be sure to change the `delimiter` to the appropriate delimiter for your data. As with CSV, you must specify the columns and which subset of the columns you want indexed.
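For example, a delimited `inputFormat` for pipe-separated data might look like the following sketch; the column names are illustrative:

```
"inputFormat": {
  "type": "tsv",
  //use the delimiter that matches your data
  "delimiter": "|",
  "findColumnsFromHeader": false,
  //illustrative column names
  "columns": ["timestamp", "page", "language"]
}
```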
> To upgrade from versions earlier than 0.15.0 to 0.15.0 or newer, read [Migration from 'contrib' extension](../development/extensions-core/orc.md#migration-from-contrib-extension).
| flattenSpec | JSON Object | Specifies flattening configuration for nested ORC data. Only 'path' expressions are supported ('jq' and 'tree' are unavailable). See [`flattenSpec`](#flattenspec) for more info. | no |
| binaryAsString | Boolean | Specifies whether a binary ORC column that is not logically marked as a string should be treated as a UTF-8 encoded string. | no (default = false) |
| `type` | String | Set value to `parquet`. | yes |
| `flattenSpec` | JSON Object | Define a [`flattenSpec`](#flattenspec) to extract nested values from a Parquet file. Only 'path' expressions are supported ('jq' and 'tree' are unavailable). | no (default will auto-discover 'root' level properties) |
| `binaryAsString` | Boolean | Specifies whether a bytes Parquet column that is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
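A sketch of a Parquet `inputFormat` with an illustrative `flattenSpec` entry (the field name and path are placeholders):

```
"inputFormat": {
  "type": "parquet",
  "flattenSpec": {
    "useFieldDiscovery": true,
    "fields": [
      {
        "type": "path",
        //illustrative field name and JsonPath expression
        "name": "nested_value",
        "expr": "$.path.to.nested"
      }
    ]
  },
  "binaryAsString": false
}
```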
|type| String| Set value to `avro_stream`. | yes |
|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from an Avro record. Only 'path' expressions are supported ('jq' and 'tree' are unavailable).| no (default will auto-discover 'root' level properties) |
|`avroBytesDecoder`| JSON Object |Specifies how to decode bytes to Avro record. | yes |
| binaryAsString | Boolean | Specifies whether a bytes Avro column that is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
If `type` is not included, the avroBytesDecoder defaults to `schema_repo`.
###### Inline Schema Based Avro Bytes Decoder
> The "schema_inline" decoder reads Avro records using a fixed schema and does not support schema migration. If you
> may need to migrate schemas in the future, consider one of the other decoders, all of which use a message header that
> allows the parser to identify the proper Avro schema for reading records.
This decoder can be used if all the input events can be read using the same schema. In this case, specify the schema in the input task JSON itself, as described below.
```
...
"avroBytesDecoder": {
  "type": "schema_inline",
  "schema": {
    //your schema goes here, for example
    "namespace": "org.apache.druid.data",
    "name": "User",
    "type": "record",
    "fields": [
      { "name": "FullName", "type": "string" },
      { "name": "Country", "type": "string" }
    ]
  }
}
...
```
###### Multiple Inline Schemas Based Avro Bytes Decoder
Use this decoder if different input events can have different read schemas. In this case, specify the schema in the input task JSON itself, as described below.
```
...
"avroBytesDecoder": {
  "type": "multiple_schemas_inline",
  "schemas": {
    //your id -> schema map goes here, for example
    "1": {
      "namespace": "org.apache.druid.data",
      "name": "User",
      "type": "record",
      "fields": [
        { "name": "FullName", "type": "string" },
        { "name": "Country", "type": "string" }
      ]
    },
    "2": {
      "namespace": "org.apache.druid.otherdata",
      "name": "UserIdentity",
      "type": "record",
      "fields": [
        { "name": "Name", "type": "string" },
        { "name": "Location", "type": "string" }
      ]
    },
    ...
    ...
  }
}
...
```
Note that this is essentially a map of integer schema ID to Avro schema object. This parser assumes that each record has the following format:

- The first byte is the version and must always be 1.
- The next 4 bytes are the integer schema ID, serialized using big-endian byte order.
This Avro bytes decoder first extracts `subject` and `id` from the input message bytes, and then uses them to look up the Avro schema used to decode the Avro record from bytes. For details, see the [schema repo](https://github.com/schema-repo/schema-repo). You need an HTTP service like schema repo to hold the Avro schema. For information on registering a schema on the message producer side, see `org.apache.druid.data.input.AvroStreamInputRowParserTest#testParse()`.
This Avro bytes decoder first extracts a unique `id` from the input message bytes, and then uses it to look up the schema in the Schema Registry, which is used to decode the Avro record from bytes.
For details, see the Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry).
| config | Json | Additional configuration to send to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md). | no |
| headers | Json | Headers to send to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md). | no |
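A minimal sketch of this decoder, assuming the decoder `type` value is `schema_registry` and using an illustrative registry URL:

```
"avroBytesDecoder": {
  "type": "schema_registry",
  //illustrative registry URL
  "url": "http://localhost:8081"
}
```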
The following errors when reading records will be considered parse exceptions, which can be limited and logged with ingestion task configurations such as `maxParseExceptions` and `maxSavedParseExceptions`:
|flattenSpec| JSON Object | Define a [`flattenSpec`](#flattenspec) to extract nested values from Avro records. Only 'path' expressions are supported ('jq' and 'tree' are unavailable). | no (default will auto-discover 'root' level properties) |
|schema| JSON Object | Define a reader schema to be used when parsing Avro records. This is useful when parsing multiple versions of Avro OCF file data. | no (default will decode using the writer schema contained in the OCF file) |
| binaryAsString | Boolean | Specifies whether a bytes Avro column that is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
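A minimal sketch of an Avro OCF `inputFormat`, assuming the `type` value is `avro_ocf` and relying on the writer schema embedded in the file:

```json
"inputFormat": {
  "type": "avro_ocf",
  "binaryAsString": false
}
```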
| `type` | String | Set value to `protobuf`. | yes |
| `flattenSpec` | JSON Object | Define a [`flattenSpec`](#flattenspec) to extract nested values from a Protobuf record. Note that only 'path' expressions are supported ('jq' and 'tree' are unavailable). | no (default will auto-discover 'root' level properties) |
| `protoBytesDecoder` | JSON Object | Specifies how to decode bytes to Protobuf record. | yes |
`kafka` is a special input format that wraps a regular input format (which goes in `valueFormat`) and allows you
to parse the Kafka metadata (timestamp, headers, and key) that is part of Kafka messages.
It should only be used when ingesting from Apache Kafka.
Configure the Kafka `inputFormat` as follows:
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| `type` | String | Set value to `kafka`. | yes |
| `valueFormat` | [InputFormat](#input-format) | Any [InputFormat](#input-format) to parse the Kafka value payload. For details about specifying the input format, see [Specifying data format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format). | yes |
| `timestampColumnName` | String | Name of the column for the Kafka record's timestamp. | no (default = "kafka.timestamp") |
| `headerColumnPrefix` | String | Custom prefix for all the header columns. | no (default = "kafka.header.") |
| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka headers. Supports String types. Because Kafka header values are bytes, the parser decodes them as UTF-8 encoded strings. To change this behavior, implement your own parser based on the encoding style. Change the 'encoding' type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
| `keyFormat` | [InputFormat](#input-format) | Any [input format](#input-format) to parse the Kafka key. It only processes the first entry of the `inputFormat` field. For details, see [Specifying data format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format). | no |
| `keyColumnName` | String | Name of the column for the Kafka record's key. | no (default = "kafka.key") |
The Kafka input format augments the payload with information from the Kafka timestamp, headers, and key.
If there are conflicts between column names in the payload and those created from the metadata, the payload takes precedence.
This ensures that upgrading a Kafka ingestion to use the Kafka input format (by taking its existing input format and setting it as the `valueFormat`) can be done without losing any of the payload data.
Here is a minimal example that only augments the parsed payload with the Kafka timestamp column:
```
"ioConfig": {
  "inputFormat": {
    "type": "kafka",
    "valueFormat": {
      "type": "json"
    }
  },
  ...
}
```
Here is a complete example:
```
"ioConfig": {
  "inputFormat": {
    "type": "kafka",
    "valueFormat": {
      "type": "json"
    },
    "timestampColumnName": "kafka.timestamp",
    "headerFormat": {
      "type": "string",
      "encoding": "UTF-8"
    },
    "headerColumnPrefix": "kafka.header.",
    "keyFormat": {
      "type": "tsv",
      "findColumnsFromHeader": false,
      "columns": ["x"]
    },
    "keyColumnName": "kafka.key"
  },
  ...
}
```
If you want to use `kafka.timestamp` as Druid's primary timestamp (`__time`), specify it as the value for `column` in the `timestampSpec`:
```json
"timestampSpec": {
"column": "kafka.timestamp",
"format": "millis"
}
```
Similarly, you can use a timestamp extracted from a Kafka header as the primary timestamp.
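For example, the following sketch assumes a header named `myTimestampHeader` that carries a millisecond timestamp and is exposed under the default `kafka.header.` prefix:

```json
"timestampSpec": {
  "column": "kafka.header.myTimestampHeader",
  "format": "millis"
}
```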
You can use the `flattenSpec` object to flatten nested data, as an alternative to the Druid [nested columns](../querying/nested-columns.md) feature, and for nested input formats unsupported by the feature. It is an object within the `inputFormat` object.
See [Nested columns](../querying/nested-columns.md) for information on ingesting and storing nested data in an Apache Druid column as a `COMPLEX<json>` data type.
| useFieldDiscovery | If true, interpret all root-level fields as available fields for usage by [`timestampSpec`](./ingestion-spec.md#timestampspec), [`transformSpec`](./ingestion-spec.md#transformspec), [`dimensionsSpec`](./ingestion-spec.md#dimensionsspec), and [`metricsSpec`](./ingestion-spec.md#metricsspec).<br/><br/>If false, only explicitly specified fields (see `fields`) will be available for use. | `true` |
| fields | Specifies the fields of interest and how they are accessed. See [Field flattening specifications](#field-flattening-specifications) for more detail. | `[]` |
After Druid reads the input data records, it applies the flattenSpec before applying any other specs such as [`timestampSpec`](./ingestion-spec.md#timestampspec), [`transformSpec`](./ingestion-spec.md#transformspec), [`dimensionsSpec`](./ingestion-spec.md#dimensionsspec), or [`metricsSpec`](./ingestion-spec.md#metricsspec). This makes it possible to extract timestamps from flattened data, for example, and to refer to flattened data in transformations, in your dimension list, and when generating metrics.
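For example, a `flattenSpec` for JSON input might look like the following sketch; the field names and expressions are illustrative:

```
"flattenSpec": {
  "useFieldDiscovery": true,
  "fields": [
    {
      //illustrative JsonPath-based field
      "type": "path",
      "name": "foo_bar",
      "expr": "$.foo.bar"
    },
    {
      //illustrative jq-based field (JSON format only)
      "type": "jq",
      "name": "first_food",
      "expr": ".thing.food[1]"
    }
  ]
}
```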
| type | Options are as follows:<br/><br/><ul><li>`root`, referring to a field at the root level of the record. Only really useful if `useFieldDiscovery` is false.</li><li>`path`, referring to a field using [JsonPath](https://github.com/jayway/JsonPath) notation. Supported by most data formats that offer nesting, including `avro`, `json`, `orc`, and `parquet`.</li><li>`jq`, referring to a field using [jackson-jq](https://github.com/eiiches/jackson-jq) notation. Only supported for the `json` format.</li><li>`tree`, referring to a nested field from the root level of the record. Useful and more efficient than `path` or `jq` if a simple hierarchical fetch is required. Only supported for the `json` format.</li></ul> | none (required) |
| name | Name of the field after flattening. This name can be referred to by the [`timestampSpec`](./ingestion-spec.md#timestampspec), [`transformSpec`](./ingestion-spec.md#transformspec), [`dimensionsSpec`](./ingestion-spec.md#dimensionsspec), and [`metricsSpec`](./ingestion-spec.md#metricsspec).| none (required) |
| expr | Expression for accessing the field while flattening. For type `path`, this should be [JsonPath](https://github.com/jayway/JsonPath). For type `jq`, this should be [jackson-jq](https://github.com/eiiches/jackson-jq) notation. For other types, this parameter is ignored. | none (required for types `path` and `jq`) |
| nodes | For `tree` only. Multiple-expression field for accessing the field while flattening, representing the hierarchy of field names to read. For other types, this parameter must not be provided. | none (required for type `tree`) |
- For convenience, when defining a root-level field, it is possible to define only the field name, as a string, instead of a JSON object. For example, `{"name": "baz", "type": "root"}` is equivalent to `"baz"`.
- Enabling `useFieldDiscovery` will only automatically detect "simple" fields at the root level that correspond to data types that Druid supports. This includes strings, numbers, and lists of strings or numbers. Other types will not be automatically detected, and must be specified explicitly in the `fields` list.
- Duplicate field `name`s are not allowed. An exception will be thrown.
- If `useFieldDiscovery` is enabled, any discovered field with the same name as one already defined in the `fields` list will be skipped, rather than added twice.
- jackson-jq supports a subset of the full [jq](https://stedolan.github.io/jq/) syntax. Please refer to the [jackson-jq documentation](https://github.com/eiiches/jackson-jq) for details.
- [JsonPath](https://github.com/jayway/JsonPath) supports many functions, but not all of them are supported by Druid. The following matrix shows the currently supported JsonPath functions and the corresponding data formats. Please also note the output data type of these functions.
|parseSpec | JSON Object | Specifies the timestamp and dimensions of the data (`timeAndDims` and `orc` format) and a `flattenSpec` (`orc` format). | yes|
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data, and optionally, a flatten spec. Valid parseSpec formats are `timeAndDims` and `parquet`. | yes |
| binaryAsString | Boolean | Specifies whether a bytes Parquet column that is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
| binaryAsString | Boolean | Specifies whether a bytes Parquet column that is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. The format must be JSON. See [JSON ParseSpec](#json-parsespec) for more configuration options. Note that the `timeAndDims` parseSpec is no longer supported. | yes |
| protoMessageType | String | Protobuf message type in the descriptor. Both short name and fully qualified name are accepted. The parser uses the first message type found in the descriptor if not specified. | no |
This Protobuf bytes decoder allows the user to provide the contents of a Protobuf descriptor file inline, encoded as a Base64 string, which is then parsed to obtain the schema used to decode Protobuf records from bytes.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | Set value to `inline`. | yes |
| descriptorString | String | A compiled Protobuf descriptor, encoded as a Base64 string. | yes |
| protoMessageType | String | Protobuf message type in the descriptor. Both short name and fully qualified name are accepted. The parser uses the first message type found in the descriptor if not specified. | no |
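For example, an `inline` Protobuf bytes decoder might look like the following sketch; the descriptor contents are elided and the message type is illustrative:

```
"protoBytesDecoder": {
  "type": "inline",
  //placeholder: supply your compiled descriptor, encoded as Base64
  "descriptorString": "<Base64-encoded descriptor bytes>",
  //illustrative message type name
  "protoMessageType": "Metrics"
}
```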
This Protobuf bytes decoder first extracts a unique `id` from the input message bytes, and then uses it to look up the schema in the Schema Registry, which is used to decode the Protobuf record from bytes.
For details, see the Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry).
| config | Json | Additional configuration to send to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md). | no |
| headers | Json | Headers to send to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md). | no |
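Similarly, a minimal sketch for Protobuf, assuming the decoder `type` value is `schema_registry` and an illustrative registry URL:

```
"protoBytesDecoder": {
  "type": "schema_registry",
  //illustrative registry URL
  "url": "http://localhost:8081"
}
```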
> The _jsonLowercase_ parser is deprecated and may be removed in a future version of Druid.
This is a special variation of the JSON ParseSpec that lower cases all the column names in the incoming JSON data. This parseSpec is required if you are updating to Druid 0.7.x from Druid 0.6.x, are directly ingesting JSON with mixed case column names, do not have any ETL in place to lower case those column names, and would like to make queries that include the data you created using 0.6.x and 0.7.x.
Be sure to change the `delimiter` to the appropriate delimiter for your data. As with CSV, you must specify the columns and which subset of the columns you want indexed.
"function" : "function(str) { var parts = str.split(\"-\"); return { one: parts[0], two: parts[1] } }"
}
```
Note that with the JavaScript parser, data must be fully parsed and returned as a `{key:value}` object in the JS logic.
This means any flattening or parsing of multi-dimensional values must be done there.
> JavaScript-based functionality is disabled by default. Please refer to the Druid [JavaScript programming guide](../development/javascript.md) for guidelines about using Druid's JavaScript functionality, including instructions on how to enable it.