druid/docs/content/ingestion/firehose.md

259 lines
11 KiB
Markdown
Raw Normal View History

---
layout: doc_page
title: "Apache Druid (incubating) Firehoses"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
# Apache Druid (incubating) Firehoses
Firehoses are used in [native batch ingestion tasks](../ingestion/native_tasks.html), stream push tasks automatically created by [Tranquility](../ingestion/stream-push.html) ingestion model.
2018-09-04 15:54:41 -04:00
They are pluggable and thus the configuration schema can and will vary based on the `type` of the firehose.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | Specifies the type of firehose. Each value will have its own configuration schema, firehoses packaged with Druid are described below. | yes |
## Additional Firehoses
There are several firehoses readily available in Druid, some are meant for examples, others can be used directly in a production environment.
For additional firehoses, please see our [extensions list](../development/extensions.html).
2015-08-02 13:37:07 -04:00
2018-09-04 15:54:41 -04:00
### LocalFirehose
This Firehose can be used to read the data from files on local disk.
It can be used for POCs to ingest data on disk.
This firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
Since each split represents a file in this firehose, each worker task of `index_parallel` will read a file.
A sample local firehose spec is shown below:
```json
{
"type" : "local",
"filter" : "*.csv",
"baseDir" : "/data/directory"
}
```
|property|description|required?|
|--------|-----------|---------|
|type|This should be "local".|yes|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html) for more information.|yes|
|baseDir|directory to search recursively for files to be ingested. |yes|
2018-09-04 15:54:41 -04:00
### HttpFirehose
This Firehose can be used to read the data from remote sites via HTTP.
This firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
Since each split represents a file in this firehose, each worker task of `index_parallel` will read a file.
A sample http firehose spec is shown below:
```json
{
"type" : "http",
"uris" : ["http://example.com/uri1", "http://example2.com/uri2"]
}
```
The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header.
Omitting these fields from your spec will result in HTTP requests with no Basic Authentication Header.
|property|description|default|
|--------|-----------|-------|
|httpAuthenticationUsername|Username to use for authentication with specified URIs|None|
|httpAuthenticationPassword|PasswordProvider to use with specified URIs|None|
Example with authentication fields using the DefaultPassword provider (this requires the password to be in the ingestion spec):
```json
{
"type": "http",
"uris": ["http://example.com/uri1", "http://example2.com/uri2"],
"httpAuthenticationUsername": "username",
"httpAuthenticationPassword": "password123"
}
```
You can also use the other existing Druid PasswordProviders. Here is an example using the EnvironmentVariablePasswordProvider:
```json
{
"type": "http",
"uris": ["http://example.com/uri1", "http://example2.com/uri2"],
"httpAuthenticationUsername": "username",
"httpAuthenticationPassword": {
"type": "environment",
"variable": "HTTP_FIREHOSE_PW"
}
}
```
The below configurations can be optionally used for tuning the firehose performance.
|property|description|default|
|--------|-----------|-------|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|
|prefetchTriggerBytes|Threshold to trigger prefetching http objects.|maxFetchCapacityBytes / 2|
|fetchTimeout|Timeout for fetching a http object.|60000|
|maxFetchRetry|Maximum retry for fetching a http object.|3|
2018-09-04 15:54:41 -04:00
### IngestSegmentFirehose
This Firehose can be used to read the data from existing druid segments.
It can be used to ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment.
This firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
A sample ingest firehose spec is shown below -
```json
{
"type" : "ingestSegment",
"dataSource" : "wikipedia",
"interval" : "2013-01-01/2013-01-02"
}
```
|property|description|required?|
|--------|-----------|---------|
2015-08-02 13:37:07 -04:00
|type|This should be "ingestSegment".|yes|
|dataSource|A String defining the data source to fetch rows from, very similar to a table in a relational database|yes|
|interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over.|yes|
|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
|filter| See [Filters](../querying/filters.html)|no|
|maxInputSegmentBytesPerTask|When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|
2018-09-04 15:54:41 -04:00
#### SqlFirehose
SqlFirehoseFactory can be used to ingest events residing in RDBMS. The database connection information is provided as part of the ingestion spec. For each query, the results are fetched locally and indexed. If there are multiple queries from which data needs to be indexed, queries are prefetched in the background upto `maxFetchCapacityBytes` bytes.
Requires one of the following extensions:
* [MySQL Metadata Store](../development/extensions-core/mysql.html).
* [PostgreSQL Metadata Store](../development/extensions-core/postgresql.html).
2018-09-04 15:54:41 -04:00
```json
{
"type" : "sql",
"database": {
"type": "mysql",
"connectorConfig" : {
"connectURI" : "jdbc:mysql://host:port/schema",
"user" : "user",
"password" : "password"
2018-09-04 15:54:41 -04:00
}
},
"sqls" : ["SELECT * FROM table1", "SELECT * FROM table2"]
}
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be "sql".||Yes|
|database|Specifies the database connection details.||Yes|
2018-09-04 15:54:41 -04:00
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|No|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|No|
|prefetchTriggerBytes|Threshold to trigger prefetching SQL result objects.|maxFetchCapacityBytes / 2|No|
|fetchTimeout|Timeout for fetching the result set.|60000|No|
|foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.|false|No|
|sqls|List of SQL queries where each SQL query would retrieve the data to be indexed.||Yes|
#### Database
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|The type of database to query. Valid values are `mysql` and `postgresql`_||Yes|
|connectorConfig|specify the database connection properties via `connectURI`, `user` and `password`||Yes|
2018-09-04 15:54:41 -04:00
### CombiningFirehose
2015-08-02 13:37:07 -04:00
This firehose can be used to combine and merge data from a list of different firehoses.
This can be used to merge data from more than one firehose.
```json
{
"type" : "combining",
"delegates" : [ { firehose1 }, { firehose2 }, ..... ]
}
```
|property|description|required?|
|--------|-----------|---------|
2015-08-02 13:37:07 -04:00
|type|This should be "combining"|yes|
|delegates|list of firehoses to combine data from|yes|
2018-09-04 15:54:41 -04:00
### Streaming Firehoses
The EventReceiverFirehose is used in tasks automatically generated by [Tranquility stream push](../ingestion/stream-push.html). These firehoses are not suitable for batch ingestion.
2018-09-04 15:54:41 -04:00
#### EventReceiverFirehose
2015-08-02 13:37:07 -04:00
EventReceiverFirehoseFactory can be used to ingest events using an http endpoint.
```json
{
"type": "receiver",
"serviceName": "eventReceiverServiceName",
"bufferSize": 10000
}
```
2015-08-02 13:37:07 -04:00
When using this firehose, events can be sent by submitting a POST request to the http endpoint:
`http://<peonHost>:<port>/druid/worker/v1/chat/<eventReceiverServiceName>/push-events/`
|property|description|required?|
|--------|-----------|---------|
2015-08-02 13:37:07 -04:00
|type|This should be "receiver"|yes|
Fix and document concurrency of EventReceiverFirehose and TimedShutoffFirehose; Refine concurrency specification of Firehose (#7038) #### `EventReceiverFirehoseFactory` Fixed several concurrency bugs in `EventReceiverFirehoseFactory`: - Race condition over putting an entry into `producerSequences` in `checkProducerSequence()`. - `Stopwatch` used to measure time across threads, but it's a non-thread-safe class. - Use `System.nanoTime()` instead of `System.currentTimeMillis()` because the latter are [not suitable](https://stackoverflow.com/a/351571/648955) for measuring time intervals. - `close()` was not synchronized by could be called from multiple threads concurrently. Removed unnecessary `readLock` (protecting `hasMore()` and `nextRow()` which are always called from a single thread). Removed unnecessary `volatile` modifiers. Documented threading model and concurrent control flow of `EventReceiverFirehose` instances. **Important:** please read the updated Javadoc for `EventReceiverFirehose.addAll()`. It allows events from different requests (batches) to be interleaved in the buffer. Is this OK? #### `TimedShutoffFirehoseFactory` - Fixed a race condition that was possible because `close()` that was not properly synchronized. Documented threading model and concurrent control flow of `TimedShutoffFirehose` instances. #### `Firehose` Refined concurrency contract of `Firehose` based on `EventReceiverFirehose` implementation. Importantly, now it states that `close()` doesn't affect `hasMore()` and `nextRow()` and could be called concurrently with them. In other words, specified that `close()` is for "row supply" side rather than "row consume" side. However, I didn't check that other `Firehose` implementatations adhere to this contract. <hr> This issue is the result of reviewing `EventReceiverFirehose` and `TimedShutoffFirehose` using [this checklist](https://medium.com/@leventov/code-review-checklist-java-concurrency-49398c326154).
2019-03-04 16:50:03 -05:00
|serviceName|Name used to announce the event receiver service endpoint|yes|
|maxIdleTime|A firehose is automatically shut down after not receiving any events for this period of time, in milliseconds. If not specified, a firehose is never shut down due to being idle. Zero and negative values have the same effect.|no|
|bufferSize|Size of buffer used by firehose to store events|no, default is 100000|
Shut down time for EventReceiverFirehose can be specified by submitting a POST request to
`http://<peonHost>:<port>/druid/worker/v1/chat/<eventReceiverServiceName>/shutdown?shutoffTime=<shutoffTime>`
If shutOffTime is not specified, the firehose shuts off immediately.
#### TimedShutoffFirehose
This can be used to start a firehose that will shut down at a specified time.
An example is shown below:
```json
{
"type" : "timed",
"shutoffTime": "2015-08-25T01:26:05.119Z",
"delegate": {
"type": "receiver",
"serviceName": "eventReceiverServiceName",
"bufferSize": 100000
}
}
```
|property|description|required?|
|--------|-----------|---------|
|type|This should be "timed"|yes|
|shutoffTime|time at which the firehose should shut down, in ISO8601 format|yes|
|delegate|firehose to use|yes|