remove deprecated standalone realtime node (#7915)

* remove CliRealtime, RealtimeManager, etc

* add redirects for deleted page to page that explains the deleted thing

* adjust docs
Clint Wylie 2019-07-02 18:12:17 -07:00 committed by GitHub
parent 93b738bbfa
commit f7283378ac
20 changed files with 56 additions and 2512 deletions

View File

@@ -59,9 +59,9 @@
 {"source": "Post-aggregations.html", "target": "querying/post-aggregations.html"},
 {"source": "Query-Context.html", "target": "querying/query-context.html"},
 {"source": "Querying.html", "target": "querying/querying.html"},
-{"source": "Realtime-Config.html", "target": "configuration/realtime.html"},
+{"source": "Realtime-Config.html", "target": "ingestion/standalone-realtime.html"},
+{"source": "Realtime.html", "target": "ingestion/standalone-realtime.html"},
 {"source": "Realtime-ingestion.html", "target": "ingestion/stream-ingestion.html"},
-{"source": "Realtime.html", "target": "design/realtime.html"},
 {"source": "Recommendations.html", "target": "operations/recommendations.html"},
 {"source": "Rolling-Updates.html", "target": "operations/rolling-updates.html"},
 {"source": "Router.html", "target": "development/router.html"},
@@ -167,4 +167,8 @@
 {"source": "development/extensions-core/namespaced-lookup.html", "target": "lookups-cached-global.html"},
 {"source": "operations/performance-faq.html", "target": "../operations/basic-cluster-tuning.html"},
 {"source": "development/extensions-contrib/orc.html", "target": "../extensions-core/orc.html"}
+{"source": "operations/performance-faq.html", "target": "../operations/basic-cluster-tuning.html"},
+{"source": "configuration/realtime.md", "target": "../ingestion/standalone-realtime.html"},
+{"source": "design/realtime.md", "target": "../ingestion/standalone-realtime.html"},
+{"source": "ingestion/stream-pull.md", "target": "../ingestion/standalone-realtime.html"}
 ]

View File

@@ -87,7 +87,6 @@ This page documents all of the configuration properties for each Druid service t
 * [Segment Discovery](#segment-discovery)
 * [Caching](#cache-configuration)
 * [General Query Configuration](#general-query-configuration)
-* [Realtime processes (Deprecated)](#realtime-processes)

 ## Recommended Configuration File Organization
@@ -493,7 +492,7 @@ To use graphite as emitter set `druid.emitter=graphite`. For configuration detai
 ### Metadata Storage

-These properties specify the jdbc connection and other configuration around the metadata storage. The only processes that connect to the metadata storage with these properties are the [Coordinator](../design/coordinator.html), [Overlord](../design/overlord.html) and [Realtime Processes](../design/realtime.html).
+These properties specify the jdbc connection and other configuration around the metadata storage. The only processes that connect to the metadata storage with these properties are the [Coordinator](../design/coordinator.html) and [Overlord](../design/overlord.html).

 |Property|Description|Default|
 |--------|-----------|-------|
@@ -1672,7 +1671,3 @@ Supported query contexts:
 |`maxResults`|Can be used to lower the value of `druid.query.groupBy.maxResults` for this query.|None|
 |`useOffheap`|Set to true to store aggregations off-heap when merging results.|false|
-
-## Realtime processes
-
-Configuration for the deprecated realtime process can be found [here](../configuration/realtime.html).

View File

@@ -1,98 +0,0 @@
---
layout: doc_page
title: "Realtime Process Configuration"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
# Realtime Process Configuration
For general Apache Druid (incubating) Realtime Process information, see [here](../design/realtime.html).
Runtime Configuration
---------------------
The realtime process uses several of the global configs in [Configuration](../configuration/index.html) and has the following set of configurations as well:
### Process Config
|Property|Description|Default|
|--------|-----------|-------|
|`druid.host`|The host for the current process. This is used to advertise the current process's location as reachable from another process and should generally be specified such that `http://${druid.host}/` could actually talk to this process|InetAddress.getLocalHost().getCanonicalHostName()|
|`druid.plaintextPort`|This is the port to actually listen on; unless port mapping is used, this will be the same port as is on `druid.host`|8084|
|`druid.tlsPort`|TLS port for HTTPS connector, if [druid.enableTlsPort](../operations/tls-support.html) is set then this config will be used. If `druid.host` contains port then that port will be ignored. This should be a non-negative Integer.|8284|
|`druid.service`|The name of the service. This is used as a dimension when emitting metrics and alerts to differentiate between the various services|druid/realtime|
### Realtime Operation
|Property|Description|Default|
|--------|-----------|-------|
|`druid.publish.type`|Where to publish segments. Choices are "noop" or "metadata".|metadata|
|`druid.realtime.specFile`|File location of realtime specFile.|none|
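As a sketch, a minimal `runtime.properties` for a standalone realtime process could combine these settings; the spec file path is hypothetical:
```
druid.service=druid/realtime
druid.plaintextPort=8084

# Publish segment metadata to the metadata store (the default).
druid.publish.type=metadata

# Hypothetical location of the JSON array of realtime specs.
druid.realtime.specFile=conf/druid/realtime/realtime.spec
```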
### Storing Intermediate Segments
|Property|Description|Default|
|--------|-----------|-------|
|`druid.segmentCache.locations`|Where intermediate segments are stored. The maxSize should always be zero.|none|
### Query Configs
#### Processing
|Property|Description|Default|
|--------|-----------|-------|
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime processes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|auto (max 1GB)|
|`druid.processing.formatString`|Realtime and Historical processes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy v2) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=<VALUE>` at the command
line.
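As a worked example, assume an 8-core machine running with the defaults above and the buffer auto-sized to its 1 GB maximum:
```
druid.processing.numThreads       = 7            # cores - 1
druid.processing.numMergeBuffers  = 2            # max(2, 7 / 4)
druid.processing.buffer.sizeBytes = 1073741824   # 1 GiB

# 1 GiB * (2 + 7 + 1) = 10 GiB of direct memory, so start the JVM
# with -XX:MaxDirectMemorySize=10g (or larger).
```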
#### General Query Configuration
##### GroupBy Query Config
See [groupBy server configuration](../querying/groupbyquery.html#server-configuration).
##### Search Query Config
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.search.maxSearchLimit`|Maximum number of search results to return.|1000|
### Caching
You can optionally configure caching to be enabled on the realtime process by setting caching configs here.
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.realtime.cache.useCache`|true, false|Enable the cache on the realtime process.|false|
|`druid.realtime.cache.populateCache`|true, false|Populate the cache on the realtime process.|false|
|`druid.realtime.cache.unCacheable`|All druid query types|Query types to not cache.|`["select"]`|
|`druid.realtime.cache.maxEntrySize`|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`1000000` (1MB)|
See [cache configuration](index.html#cache-configuration) for how to configure cache settings.
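As an illustrative sketch, enabling a small local cache on the realtime process could look like the following; the cache backend and size here are assumptions, so see the linked cache configuration for the full set of options:
```
druid.realtime.cache.useCache=true
druid.realtime.cache.populateCache=true

# Illustrative backend settings from the general cache configuration.
druid.cache.type=local
druid.cache.sizeInBytes=33554432
```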

View File

@@ -27,7 +27,7 @@ title: "ZooKeeper"
 Apache Druid (incubating) uses [Apache ZooKeeper](http://zookeeper.apache.org/) (ZK) for management of current cluster state. The operations that happen over ZK are

 1. [Coordinator](../design/coordinator.html) leader election
-2. Segment "publishing" protocol from [Historical](../design/historical.html) and [Realtime](../design/realtime.html)
+2. Segment "publishing" protocol from [Historical](../design/historical.html)
 3. Segment load/drop protocol between [Coordinator](../design/coordinator.html) and [Historical](../design/historical.html)
 4. [Overlord](../design/overlord.html) leader election
 5. [Overlord](../design/overlord.html) and [MiddleManager](../design/middlemanager.html) task management

@@ -44,7 +44,7 @@ ${druid.zk.paths.coordinatorPath}/_COORDINATOR
 The `announcementsPath` and `servedSegmentsPath` are used for this.

-All [Historical](../design/historical.html) and [Realtime](../design/realtime.html) processes publish themselves on the `announcementsPath`, specifically, they will create an ephemeral znode at
+All [Historical](../design/historical.html) processes publish themselves on the `announcementsPath`, specifically, they will create an ephemeral znode at

 ```
 ${druid.zk.paths.announcementsPath}/${druid.host}

View File

@@ -1,80 +0,0 @@
---
layout: doc_page
title: "Real-time Process"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
# Real-time Process
<div class="note info">
NOTE: Realtime processes are deprecated. Please use the <a href="../development/extensions-core/kafka-ingestion.html">Kafka Indexing Service</a> for stream pull use cases instead.
</div>
For Apache Druid (incubating) Real-time Process Configuration, see [Realtime Configuration](../configuration/realtime.html).
For Real-time Ingestion, see [Realtime Ingestion](../ingestion/stream-ingestion.html).
Realtime processes provide a realtime index. Data indexed via these processes is immediately available for querying. Realtime processes will periodically build segments representing the data they've collected over some span of time and transfer these segments off to [Historical](../design/historical.html) processes. They use ZooKeeper to monitor the transfer and the metadata storage to store metadata about the transferred segment. Once transferred, segments are forgotten by the Realtime processes.
### Running
```
org.apache.druid.cli.Main server realtime
```
Segment Propagation
-------------------
The segment propagation diagram for real-time data ingestion can be seen below:
![Segment Propagation](../../img/segmentPropagation.png "Segment Propagation")
You can read about the various components shown in this diagram under the Architecture section (see the menu on the right). Note that some of the names are now outdated.
### Firehose
See [Firehose](../ingestion/firehose.html).
### Plumber
See [Plumber](../design/plumber.html)
Extending the code
------------------
Realtime integration is intended to be extended in two ways:
1. Connect to data streams from varied systems ([Firehose](https://github.com/apache/incubator-druid/blob/master/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java))
2. Adjust the publishing strategy to match your needs ([Plumber](https://github.com/apache/incubator-druid/blob/master/server/src/main/java/org/apache/druid/segment/realtime/plumber/PlumberSchool.java))
The expectations are that the former will be very common and something that users of Druid will do on a fairly regular basis. Most users will probably never have to deal with the latter form of customization. Indeed, we hope that all potential use cases can be packaged up as part of Druid proper without requiring proprietary customization.
Given those expectations, adding a firehose is straightforward and completely encapsulated inside of the interface. Adding a plumber is more involved and requires an understanding of how the system works to get right; it's not impossible, but it's not intended that individuals new to Druid will be able to do it immediately.
HTTP Endpoints
--------------
The real-time process exposes several HTTP endpoints for interactions.
### GET
* `/status`
Returns the Druid version, loaded extensions, memory used, total memory and other useful information about the process.
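For example, assuming a realtime process listening on the default plaintext port:
```
curl http://localhost:8084/status
```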

View File

@@ -54,8 +54,7 @@ Most of the coordination logic for (real-time) ingestion is in the Druid indexin
 ## Real-time Ingestion

 Druid loads data through `FirehoseFactory.java` classes. Firehoses often wrap other firehoses, where, similar to the design of the
-query runners, each firehose adds a layer of logic. Much of the core management logic is in `RealtimeManager.java` and the
-persist and hand-off logic is in `RealtimePlumber.java`.
+query runners, each firehose adds a layer of logic, and the persist and hand-off logic is in `RealtimePlumber.java`.

 ## Hadoop-based Batch Ingestion

View File

@@ -24,7 +24,7 @@ title: "Apache Druid (incubating) Firehoses"
 # Apache Druid (incubating) Firehoses

-Firehoses are used in [native batch ingestion tasks](../ingestion/native_tasks.html), stream push tasks automatically created by [Tranquility](../ingestion/stream-push.html), and the [stream-pull (deprecated)](../ingestion/stream-pull.html) ingestion model.
+Firehoses are used in [native batch ingestion tasks](../ingestion/native_tasks.html) and in stream push tasks automatically created by [Tranquility](../ingestion/stream-push.html).

 They are pluggable and thus the configuration schema can and will vary based on the `type` of the firehose.
@@ -204,9 +204,7 @@ This can be used to merge data from more than one firehose.
 ### Streaming Firehoses

-The firehoses shown below should only be used with the [stream-pull (deprecated)](../ingestion/stream-pull.html) ingestion model, as they are not suitable for batch ingestion.
-
-The EventReceiverFirehose is also used in tasks automatically generated by [Tranquility stream push](../ingestion/stream-push.html).
+The EventReceiverFirehose is used in tasks automatically generated by [Tranquility stream push](../ingestion/stream-push.html). These firehoses are not suitable for batch ingestion.

 #### EventReceiverFirehose

View File

@@ -310,7 +310,6 @@ The IOConfig spec differs based on the ingestion task type.
 * Hadoop Batch ingestion: See [Hadoop Batch IOConfig](../ingestion/hadoop.html#ioconfig)
 * Kafka Indexing Service: See [Kafka Supervisor IOConfig](../development/extensions-core/kafka-ingestion.html#KafkaSupervisorIOConfig)
 * Stream Push Ingestion: Stream push ingestion with Tranquility does not require an IO Config.
-* Stream Pull Ingestion (Deprecated): See [Stream pull ingestion](../ingestion/stream-pull.html#ioconfig).

 # Tuning Config
@@ -320,7 +319,6 @@ The TuningConfig spec differs based on the ingestion task type.
 * Hadoop Batch ingestion: See [Hadoop Batch TuningConfig](../ingestion/hadoop.html#tuningconfig)
 * Kafka Indexing Service: See [Kafka Supervisor TuningConfig](../development/extensions-core/kafka-ingestion.html#KafkaSupervisorTuningConfig)
 * Stream Push Ingestion (Tranquility): See [Tranquility TuningConfig](http://static.druid.io/tranquility/api/latest/#com.metamx.tranquility.druid.DruidTuning).
-* Stream Pull Ingestion (Deprecated): See [Stream pull ingestion](../ingestion/stream-pull.html#tuningconfig).

 # Evaluating Timestamp, Dimensions and Metrics

View File

@@ -0,0 +1,43 @@
---
layout: doc_page
title: "Realtime Process"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
# Realtime Process
Older versions of Apache Druid (incubating) supported a standalone 'Realtime' process to query and index 'stream pull'
modes of real-time ingestion. These processes would periodically build segments for the data they had collected over
some span of time and then set up hand-off to [Historical](../design/historical.html) servers.
This process could be invoked by
```
org.apache.druid.cli.Main server realtime
```
This model of stream pull ingestion was deprecated for a number of both operational and architectural reasons, and
removed completely in Druid 0.16.0. Operationally, realtime nodes were difficult to configure, deploy, and scale because
each node required a unique configuration. The design of the stream pull ingestion system for realtime nodes also
suffered from limitations which made it impossible to achieve exactly-once ingestion.
Please consider using the [Kafka Indexing Service](../development/extensions-core/kafka-ingestion.html) or
[Kinesis Indexing Service](../development/extensions-core/kinesis-ingestion.html) for stream pull ingestion instead.
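For comparison, a Kafka supervisor spec plays the role the realtime spec file used to play; it is POSTed to the Overlord's `/druid/indexer/v1/supervisor` endpoint rather than configured on a dedicated node. The following minimal sketch mirrors the old wikipedia example; the topic name and broker address are illustrative:
```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "wikipedia",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": { "column": "timestamp", "format": "auto" },
        "dimensionsSpec": { "dimensions": ["page", "language", "user"] }
      }
    },
    "metricsSpec": [ { "type": "count", "name": "count" } ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE"
    }
  },
  "ioConfig": {
    "topic": "wikipedia",
    "consumerProperties": { "bootstrap.servers": "localhost:9092" },
    "taskCount": 1,
    "replicas": 1
  },
  "tuningConfig": { "type": "kafka", "maxRowsInMemory": 1000000 }
}
```
Unlike the standalone node, the supervisor manages task lifecycles, partition assignment, and replication itself, which is what makes exactly-once ingestion possible.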

View File

@@ -1,376 +0,0 @@
---
layout: doc_page
title: "Stream Pull Ingestion"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<div class="note info">
NOTE: Realtime processes are deprecated. Please use the <a href="../development/extensions-core/kafka-ingestion.html">Kafka Indexing Service</a> for stream pull use cases instead.
</div>
# Stream Pull Ingestion
If you have an external service that you want to pull data from, you have two options. The simplest
option is to set up a "copying" service that reads from the data source and writes to Apache Druid (incubating) using
the [stream push method](stream-push.html).
Another option is *stream pull*. With this approach, a Druid Realtime Process ingests data from a
[Firehose](../ingestion/firehose.html) connected to the data you want to
read. The Druid quickstart and tutorials do not include information about how to set up standalone realtime processes, but
they can be used in place of Tranquility server and the indexing service. Please note that Realtime processes have different properties and roles than the indexing service.
## Realtime Process Ingestion
Much of the configuration governing Realtime processes and the ingestion of data is set in the Realtime spec file, discussed on this page.
For general Real-time Process information, see [here](../design/realtime.html).
For Real-time Process Configuration, see [Realtime Configuration](../configuration/realtime.html).
For writing your own plugins to the real-time process, see [Firehose](../ingestion/firehose.html).
## Realtime "specFile"
The property `druid.realtime.specFile` has the path of a file (absolute or relative path and file name) with realtime specifications in it. This "specFile" should be a JSON Array of JSON objects like the following:
```json
[
{
"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
}
},
"metricsSpec" : [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : "NONE"
}
},
"ioConfig" : {
"type" : "realtime",
"firehose": {
"type": "kafka-0.8",
"consumerProps": {
"zookeeper.connect": "localhost:2181",
"zookeeper.connection.timeout.ms" : "15000",
"zookeeper.session.timeout.ms" : "15000",
"zookeeper.sync.time.ms" : "5000",
"group.id": "druid-example",
"fetch.message.max.bytes" : "1048586",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"feed": "wikipedia"
},
"plumber": {
"type": "realtime"
}
},
"tuningConfig": {
"type" : "realtime",
"maxRowsInMemory": 1000000,
"intermediatePersistPeriod": "PT10M",
"windowPeriod": "PT10M",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "serverTime"
}
}
}
]
```
This is a JSON Array, so you can give more than one realtime stream to a given process. The number you can put in the same process depends on the exact configuration. In general, it is best to think of each realtime stream handler as requiring two threads: one thread for data consumption and aggregation, and one thread for incremental persists and other background tasks.
There are three parts to a realtime stream specification: `dataSchema`, `IOConfig`, and `tuningConfig`, which we will go into here.
### DataSchema
This field is required.
See [Ingestion](../ingestion/index.html)
### IOConfig
This field is required.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|This should always be 'realtime'.|yes|
|firehose|JSON Object|Where the data is coming from. Described in detail below.|yes|
|plumber|JSON Object|Where the data is going. Described in detail below.|yes|
#### Firehose
See [Firehose](../ingestion/firehose.html) for more information on various firehoses.
#### Plumber
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|This should always be 'realtime'.|no|
### TuningConfig
The tuningConfig is optional and default parameters will be used if no tuningConfig is specified.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|This should always be 'realtime'.|no|
|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 1000000)|
|maxBytesInMemory|Long|The maximum number of bytes to keep in memory to aggregate before persisting. This is used to manage the required JVM heap size.|no (default == One-sixth of max JVM memory)|
|windowPeriod|ISO 8601 Period String|The amount of lag time to allow events. This is configured with a 10 minute window, meaning that any event more than 10 minutes ago will be thrown away and not included in the segment generated by the realtime server.|no (default == PT10M)|
|intermediatePersistPeriod|ISO8601 Period String|The period that determines the rate at which intermediate persists occur. These persists determine how often commits happen against the incoming realtime stream. If the realtime data loading process is interrupted at time T, it should be restarted to re-read data that arrived at T minus this period.|no (default == PT10M)|
|basePersistDirectory|String|The directory to put things that need persistence. The plumber is responsible for the actual intermediate persists and this tells it where to store those persists.|no (default == java tmp dir)|
|versioningPolicy|Object|How to version segments.|no (default == based on segment start time)|
|rejectionPolicy|Object|Controls the data acceptance policy for creating and handing off segments. More on this below.|no (default == 'serverTime')|
|maxPendingPersists|Integer|Maximum number of persists that can be pending, but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0; meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime processes indexing the same data stream in a [sharded fashion](#sharding).|no (default == 'NoneShardSpec')|
|persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the persisting thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default == 0; inherit and do not override)|
|mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default == 0; inherit and do not override)|
|reportParseExceptions|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion. If false, unparseable rows and fields will be skipped. If an entire row is skipped, the "unparseable" counter will be incremented. If some fields in a row were parseable and some were not, the parseable fields will be indexed and the "unparseable" counter will not be incremented.|no (default == false)|
|handoffConditionTimeout|long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
|alertTimeout|long|Milliseconds timeout after which an alert is created if the task isn't finished by then. This allows users to monitor tasks that are failing to finish and give up the worker slot for any unexpected errors.|no (default == 0)|
|segmentWriteOutMediumFactory|Object|Segment write-out medium to use when creating segments. See below for more information.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used)|
|dedupColumn|String|The column used to judge whether a row is already present in this segment; if so, the row is thrown away. For a String-typed column, a long-typed hashcode of the column's value is used to judge whether the row has already been ingested (to reduce heap cost), so there is a very small chance of throwing away a row that was not previously ingested.|no (default == null)|
|indexSpec|Object|Tune how data is indexed. See below for more information.|no|
Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/apache/incubator-druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`.
#### Rejection Policy
The following policies are available:
* `serverTime` &ndash; The recommended policy for "current time" data; it is optimal for current data that is generated and ingested in real time. Uses `windowPeriod` to accept only those events that are inside the window, looking forward and back.
* `messageTime` &ndash; Can be used for non-"current time" data as long as that data is relatively in sequence. Events are rejected if they are less than `windowPeriod` from the event with the latest timestamp. Hand off only occurs if an event is seen after the segmentGranularity and `windowPeriod` (hand off will not occur periodically unless you have a constant stream of data).
* `none` &ndash; All events are accepted. Never hands off data unless shutdown() is called on the configured firehose.
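For example, to ingest data that is slightly out of order but roughly in sequence, the `tuningConfig` in the spec above could swap in the `messageTime` policy:
```json
"rejectionPolicy": {
  "type": "messageTime"
}
```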
#### SegmentWriteOutMediumFactory
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|See [Additional Peon Configuration: SegmentWriteOutMediumFactory](../configuration/index.html#segmentwriteoutmediumfactory) for explanation and available options.|yes|
#### IndexSpec
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|bitmap|Object|Compression format for bitmap indexes. Should be a JSON object; see below for options.|no (defaults to Concise)|
|dimensionCompression|String|Compression format for dimension columns. Choose from `LZ4`, `LZF`, or `uncompressed`.|no (default == `LZ4`)|
|metricCompression|String|Compression format for metric columns. Choose from `LZ4`, `LZF`, or `uncompressed`.|no (default == `LZ4`)|
##### Bitmap types
For Concise bitmaps:
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|Must be `concise`.|yes|
For Roaring bitmaps:
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|Must be `roaring`.|yes|
|compressRunOnSerialization|Boolean|Use a run-length encoding where it is estimated as more space efficient.|no (default == `true`)|
#### Sharding
Druid uses shards, or segments with partition numbers, to more efficiently handle large amounts of incoming data. In Druid, shards represent the segments that together cover a time interval based on the value of `segmentGranularity`. If, for example, `segmentGranularity` is set to "hour", then a number of shards may be used to store the data for that hour. Sharding along dimensions may also occur to optimize efficiency.
Segments are identified by datasource, time interval, and version. With sharding, a segment is also identified by a partition number. Typically, each shard will have the same version but a different partition number to uniquely identify it.
In small-data scenarios, sharding is unnecessary and can be set to none (the default):
```json
"shardSpec": {"type": "none"}
```
However, in scenarios with multiple realtime processes, `none` is less useful as it cannot help with scaling data volume (see below). Note that for the batch indexing service, no explicit configuration is required; sharding is provided automatically.
Druid uses sharding based on the `shardSpec` setting you configure. The recommended choices, `linear` and `numbered`, are discussed below; other types have been useful for internal Druid development but are not appropriate for production setups.
Keep in mind that the sharding configuration has nothing to do with the configured firehose. For example, setting the partition number to 0 does not mean that the Kafka firehose will consume only from topic partition 0.
##### Linear
This strategy provides the following advantages:
* There is no need to update the fileSpec configurations of existing processes when adding new processes.
* All unique shards are queried, regardless of whether the partition numbering is sequential or not (it allows querying of partitions 0 and 2, even if partition 1 is missing).
Configure `linear` under `schema`:
```json
"shardSpec": {
"type": "linear",
"partitionNum": 0
}
```
##### Numbered
This strategy is similar to `linear` except that it does not tolerate non-sequential partition numbering (it will *not* allow querying of partitions 0 and 2 if partition 1 is missing). It also requires explicitly setting the total number of partitions.
Configure `numbered` under `schema`:
```json
"shardSpec": {
"type": "numbered",
"partitionNum": 0,
"partitions": 2
}
```
##### Scale and Redundancy
The `shardSpec` configuration can be used to create redundancy by having the same `partitionNum` values on different processes.
For example, if RealTimeProcess1 has:
```json
"shardSpec": {
"type": "linear",
"partitionNum": 0
}
```
and RealTimeProcess2 has:
```json
"shardSpec": {
"type": "linear",
"partitionNum": 0
}
```
then two realtime processes can store segments with the same datasource, version, time interval, and partition number. Brokers that query for data in such segments will assume that they hold the same data, and the query will target only one of the segments.
`shardSpec` can also help achieve scale. For this, add processes with a different `partitionNum`. Continuing with the example, if RealTimeProcess3 has:
```json
"shardSpec": {
"type": "linear",
"partitionNum": 1
}
```
then it can store segments with the same datasource, time interval, and version as in the first two processes, but with a different partition number. Brokers that query for data in such segments will assume that a segment from RealTimeProcess3 holds *different* data, and the query will target it along with a segment from the first two processes.
You can use type `numbered` similarly. Note that type `none` is essentially type `linear` with all shards having a fixed `partitionNum` of 0.
## Constraints
The following table summarizes constraints between settings in the spec file for the Realtime subsystem.
|Name|Effect|Minimum|Recommended|
|----|------|-------|-----------|
|windowPeriod| When reading a row, events with timestamp older than now minus this window are discarded | time jitter tolerance | use this to reject outliers |
|segmentGranularity| Time granularity (minute, hour, day, week, month) for loading data at query time | equal to indexGranularity| more than queryGranularity|
|queryGranularity| Time granularity (minute, hour, day, week, month) for rollup | less than segmentGranularity| minute, hour, day, week, month |
|intermediatePersistPeriod| The max time (ISO8601 Period) between flushes of ingested rows from memory to disk | avoid excessive flushing | number of un-persisted rows in memory also constrained by maxRowsInMemory |
|maxRowsInMemory| The max number of ingested rows to hold in memory before a flush to disk. Normally the user does not need to set this, but if rows are small in terms of bytes it may not be desirable to hold a million rows in memory, in which case this value should be set | number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |
|maxBytesInMemory| The number of bytes to keep in memory before a flush to disk. Normally this is computed internally and user does not need to set it. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)| number of un-persisted post-aggregation bytes in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |
The normal, expected use cases have the following overall constraints: `intermediatePersistPeriod ≤ windowPeriod < segmentGranularity` and `queryGranularity ≤ segmentGranularity`
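For instance, the following illustrative fragments (the `granularitySpec` lives in the `dataSchema`) satisfy both constraints, since PT10M ≤ PT10M < one hour and MINUTE < HOUR:
```json
"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "HOUR",
  "queryGranularity": "MINUTE"
},
"tuningConfig": {
  "type": "realtime",
  "intermediatePersistPeriod": "PT10M",
  "windowPeriod": "PT10M"
}
```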
## Limitations
### Kafka
Standalone realtime processes use the Kafka high level consumer, which imposes a few restrictions.
Druid replicates segments such that logically equivalent data segments are concurrently hosted on N processes. If N−1 processes go down,
the data will still be available for querying. For real-time processes, this replication depends on maintaining logically equivalent
data segments on each of the N processes, which is not possible with standard Kafka consumer groups if your Kafka topic requires more than one consumer
(because consumers in different consumer groups will split up the data differently).
For example, let's say your topic is split across Kafka partitions 1, 2, & 3 and you have 2 real-time processes with linear shard specs 1 & 2.
Both of the real-time processes are in the same consumer group. Real-time process 1 may consume data from partitions 1 & 3, and real-time process 2 may consume data from partition 2.
Querying for your data through the Broker will yield correct results.
The problem arises if you want to replicate your data by creating real-time processes 3 & 4. These new real-time processes also
have linear shard specs 1 & 2, and they will consume data from Kafka using a different consumer group. In this case,
real-time process 3 may consume data from partitions 1 & 2, and real-time process 4 may consume data from partition 3.
From Druid's perspective, the segments hosted by real-time processes 1 and 3 are the same, and the data hosted by real-time processes
2 and 4 are the same, although they are reading from different Kafka partitions. Querying for the data will yield inconsistent
results.
Is this always a problem? No. If your data is small enough to fit on a single Kafka partition, you can replicate without issues.
Otherwise, you can run real-time processes without replication.
Please note that Druid will skip over any event that fails its checksum and is corrupt.
### Locking
Using stream pull ingestion with Realtime processes together with batch ingestion may introduce data override issues. For example, if you
are generating hourly segments for the current day, and run a daily batch job for the current day's data, the segments created by
the batch job will have a more recent version than most of the segments generated by realtime ingestion. If your batch job is indexing
data that isn't yet complete for the day, the daily segment created by the batch job can override recent segments created by
realtime processes. A portion of data will appear to be lost in this case.
### Schema changes
Standalone realtime processes require stopping a process to update a schema, and starting it up again for the schema to take effect.
This can be difficult to manage at scale, especially with multiple partitions.
### Log management
Each standalone realtime process has its own set of logs. Diagnosing errors across many partitions across many servers may be
difficult to manage and track at scale.
## Deployment Notes
Stream ingestion may generate a large number of small segments because it's difficult to optimize the segment size at
ingestion time. The number of segments will increase over time, and this might cause query performance issues.
Details on how to optimize the segment size can be found on [Segment size optimization](../operations/segment-optimization.html).

View File

@@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.Provider;
import org.apache.druid.segment.realtime.FireDepartment;
import java.util.ArrayList;
import java.util.List;
/**
 * Deserializes the realtime spec file referenced by {@link RealtimeManagerConfig#getSpecFile()}
 * into the list of {@link FireDepartment}s that the realtime node should run.
 */
public class FireDepartmentsProvider implements Provider<List<FireDepartment>>
{
private final List<FireDepartment> fireDepartments = new ArrayList<>();
@Inject
public FireDepartmentsProvider(
ObjectMapper jsonMapper,
RealtimeManagerConfig config
)
{
try {
this.fireDepartments.addAll(
jsonMapper.readValue(config.getSpecFile(), new TypeReference<List<FireDepartment>>() {})
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public List<FireDepartment> get()
{
return fireDepartments;
}
}

View File

@@ -1,37 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.File;
/**
 * Configuration for the standalone realtime node; holds the location of the
 * realtime spec file (the {@code druid.realtime.specFile} property).
 */
public class RealtimeManagerConfig
{
@JsonProperty
private File specFile;
public File getSpecFile()
{
return specFile;
}
}

View File

@@ -1,393 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseV2;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.realtime.plumber.Committers;
import org.apache.druid.segment.realtime.plumber.Plumber;
import org.apache.druid.segment.realtime.plumber.Plumbers;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Coordinates ingestion for the standalone realtime node: runs one {@link FireChief}
 * per FireDepartment (data source + partition) on an executor, and implements
 * {@link QuerySegmentWalker} so queries can be served from the in-memory indexes.
 */
public class RealtimeManager implements QuerySegmentWalker
{
private static final EmittingLogger log = new EmittingLogger(RealtimeManager.class);
private final List<FireDepartment> fireDepartments;
private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentServerAnnouncer serverAnnouncer;
/**
* key=data source name,value=mappings of partition number to FireChief
*/
private final Map<String, Map<Integer, FireChief>> chiefs;
private ExecutorService fireChiefExecutor;
private boolean stopping;
@Inject
public RealtimeManager(
List<FireDepartment> fireDepartments,
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentServerAnnouncer serverAnnouncer
)
{
this(fireDepartments, conglomerate, serverAnnouncer, new HashMap<>());
}
@VisibleForTesting
RealtimeManager(
List<FireDepartment> fireDepartments,
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentServerAnnouncer serverAnnouncer,
Map<String, Map<Integer, FireChief>> chiefs
)
{
this.fireDepartments = fireDepartments;
this.conglomerate = conglomerate;
this.serverAnnouncer = serverAnnouncer;
this.chiefs = chiefs == null ? new HashMap<>() : new HashMap<>(chiefs);
}
@VisibleForTesting
Map<Integer, FireChief> getFireChiefs(String dataSource)
{
return chiefs.get(dataSource);
}
@LifecycleStart
public void start()
{
serverAnnouncer.announce();
fireChiefExecutor = Execs.multiThreaded(fireDepartments.size(), "chief-%d");
for (final FireDepartment fireDepartment : fireDepartments) {
final DataSchema schema = fireDepartment.getDataSchema();
final FireChief chief = new FireChief(fireDepartment, conglomerate);
chiefs.computeIfAbsent(schema.getDataSource(), k -> new HashMap<>())
.put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief);
fireChiefExecutor.submit(chief);
}
}
@LifecycleStop
public void stop()
{
stopping = true;
try {
if (fireChiefExecutor != null) {
fireChiefExecutor.shutdownNow();
Preconditions.checkState(
fireChiefExecutor.awaitTermination(10, TimeUnit.SECONDS),
"persistExecutor not terminated"
);
}
}
catch (InterruptedException e) {
throw new ISE(e, "Failed to shutdown fireChiefExecutor during stop()");
}
serverAnnouncer.unannounce();
}
public FireDepartmentMetrics getMetrics(String datasource)
{
Map<Integer, FireChief> chiefs = this.chiefs.get(datasource);
if (chiefs == null) {
return null;
}
FireDepartmentMetrics snapshot = null;
for (FireChief chief : chiefs.values()) {
if (snapshot == null) {
snapshot = chief.getMetrics().snapshot();
} else {
snapshot.merge(chief.getMetrics());
}
}
return snapshot;
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, Iterable<Interval> intervals)
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
final Map<Integer, FireChief> partitionChiefs = chiefs.get(Iterables.getOnlyElement(query.getDataSource()
.getNames()));
return partitionChiefs == null ? new NoopQueryRunner<T>() : factory.getToolchest().mergeResults(
factory.mergeRunners(
Execs.directExecutor(),
// Chaining query runners which wait on submitted chain query runners can make executor pools deadlock
Iterables.transform(
partitionChiefs.values(), new Function<FireChief, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(FireChief fireChief)
{
return fireChief.getQueryRunner(query);
}
}
)
)
);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
final Map<Integer, FireChief> partitionChiefs = chiefs.get(Iterables.getOnlyElement(query.getDataSource()
.getNames()));
return partitionChiefs == null
? new NoopQueryRunner<T>()
: factory.getToolchest().mergeResults(
factory.mergeRunners(
Execs.directExecutor(),
Iterables.transform(
specs,
new Function<SegmentDescriptor, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(SegmentDescriptor spec)
{
final FireChief retVal = partitionChiefs.get(spec.getPartitionNumber());
return retVal == null
? new NoopQueryRunner<T>()
: retVal.getQueryRunner(query.withQuerySegmentSpec(new SpecificSegmentSpec(spec)));
}
}
)
)
);
}
class FireChief implements Runnable
{
private final FireDepartment fireDepartment;
private final FireDepartmentMetrics metrics;
private final RealtimeTuningConfig config;
private final QueryRunnerFactoryConglomerate conglomerate;
private Plumber plumber;
FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate conglomerate)
{
this.fireDepartment = fireDepartment;
this.conglomerate = conglomerate;
this.config = fireDepartment.getTuningConfig();
this.metrics = fireDepartment.getMetrics();
}
private Firehose initFirehose()
{
try {
log.info("Calling the FireDepartment and getting a Firehose.");
return fireDepartment.connect();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private FirehoseV2 initFirehoseV2(Object metaData) throws IOException
{
log.info("Calling the FireDepartment and getting a FirehoseV2.");
return fireDepartment.connect(metaData);
}
private void initPlumber()
{
log.info("Someone get us a plumber!");
plumber = fireDepartment.findPlumber();
}
@VisibleForTesting
Plumber getPlumber()
{
return plumber;
}
public FireDepartmentMetrics getMetrics()
{
return metrics;
}
@Override
public void run()
{
initPlumber();
try {
final Closer closer = Closer.create();
try {
Object metadata = plumber.startJob();
Firehose firehose;
FirehoseV2 firehoseV2;
final boolean success;
if (fireDepartment.checkFirehoseV2()) {
firehoseV2 = initFirehoseV2(metadata);
closer.register(firehoseV2);
success = runFirehoseV2(firehoseV2);
} else {
firehose = initFirehose();
closer.register(firehose);
success = runFirehose(firehose);
}
if (success) {
// plumber.finishJob() is called only when all processing has finished successfully.
closer.register(() -> plumber.finishJob());
}
}
catch (Exception e) {
log.makeAlert(
e,
"[%s] aborted realtime processing[%s]",
e.getClass().getSimpleName(),
fireDepartment.getDataSchema().getDataSource()
).emit();
throw closer.rethrow(e);
}
catch (Error e) {
log.makeAlert(e, "Error aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource())
.emit();
throw closer.rethrow(e);
}
finally {
closer.close();
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private boolean runFirehoseV2(FirehoseV2 firehose)
{
firehose.start();
log.info("FirehoseV2 started");
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehoseV2(firehose);
boolean haveRow = true;
while (haveRow) {
if (Thread.interrupted() || stopping) {
return false;
}
InputRow inputRow = null;
try {
inputRow = firehose.currRow();
if (inputRow != null) {
IncrementalIndexAddResult addResult = plumber.add(inputRow, committerSupplier);
int numRows = addResult.getRowCount();
if (numRows == -2) {
metrics.incrementDedup();
log.debug("Throwing away duplicate event[%s]", inputRow);
} else if (numRows < 0) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s] due to %s", inputRow, addResult.getReasonOfNotAdded());
} else {
metrics.incrementProcessed();
}
} else {
log.debug("thrown away null input row, considering unparseable");
metrics.incrementUnparseable();
}
}
catch (Exception e) {
log.makeAlert(e, "Unknown exception, Ignoring and continuing.")
.addData("inputRow", inputRow)
.emit();
}
try {
haveRow = firehose.advance();
}
catch (Exception e) {
log.debug(e, "exception in firehose.advance(), considering unparseable row");
metrics.incrementUnparseable();
}
}
return true;
}
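// Loop for the original Firehose interface; per-row handling is delegated
// to Plumbers.addNextRow.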
private boolean runFirehose(Firehose firehose)
{
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
while (firehose.hasMore()) {
if (Thread.interrupted() || stopping) {
return false;
}
Plumbers.addNextRow(committerSupplier, firehose, plumber, config.isReportParseExceptions(), metrics);
}
return true;
}
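// Wraps the plumber's query runner so that results are finalized before
// being returned to the caller.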
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
return new FinalizeResultsQueryRunner<T>(plumber.getQueryRunner(query), toolChest);
}
}
}

View File

@@ -17,7 +17,7 @@
  * under the License.
  */
-package org.apache.druid.realtime.firehose;
+package org.apache.druid.segment.realtime.firehose;

 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.FirehoseFactory;
@@ -26,7 +26,6 @@ import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
 import org.apache.druid.utils.Runnables;
 import org.joda.time.DateTime;
 import org.junit.Assert;

View File

@@ -1,73 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.RealtimeModule;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule;
import java.util.List;
import java.util.Properties;
/**
 * Command that runs the deprecated standalone realtime node.
 */
@Command(
name = "realtime",
description = "Runs a realtime node, see https://druid.apache.org/docs/latest/Realtime.html for a description"
)
public class CliRealtime extends ServerRunnable
{
private static final Logger log = new Logger(CliRealtime.class);
@Inject
private Properties properties;
public CliRealtime()
{
super(log);
}
@Override
protected List<? extends Module> getModules()
{
return ImmutableList.of(
new DruidProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
new RealtimeModule(),
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/realtime");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8084);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8284);
},
new ChatHandlerServerModule(properties),
new LookupModule()
);
}
}

View File

@@ -1,131 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.InventoryView;
import org.apache.druid.client.ServerView;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.RealtimeModule;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.NoopDataSegmentPusher;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule;
import org.apache.druid.timeline.DataSegment;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
 * Example variant of the realtime command: segment pushing, announcing, and
 * cluster views are all no-ops, so it can run without a full cluster.
 */
@Command(
name = "realtime",
description = "Runs a standalone realtime node for examples, see https://druid.apache.org/docs/latest/Realtime.html for a description"
)
public class CliRealtimeExample extends ServerRunnable
{
private static final Logger log = new Logger(CliRealtimeExample.class);
@Inject
private Properties properties;
public CliRealtimeExample()
{
super(log);
}
@Override
protected List<? extends Module> getModules()
{
return ImmutableList.of(
new DruidProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
new RealtimeModule(),
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/realtime");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8084);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8284);
binder.bind(DataSegmentPusher.class).to(NoopDataSegmentPusher.class).in(LazySingleton.class);
binder.bind(DataSegmentAnnouncer.class).to(NoopDataSegmentAnnouncer.class).in(LazySingleton.class);
binder.bind(InventoryView.class).to(NoopInventoryView.class).in(LazySingleton.class);
binder.bind(ServerView.class).to(NoopServerView.class).in(LazySingleton.class);
},
new ChatHandlerServerModule(properties),
new LookupModule()
);
}
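// The no-op implementations below let the example node start without watching
// a real cluster for servers or segments.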
private static class NoopServerView implements ServerView
{
@Override
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
{
// do nothing
}
@Override
public void registerSegmentCallback(Executor exec, SegmentCallback callback)
{
// do nothing
}
}
private static class NoopInventoryView implements InventoryView
{
@Override
public DruidServer getInventoryValue(String serverKey)
{
return null;
}
@Override
public Collection<DruidServer> getInventory()
{
return ImmutableList.of();
}
@Override
public boolean isStarted()
{
return true;
}
@Override
public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
{
return false;
}
}
}

View File

@@ -59,7 +59,6 @@ public class Main
         CliCoordinator.class,
         CliHistorical.class,
         CliBroker.class,
-        CliRealtime.class,
         CliOverlord.class,
         CliMiddleManager.class,
         CliRouter.class
@@ -69,11 +68,6 @@ public class Main
         .withDefaultCommand(Help.class)
         .withCommands(serverCommands);
-    builder.withGroup("example")
-           .withDescription("Run an example")
-           .withDefaultCommand(Help.class)
-           .withCommands(CliRealtimeExample.class);
     List<Class<? extends Runnable>> toolCommands = Arrays.asList(
         DruidJsonValidator.class,
         PullDependencies.class,

View File

@@ -1,133 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.cli.QueryJettyServerInitializer;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.metadata.MetadataSegmentPublisher;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.NoopSegmentPublisher;
import org.apache.druid.segment.realtime.RealtimeManager;
import org.apache.druid.segment.realtime.SegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig;
import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.server.QueryResource;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
import org.eclipse.jetty.server.Server;
import java.util.List;
/**
 * Wires the standalone realtime node: segment publishing, row ingestion
 * metrics, chat handlers, segment handoff notification, caching, and query
 * serving.
 */
public class RealtimeModule implements Module
{
@Override
public void configure(Binder binder)
{
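// Segment publishing defaults to "metadata" (publish to the metadata store);
// the "noop" option discards publishes.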
PolyBind.createChoiceWithDefault(binder, "druid.publish.type", Key.get(SegmentPublisher.class), "metadata");
final MapBinder<String, SegmentPublisher> publisherBinder = PolyBind.optionBinder(
binder,
Key.get(SegmentPublisher.class)
);
publisherBinder.addBinding("noop").to(NoopSegmentPublisher.class).in(LazySingleton.class);
publisherBinder.addBinding("metadata").to(MetadataSegmentPublisher.class).in(LazySingleton.class);
PolyBind.createChoice(
binder,
"druid.realtime.rowIngestionMeters.type",
Key.get(RowIngestionMetersFactory.class),
Key.get(DropwizardRowIngestionMetersFactory.class)
);
final MapBinder<String, RowIngestionMetersFactory> rowIngestionMetersHandlerProviderBinder =
PolyBind.optionBinder(binder, Key.get(RowIngestionMetersFactory.class));
rowIngestionMetersHandlerProviderBinder
.addBinding("dropwizard")
.to(DropwizardRowIngestionMetersFactory.class)
.in(LazySingleton.class);
binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
PolyBind.createChoice(
binder,
"druid.realtime.chathandler.type",
Key.get(ChatHandlerProvider.class),
Key.get(ServiceAnnouncingChatHandlerProvider.class)
);
final MapBinder<String, ChatHandlerProvider> handlerProviderBinder =
PolyBind.optionBinder(binder, Key.get(ChatHandlerProvider.class));
handlerProviderBinder
.addBinding("announce")
.to(ServiceAnnouncingChatHandlerProvider.class)
.in(LazySingleton.class);
handlerProviderBinder
.addBinding("noop")
.to(NoopChatHandlerProvider.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(
new TypeLiteral<List<FireDepartment>>()
{
}
)
.toProvider(FireDepartmentsProvider.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.segment.handoff", CoordinatorBasedSegmentHandoffNotifierConfig.class);
binder.bind(SegmentHandoffNotifierFactory.class)
.to(CoordinatorBasedSegmentHandoffNotifierFactory.class)
.in(LazySingleton.class);
binder.bind(CoordinatorClient.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
binder.install(new CacheModule());
binder.bind(QuerySegmentWalker.class).to(RealtimeManager.class).in(ManageLifecycle.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(ServerType.REALTIME));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
binder.bind(QueryCountStatsProvider.class).to(QueryResource.class).in(LazySingleton.class);
Jerseys.addResource(binder, QueryResource.class);
Jerseys.addResource(binder, SegmentListerResource.class);
LifecycleModule.register(binder, QueryResource.class);
LifecycleModule.register(binder, Server.class);
binder.bind(SegmentManager.class).in(LazySingleton.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
LifecycleModule.register(binder, ZkCoordinator.class);
}
}

View File

@@ -50,8 +50,6 @@ public class MainTest
         //new Object[]{new CliInternalHadoopIndexer()},
         new Object[]{new CliMiddleManager()},
-        new Object[]{new CliRealtime()},
-        new Object[]{new CliRealtimeExample()},
         new Object[]{new CliRouter()}
     );
   }