From f7283378acc7a47e44bfba5a4fe63d086a6f9e10 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 2 Jul 2019 18:12:17 -0700 Subject: [PATCH] remove deprecated standalone realtime node (#7915) * remove CliRealtime, RealtimeManager, etc * add redirects for deleted page to page that explains the deleted thing * adjust docs --- docs/_redirects.json | 8 +- docs/content/configuration/index.md | 7 +- docs/content/configuration/realtime.md | 98 -- docs/content/dependencies/zookeeper.md | 4 +- docs/content/design/realtime.md | 80 -- docs/content/development/overview.md | 3 +- docs/content/ingestion/firehose.md | 6 +- docs/content/ingestion/ingestion-spec.md | 2 - docs/content/ingestion/standalone-realtime.md | 43 + docs/content/ingestion/stream-pull.md | 376 ------ .../druid/guice/FireDepartmentsProvider.java | 59 - .../druid/guice/RealtimeManagerConfig.java | 37 - .../segment/realtime/RealtimeManager.java | 393 ------ .../segment/realtime/RealtimeManagerTest.java | 1104 ----------------- .../CombiningFirehoseFactoryTest.java | 3 +- .../org/apache/druid/cli/CliRealtime.java | 73 -- .../apache/druid/cli/CliRealtimeExample.java | 131 -- .../main/java/org/apache/druid/cli/Main.java | 6 - .../apache/druid/guice/RealtimeModule.java | 133 -- .../java/org/apache/druid/cli/MainTest.java | 2 - 20 files changed, 56 insertions(+), 2512 deletions(-) delete mode 100644 docs/content/configuration/realtime.md delete mode 100644 docs/content/design/realtime.md create mode 100644 docs/content/ingestion/standalone-realtime.md delete mode 100644 docs/content/ingestion/stream-pull.md delete mode 100644 server/src/main/java/org/apache/druid/guice/FireDepartmentsProvider.java delete mode 100644 server/src/main/java/org/apache/druid/guice/RealtimeManagerConfig.java delete mode 100644 server/src/main/java/org/apache/druid/segment/realtime/RealtimeManager.java delete mode 100644 server/src/test/java/org/apache/druid/segment/realtime/RealtimeManagerTest.java rename server/src/test/java/org/apache/druid/{ => segment}/realtime/firehose/CombiningFirehoseFactoryTest.java (97%) delete mode 100644 services/src/main/java/org/apache/druid/cli/CliRealtime.java delete mode 100644 services/src/main/java/org/apache/druid/cli/CliRealtimeExample.java delete mode 100644 services/src/main/java/org/apache/druid/guice/RealtimeModule.java diff --git a/docs/_redirects.json b/docs/_redirects.json index 18cdf69b4c6..508aedf2d5f 100644 --- a/docs/_redirects.json +++ b/docs/_redirects.json @@ -59,9 +59,9 @@ {"source": "Post-aggregations.html", "target": "querying/post-aggregations.html"}, {"source": "Query-Context.html", "target": "querying/query-context.html"}, {"source": "Querying.html", "target": "querying/querying.html"}, - {"source": "Realtime-Config.html", "target": "configuration/realtime.html"}, + {"source": "Realtime-Config.html", "target": "ingestion/standalone-realtime.html"}, + {"source": "Realtime.html", "target": "ingestion/standalone-realtime.html"}, {"source": "Realtime-ingestion.html", "target": "ingestion/stream-ingestion.html"}, - {"source": "Realtime.html", "target": "design/realtime.html"}, {"source": "Recommendations.html", "target": "operations/recommendations.html"}, {"source": "Rolling-Updates.html", "target": "operations/rolling-updates.html"}, {"source": "Router.html", "target": "development/router.html"}, @@ -167,4 +167,8 @@ {"source": "development/extensions-core/namespaced-lookup.html", "target": "lookups-cached-global.html"}, {"source": "operations/performance-faq.html", "target": 
"../operations/basic-cluster-tuning.html"}, {"source": "development/extensions-contrib/orc.html", "target": "../extensions-core/orc.html"} + {"source": "operations/performance-faq.html", "target": "../operations/basic-cluster-tuning.html"}, + {"source": "configuration/realtime.md", "target": "../ingestion/standalone-realtime.html"}, + {"source": "design/realtime.md", "target": "../ingestion/standalone-realtime.html"}, + {"source": "ingestion/stream-pull.md", "target": "../ingestion/standalone-realtime.html"} ] diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 0f7048976b7..c73fb36d83c 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -87,7 +87,6 @@ This page documents all of the configuration properties for each Druid service t * [Segment Discovery](#segment-discovery) * [Caching](#cache-configuration) * [General Query Configuration](#general-query-configuration) - * [Realtime processes (Deprecated)](#realtime-processes) ## Recommended Configuration File Organization @@ -493,7 +492,7 @@ To use graphite as emitter set `druid.emitter=graphite`. For configuration detai ### Metadata Storage -These properties specify the jdbc connection and other configuration around the metadata storage. The only processes that connect to the metadata storage with these properties are the [Coordinator](../design/coordinator.html), [Overlord](../design/overlord.html) and [Realtime Processes](../design/realtime.html). +These properties specify the jdbc connection and other configuration around the metadata storage. The only processes that connect to the metadata storage with these properties are the [Coordinator](../design/coordinator.html) and [Overlord](../design/overlord.html). |Property|Description|Default| |--------|-----------|-------| @@ -1672,7 +1671,3 @@ Supported query contexts: |`maxResults`|Can be used to lower the value of `druid.query.groupBy.maxResults` for this query.|None| |`useOffheap`|Set to true to store aggregations off-heap when merging results.|false| - -## Realtime processes - -Configuration for the deprecated realtime process can be found [here](../configuration/realtime.html). diff --git a/docs/content/configuration/realtime.md b/docs/content/configuration/realtime.md deleted file mode 100644 index dd319fe228c..00000000000 --- a/docs/content/configuration/realtime.md +++ /dev/null @@ -1,98 +0,0 @@ ---- -layout: doc_page -title: "Realtime Process Configuration" ---- - - - -# Realtime Process Configuration - -For general Apache Druid (incubating) Realtime Process information, see [here](../design/realtime.html). - -Runtime Configuration ---------------------- - -The realtime process uses several of the global configs in [Configuration](../configuration/index.html) and has the following set of configurations as well: - -### Process Config - -|Property|Description|Default| -|--------|-----------|-------| -|`druid.host`|The host for the current process. This is used to advertise the current processes location as reachable from another process and should generally be specified such that `http://${druid.host}/` could actually talk to this process|InetAddress.getLocalHost().getCanonicalHostName()| -|`druid.plaintextPort`|This is the port to actually listen on; unless port mapping is used, this will be the same port as is on `druid.host`|8084| -|`druid.tlsPort`|TLS port for HTTPS connector, if [druid.enableTlsPort](../operations/tls-support.html) is set then this config will be used. 
If `druid.host` contains port then that port will be ignored. This should be a non-negative Integer.|8284| -|`druid.service`|The name of the service. This is used as a dimension when emitting metrics and alerts to differentiate between the various services|druid/realtime| - -### Realtime Operation - -|Property|Description|Default| -|--------|-----------|-------| -|`druid.publish.type`|Where to publish segments. Choices are "noop" or "metadata".|metadata| -|`druid.realtime.specFile`|File location of realtime specFile.|none| - -### Storing Intermediate Segments - -|Property|Description|Default| -|--------|-----------|-------| -|`druid.segmentCache.locations`|Where intermediate segments are stored. The maxSize should always be zero.|none| - - -### Query Configs - -#### Processing - -|Property|Description|Default| -|--------|-----------|-------| -|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime processes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|auto (max 1GB)| -|`druid.processing.formatString`|Realtime and Historical processes use this format string to name their processing threads.|processing-%s| -|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy v2) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`| -|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)| -|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)| -|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`| - -The amount of direct memory needed by Druid is at least -`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can -ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=` at the command -line. - -#### General Query Configuration - -##### GroupBy Query Config - -See [groupBy server configuration](../querying/groupbyquery.html#server-configuration). 
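As a rough illustration of the direct-memory formula above, the following standalone sketch (not Druid code; the class name and the example values are assumptions) computes the minimum `-XX:MaxDirectMemorySize` for a given set of processing settings:

```java
// Hypothetical helper, not part of Druid: applies the documented formula
// sizeBytes * (numMergeBuffers + numThreads + 1) to example configuration values.
public class DirectMemoryEstimate
{
  public static void main(String[] args)
  {
    long bufferSizeBytes = 500_000_000L; // assumed druid.processing.buffer.sizeBytes
    int numMergeBuffers = 2;             // assumed druid.processing.numMergeBuffers
    int numThreads = 7;                  // assumed druid.processing.numThreads (8 cores - 1)

    // Minimum direct memory the process needs for processing buffers.
    long minDirectMemory = bufferSizeBytes * (numMergeBuffers + numThreads + 1);
    System.out.printf(
        "Provide at least -XX:MaxDirectMemorySize=%d bytes (~%.1f GB)%n",
        minDirectMemory,
        minDirectMemory / 1e9
    );
  }
}
```

With the assumed values this prints roughly 5.0 GB, which is the floor you would pass on the command line.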
- -##### Search Query Config - -|Property|Description|Default| -|--------|-----------|-------| -|`druid.query.search.maxSearchLimit`|Maximum number of search results to return.|1000| - -### Caching - -You can optionally configure caching to be enabled on the realtime process by setting caching configs here. - -|Property|Possible Values|Description|Default| -|--------|---------------|-----------|-------| -|`druid.realtime.cache.useCache`|true, false|Enable the cache on the realtime.|false| -|`druid.realtime.cache.populateCache`|true, false|Populate the cache on the realtime.|false| -|`druid.realtime.cache.unCacheable`|All druid query types|All query types to not cache.|`["select"]`| -|`druid.realtime.cache.maxEntrySize`|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`1000000` (1MB)| - -See [cache configuration](index.html#cache-configuration) for how to configure cache settings. diff --git a/docs/content/dependencies/zookeeper.md b/docs/content/dependencies/zookeeper.md index a41e815856a..5143f928799 100644 --- a/docs/content/dependencies/zookeeper.md +++ b/docs/content/dependencies/zookeeper.md @@ -27,7 +27,7 @@ title: "ZooKeeper" Apache Druid (incubating) uses [Apache ZooKeeper](http://zookeeper.apache.org/) (ZK) for management of current cluster state. The operations that happen over ZK are 1. [Coordinator](../design/coordinator.html) leader election -2. Segment "publishing" protocol from [Historical](../design/historical.html) and [Realtime](../design/realtime.html) +2. Segment "publishing" protocol from [Historical](../design/historical.html) 3. Segment load/drop protocol between [Coordinator](../design/coordinator.html) and [Historical](../design/historical.html) 4. [Overlord](../design/overlord.html) leader election 5. [Overlord](../design/overlord.html) and [MiddleManager](../design/middlemanager.html) task management @@ -44,7 +44,7 @@ ${druid.zk.paths.coordinatorPath}/_COORDINATOR The `announcementsPath` and `servedSegmentsPath` are used for this. -All [Historical](../design/historical.html) and [Realtime](../design/realtime.html) processes publish themselves on the `announcementsPath`, specifically, they will create an ephemeral znode at +All [Historical](../design/historical.html) processes publish themselves on the `announcementsPath`, specifically, they will create an ephemeral znode at ``` ${druid.zk.paths.announcementsPath}/${druid.host} diff --git a/docs/content/design/realtime.md b/docs/content/design/realtime.md deleted file mode 100644 index df6b4e0b2e3..00000000000 --- a/docs/content/design/realtime.md +++ /dev/null @@ -1,80 +0,0 @@ ---- -layout: doc_page -title: "Real-time Process" ---- - - - -# Real-time Process - -
-NOTE: Realtime processes are deprecated. Please use the Kafka Indexing Service for stream pull use cases instead. -
- -For Apache Druid (incubating) Real-time Process Configuration, see [Realtime Configuration](../configuration/realtime.html). - -For Real-time Ingestion, see [Realtime Ingestion](../ingestion/stream-ingestion.html). - -Realtime processes provide a realtime index. Data indexed via these processes is immediately available for querying. Realtime processes will periodically build segments representing the data they’ve collected over some span of time and transfer these segments off to [Historical](../design/historical.html) processes. They use ZooKeeper to monitor the transfer and the metadata storage to store metadata about the transferred segment. Once transfered, segments are forgotten by the Realtime processes. - -### Running - -``` -org.apache.druid.cli.Main server realtime -``` -Segment Propagation -------------------- - -The segment propagation diagram for real-time data ingestion can be seen below: - -![Segment Propagation](../../img/segmentPropagation.png "Segment Propagation") - -You can read about the various components shown in this diagram under the Architecture section (see the menu on the right). Note that some of the names are now outdated. - -### Firehose - -See [Firehose](../ingestion/firehose.html). - -### Plumber - -See [Plumber](../design/plumber.html) - -Extending the code ------------------- - -Realtime integration is intended to be extended in two ways: - -1. Connect to data streams from varied systems ([Firehose](https://github.com/apache/incubator-druid/blob/master/core/src/main/org/apache/druid/data/input/FirehoseFactory.java)) -2. Adjust the publishing strategy to match your needs ([Plumber](https://github.com/apache/incubator-druid/blob/master/server/src/main/java/org/apache/druid/segment/realtime/plumber/PlumberSchool.java)) - -The expectations are that the former will be very common and something that users of Druid will do on a fairly regular basis. Most users will probably never have to deal with the latter form of customization. Indeed, we hope that all potential use cases can be packaged up as part of Druid proper without requiring proprietary customization. - -Given those expectations, adding a firehose is straightforward and completely encapsulated inside of the interface. Adding a plumber is more involved and requires understanding of how the system works to get right, it’s not impossible, but it’s not intended that individuals new to Druid will be able to do it immediately. - -HTTP Endpoints --------------- - -The real-time process exposes several HTTP endpoints for interactions. - -### GET - -* `/status` - -Returns the Druid version, loaded extensions, memory used, total memory and other useful information about the process. diff --git a/docs/content/development/overview.md b/docs/content/development/overview.md index ad360a510e3..db89fce6a81 100644 --- a/docs/content/development/overview.md +++ b/docs/content/development/overview.md @@ -54,8 +54,7 @@ Most of the coordination logic for (real-time) ingestion is in the Druid indexin ## Real-time Ingestion Druid loads data through `FirehoseFactory.java` classes. Firehoses often wrap other firehoses, where, similar to the design of the -query runners, each firehose adds a layer of logic. Much of the core management logic is in `RealtimeManager.java` and the -persist and hand-off logic is in `RealtimePlumber.java`. +query runners, each firehose adds a layer of logic, and the persist and hand-off logic is in `RealtimePlumber.java`. 
## Hadoop-based Batch Ingestion diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md index f35bcc0b6e9..274e3b6ec23 100644 --- a/docs/content/ingestion/firehose.md +++ b/docs/content/ingestion/firehose.md @@ -24,7 +24,7 @@ title: "Apache Druid (incubating) Firehoses" # Apache Druid (incubating) Firehoses -Firehoses are used in [native batch ingestion tasks](../ingestion/native_tasks.html), stream push tasks automatically created by [Tranquility](../ingestion/stream-push.html), and the [stream-pull (deprecated)](../ingestion/stream-pull.html) ingestion model. +Firehoses are used in [native batch ingestion tasks](../ingestion/native_tasks.html), stream push tasks automatically created by [Tranquility](../ingestion/stream-push.html) ingestion model. They are pluggable and thus the configuration schema can and will vary based on the `type` of the firehose. @@ -204,9 +204,7 @@ This can be used to merge data from more than one firehose. ### Streaming Firehoses -The firehoses shown below should only be used with the [stream-pull (deprecated)](../ingestion/stream-pull.html) ingestion model, as they are not suitable for batch ingestion. - -The EventReceiverFirehose is also used in tasks automatically generated by [Tranquility stream push](../ingestion/stream-push.html). +The EventReceiverFirehose is used in tasks automatically generated by [Tranquility stream push](../ingestion/stream-push.html). These firehoses are not suitable for batch ingestion. #### EventReceiverFirehose diff --git a/docs/content/ingestion/ingestion-spec.md b/docs/content/ingestion/ingestion-spec.md index 3b03c5f35dd..df37f5b7af2 100644 --- a/docs/content/ingestion/ingestion-spec.md +++ b/docs/content/ingestion/ingestion-spec.md @@ -310,7 +310,6 @@ The IOConfig spec differs based on the ingestion task type. * Hadoop Batch ingestion: See [Hadoop Batch IOConfig](../ingestion/hadoop.html#ioconfig) * Kafka Indexing Service: See [Kafka Supervisor IOConfig](../development/extensions-core/kafka-ingestion.html#KafkaSupervisorIOConfig) * Stream Push Ingestion: Stream push ingestion with Tranquility does not require an IO Config. -* Stream Pull Ingestion (Deprecated): See [Stream pull ingestion](../ingestion/stream-pull.html#ioconfig). # Tuning Config @@ -320,7 +319,6 @@ The TuningConfig spec differs based on the ingestion task type. * Hadoop Batch ingestion: See [Hadoop Batch TuningConfig](../ingestion/hadoop.html#tuningconfig) * Kafka Indexing Service: See [Kafka Supervisor TuningConfig](../development/extensions-core/kafka-ingestion.html#KafkaSupervisorTuningConfig) * Stream Push Ingestion (Tranquility): See [Tranquility TuningConfig](http://static.druid.io/tranquility/api/latest/#com.metamx.tranquility.druid.DruidTuning). -* Stream Pull Ingestion (Deprecated): See [Stream pull ingestion](../ingestion/stream-pull.html#tuningconfig). # Evaluating Timestamp, Dimensions and Metrics diff --git a/docs/content/ingestion/standalone-realtime.md b/docs/content/ingestion/standalone-realtime.md new file mode 100644 index 00000000000..81ce89d6f09 --- /dev/null +++ b/docs/content/ingestion/standalone-realtime.md @@ -0,0 +1,43 @@ +--- +layout: doc_page +title: "Realtime Process" +--- + + + +# Realtime Process + +Older versions of Apache Druid (incubating) supported a standalone 'Realtime' process to query and index 'stream pull' +modes of real-time ingestion. 
These processes would periodically build segments for the data they had collected over
+some span of time and then set up hand-off to [Historical](../design/historical.html) servers.
+
+These processes could be invoked by running:
+
+```
+org.apache.druid.cli.Main server realtime
+```
+
+This model of stream pull ingestion was deprecated for a number of operational and architectural reasons, and
+removed completely in Druid 0.16.0. Operationally, realtime nodes were difficult to configure, deploy, and scale because
+each node required a unique configuration. The design of the stream pull ingestion system for realtime nodes also
+suffered from limitations that made exactly-once ingestion impossible.
+
+Please consider using the [Kafka Indexing Service](../development/extensions-core/kafka-ingestion.html) or
+[Kinesis Indexing Service](../development/extensions-core/kinesis-ingestion.html) for stream pull ingestion instead. diff --git a/docs/content/ingestion/stream-pull.md b/docs/content/ingestion/stream-pull.md deleted file mode 100644 index 38f6a807ef2..00000000000 --- a/docs/content/ingestion/stream-pull.md +++ /dev/null @@ -1,376 +0,0 @@ ---- -layout: doc_page -title: "Stream Pull Ingestion" ---- - - - -
-NOTE: Realtime processes are deprecated. Please use the Kafka Indexing Service for stream pull use cases instead. -
- -# Stream Pull Ingestion - -If you have an external service that you want to pull data from, you have two options. The simplest -option is to set up a "copying" service that reads from the data source and writes to Apache Druid (incubating) using -the [stream push method](stream-push.html). - -Another option is *stream pull*. With this approach, a Druid Realtime Process ingests data from a -[Firehose](../ingestion/firehose.html) connected to the data you want to -read. The Druid quickstart and tutorials do not include information about how to set up standalone realtime processes, but -they can be used in place for Tranquility server and the indexing service. Please note that Realtime processes have different properties and roles than the indexing service. - -## Realtime Process Ingestion - -Much of the configuration governing Realtime processes and the ingestion of data is set in the Realtime spec file, discussed on this page. - -For general Real-time Process information, see [here](../design/realtime.html). - -For Real-time Process Configuration, see [Realtime Configuration](../configuration/realtime.html). - -For writing your own plugins to the real-time process, see [Firehose](../ingestion/firehose.html). - -## Realtime "specFile" - -The property `druid.realtime.specFile` has the path of a file (absolute or relative path and file name) with realtime specifications in it. This "specFile" should be a JSON Array of JSON objects like the following: - -```json -[ - { - "dataSchema" : { - "dataSource" : "wikipedia", - "parser" : { - "type" : "string", - "parseSpec" : { - "format" : "json", - "timestampSpec" : { - "column" : "timestamp", - "format" : "auto" - }, - "dimensionsSpec" : { - "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"], - "dimensionExclusions" : [], - "spatialDimensions" : [] - } - } - }, - "metricsSpec" : [{ - "type" : "count", - "name" : "count" - }, { - "type" : "doubleSum", - "name" : "added", - "fieldName" : "added" - }, { - "type" : "doubleSum", - "name" : "deleted", - "fieldName" : "deleted" - }, { - "type" : "doubleSum", - "name" : "delta", - "fieldName" : "delta" - }], - "granularitySpec" : { - "type" : "uniform", - "segmentGranularity" : "DAY", - "queryGranularity" : "NONE" - } - }, - "ioConfig" : { - "type" : "realtime", - "firehose": { - "type": "kafka-0.8", - "consumerProps": { - "zookeeper.connect": "localhost:2181", - "zookeeper.connection.timeout.ms" : "15000", - "zookeeper.session.timeout.ms" : "15000", - "zookeeper.sync.time.ms" : "5000", - "group.id": "druid-example", - "fetch.message.max.bytes" : "1048586", - "auto.offset.reset": "largest", - "auto.commit.enable": "false" - }, - "feed": "wikipedia" - }, - "plumber": { - "type": "realtime" - } - }, - "tuningConfig": { - "type" : "realtime", - "maxRowsInMemory": 1000000, - "intermediatePersistPeriod": "PT10M", - "windowPeriod": "PT10M", - "basePersistDirectory": "\/tmp\/realtime\/basePersist", - "rejectionPolicy": { - "type": "serverTime" - } - } - } -] -``` - -This is a JSON Array so you can give more than one realtime stream to a given process. The number you can put in the same process depends on the exact configuration. In general, it is best to think of each realtime stream handler as requiring 2-threads: 1 thread for data consumption and aggregation, 1 thread for incremental persists and other background tasks. 
- -There are three parts to a realtime stream specification, `dataSchema`, `IOConfig`, and `tuningConfig` which we will go into here. - -### DataSchema - -This field is required. - -See [Ingestion](../ingestion/index.html) - -### IOConfig - -This field is required. - -|Field|Type|Description|Required| -|-----|----|-----------|--------| -|type|String|This should always be 'realtime'.|yes| -|firehose|JSON Object|Where the data is coming from. Described in detail below.|yes| -|plumber|JSON Object|Where the data is going. Described in detail below.|yes| - -#### Firehose - -See [Firehose](../ingestion/firehose.html) for more information on various firehoses. - -#### Plumber - -|Field|Type|Description|Required| -|-----|----|-----------|--------| -|type|String|This should always be 'realtime'.|no| - -### TuningConfig - -The tuningConfig is optional and default parameters will be used if no tuningConfig is specified. - -|Field|Type|Description|Required| -|-----|----|-----------|--------| -|type|String|This should always be 'realtime'.|no| -|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 1000000)| -|maxBytesInMemory|Long|The maximum number of bytes to keep in memory to aggregate before persisting. This is used to manage the required JVM heap size.|no (default == One-sixth of max JVM memory)| -|windowPeriod|ISO 8601 Period String|The amount of lag time to allow events. This is configured with a 10 minute window, meaning that any event more than 10 minutes ago will be thrown away and not included in the segment generated by the realtime server.|no (default == PT10M)| -|intermediatePersistPeriod|ISO8601 Period String|The period that determines the rate at which intermediate persists occur. These persists determine how often commits happen against the incoming realtime stream. If the realtime data loading process is interrupted at time T, it should be restarted to re-read data that arrived at T minus this period.|no (default == PT10M)| -|basePersistDirectory|String|The directory to put things that need persistence. The plumber is responsible for the actual intermediate persists and this tells it where to store those persists.|no (default == java tmp dir)| -|versioningPolicy|Object|How to version segments.|no (default == based on segment start time)| -|rejectionPolicy|Object|Controls how data sets the data acceptance policy for creating and handing off segments. More on this below.|no (default == 'serverTime')| -|maxPendingPersists|Integer|Maximum number of persists that can be pending, but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0; meaning one persist can be running concurrently with ingestion, and none can be queued up)| -|shardSpec|Object|This describes the shard that is represented by this server. 
This must be specified properly in order to have multiple realtime processes indexing the same data stream in a [sharded fashion](#sharding).|no (default == 'NoneShardSpec')| -|persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the persisting thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default == 0; inherit and do not override)| -|mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default == 0; inherit and do not override)| -|reportParseExceptions|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion. If false, unparseable rows and fields will be skipped. If an entire row is skipped, the "unparseable" counter will be incremented. If some fields in a row were parseable and some were not, the parseable fields will be indexed and the "unparseable" counter will not be incremented.|no (default == false)| -|handoffConditionTimeout|long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)| -|alertTimeout|long|Milliseconds timeout after which an alert is created if the task isn't finished by then. This allows users to monitor tasks that are failing to finish and give up the worker slot for any unexpected errors.|no (default == 0)| -|segmentWriteOutMediumFactory|Object|Segment write-out medium to use when creating segments. See below for more information.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used)| -|dedupColumn|String|the column to judge whether this row is already in this segment, if so, throw away this row. If it is String type column, to reduce heap cost, use long type hashcode of this column's value to judge whether this row is already ingested, so there maybe very small chance to throw away a row that is not ingested before.|no (default == null)| -|indexSpec|Object|Tune how data is indexed. See below for more information.|no| - -Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/apache/incubator-druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`. - -#### Rejection Policy - -The following policies are available: - -* `serverTime` – The recommended policy for "current time" data, it is optimal for current data that is generated and ingested in real time. Uses `windowPeriod` to accept only those events that are inside the window looking forward and back. -* `messageTime` – Can be used for non-"current time" as long as that data is relatively in sequence. Events are rejected if they are less than `windowPeriod` from the event with the latest timestamp. Hand off only occurs if an event is seen after the segmentGranularity and `windowPeriod` (hand off will not periodically occur unless you have a constant stream of data). -* `none` – All events are accepted. Never hands off data unless shutdown() is called on the configured firehose. 
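To make the `serverTime` policy concrete, here is a minimal sketch of the window check it describes: accept an event only when its timestamp is within `windowPeriod` of the server clock, looking forward and back. This is an illustration only, not Druid's actual rejection-policy implementation, and the class and method names are invented:

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: roughly the acceptance test a serverTime-style policy applies.
public class ServerTimeWindowCheck
{
  private final Duration windowPeriod;

  public ServerTimeWindowCheck(Duration windowPeriod)
  {
    this.windowPeriod = windowPeriod;
  }

  /** Accept the event only if it is within windowPeriod of "now", forward or back. */
  public boolean accept(Instant eventTimestamp, Instant now)
  {
    Duration lag = Duration.between(eventTimestamp, now).abs();
    return lag.compareTo(windowPeriod) <= 0;
  }

  public static void main(String[] args)
  {
    ServerTimeWindowCheck check = new ServerTimeWindowCheck(Duration.ofMinutes(10)); // PT10M
    Instant now = Instant.now();
    System.out.println(check.accept(now.minusSeconds(300), now));  // true: 5 minutes old
    System.out.println(check.accept(now.minusSeconds(1200), now)); // false: 20 minutes old
  }
}
```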
- -#### SegmentWriteOutMediumFactory - -|Field|Type|Description|Required| -|-----|----|-----------|--------| -|type|String|See [Additional Peon Configuration: SegmentWriteOutMediumFactory](../configuration/index.html#segmentwriteoutmediumfactory) for explanation and available options.|yes| - -#### IndexSpec - -|Field|Type|Description|Required| -|-----|----|-----------|--------| -|bitmap|Object|Compression format for bitmap indexes. Should be a JSON object; see below for options.|no (defaults to Concise)| -|dimensionCompression|String|Compression format for dimension columns. Choose from `LZ4`, `LZF`, or `uncompressed`.|no (default == `LZ4`)| -|metricCompression|String|Compression format for metric columns. Choose from `LZ4`, `LZF`, or `uncompressed`.|no (default == `LZ4`)| - -##### Bitmap types - -For Concise bitmaps: - -|Field|Type|Description|Required| -|-----|----|-----------|--------| -|type|String|Must be `concise`.|yes| - -For Roaring bitmaps: - -|Field|Type|Description|Required| -|-----|----|-----------|--------| -|type|String|Must be `roaring`.|yes| -|compressRunOnSerialization|Boolean|Use a run-length encoding where it is estimated as more space efficient.|no (default == `true`)| - -#### Sharding - -Druid uses shards, or segments with partition numbers, to more efficiently handle large amounts of incoming data. In Druid, shards represent the segments that together cover a time interval based on the value of `segmentGranularity`. If, for example, `segmentGranularity` is set to "hour", then a number of shards may be used to store the data for that hour. Sharding along dimensions may also occur to optimize efficiency. - -Segments are identified by datasource, time interval, and version. With sharding, a segment is also identified by a partition number. Typically, each shard will have the same version but a different partition number to uniquely identify it. - -In small-data scenarios, sharding is unnecessary and can be set to none (the default): - -```json - "shardSpec": {"type": "none"} -``` - -However, in scenarios with multiple realtime processes, `none` is less useful as it cannot help with scaling data volume (see below). Note that for the batch indexing service, no explicit configuration is required; sharding is provided automatically. - -Druid uses sharding based on the `shardSpec` setting you configure. The recommended choices, `linear` and `numbered`, are discussed below; other types have been useful for internal Druid development but are not appropriate for production setups. - -Keep in mind, that sharding configuration has nothing to do with configured firehose. For example, if you set partition number to 0, it doesn't mean that Kafka firehose will consume only from 0 topic partition. - -##### Linear - -This strategy provides following advantages: - -* There is no need to update the fileSpec configurations of existing processes when adding new processes. -* All unique shards are queried, regardless of whether the partition numbering is sequential or not (it allows querying of partitions 0 and 2, even if partition 1 is missing). - -Configure `linear` under `schema`: - -```json - "shardSpec": { - "type": "linear", - "partitionNum": 0 - } -``` - - -##### Numbered - -This strategy is similar to `linear` except that it does not tolerate non-sequential partition numbering (it will *not* allow querying of partitions 0 and 2 if partition 1 is missing). It also requires explicitly setting the total number of partitions. 
- -Configure `numbered` under `schema`: - -```json - "shardSpec": { - "type": "numbered", - "partitionNum": 0, - "partitions": 2 - } -``` - - -##### Scale and Redundancy - -The `shardSpec` configuration can be used to create redundancy by having the same `partitionNum` values on different processes. - -For example, if RealTimeProcess1 has: - -```json - "shardSpec": { - "type": "linear", - "partitionNum": 0 - } -``` - -and RealTimeProcess2 has: - -```json - "shardSpec": { - "type": "linear", - "partitionNum": 0 - } -``` - -then two realtime processes can store segments with the same datasource, version, time interval, and partition number. Brokers that query for data in such segments will assume that they hold the same data, and the query will target only one of the segments. - -`shardSpec` can also help achieve scale. For this, add processes with a different `partionNum`. Continuing with the example, if RealTimeProcess3 has: - -```json - "shardSpec": { - "type": "linear", - "partitionNum": 1 - } -``` - -then it can store segments with the same datasource, time interval, and version as in the first two processes, but with a different partition number. Brokers that query for data in such segments will assume that a segment from RealTimeProcess3 holds *different* data, and the query will target it along with a segment from the first two processes. - -You can use type `numbered` similarly. Note that type `none` is essentially type `linear` with all shards having a fixed `partitionNum` of 0. - -## Constraints - -The following table summarizes constraints between settings in the spec file for the Realtime subsystem. - -|Name|Effect|Minimum|Recommended| -|----|------|-------|-----------| -|windowPeriod| When reading a row, events with timestamp older than now minus this window are discarded | time jitter tolerance | use this to reject outliers | -|segmentGranularity| Time granularity (minute, hour, day, week, month) for loading data at query time | equal to indexGranularity| more than queryGranularity| -|queryGranularity| Time granularity (minute, hour, day, week, month) for rollup | less than segmentGranularity| minute, hour, day, week, month | -|intermediatePersistPeriod| The max time (ISO8601 Period) between flushes of ingested rows from memory to disk | avoid excessive flushing | number of un-persisted rows in memory also constrained by maxRowsInMemory | -|maxRowsInMemory| The max number of ingested rows to hold in memory before a flush to disk. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set| number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod | -|maxBytesInMemory| The number of bytes to keep in memory before a flush to disk. Normally this is computed internally and user does not need to set it. This is based on a rough estimate of memory usage and not actual usage. 
The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)| number of un-persisted post-aggregation bytes in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod | - -The normal, expected use cases have the following overall constraints: `intermediatePersistPeriod ≤ windowPeriod < segmentGranularity` and `queryGranularity ≤ segmentGranularity` - -## Limitations - -### Kafka - -Standalone realtime processes use the Kafka high level consumer, which imposes a few restrictions. - -Druid replicates segment such that logically equivalent data segments are concurrently hosted on N processes. If N–1 processes go down, -the data will still be available for querying. On real-time processes, this process depends on maintaining logically equivalent -data segments on each of the N processes, which is not possible with standard Kafka consumer groups if your Kafka topic requires more than one consumer -(because consumers in different consumer groups will split up the data differently). - -For example, let's say your topic is split across Kafka partitions 1, 2, & 3 and you have 2 real-time processes with linear shard specs 1 & 2. -Both of the real-time processes are in the same consumer group. Real-time process 1 may consume data from partitions 1 & 3, and real-time process 2 may consume data from partition 2. -Querying for your data through the Broker will yield correct results. - -The problem arises if you want to replicate your data by creating real-time processes 3 & 4. These new real-time processes also -have linear shard specs 1 & 2, and they will consume data from Kafka using a different consumer group. In this case, -real-time process 3 may consume data from partitions 1 & 2, and real-time process 4 may consume data from partition 2. -From Druid's perspective, the segments hosted by real-time processes 1 and 3 are the same, and the data hosted by real-time processes -2 and 4 are the same, although they are reading from different Kafka partitions. Querying for the data will yield inconsistent -results. - -Is this always a problem? No. If your data is small enough to fit on a single Kafka partition, you can replicate without issues. -Otherwise, you can run real-time processes without replication. - -Please note that druid will skip over event that failed its checksum and it is corrupt. - -### Locking - -Using stream pull ingestion with Realtime processes together batch ingestion may introduce data override issues. For example, if you -are generating hourly segments for the current day, and run a daily batch job for the current day's data, the segments created by -the batch job will have a more recent version than most of the segments generated by realtime ingestion. If your batch job is indexing -data that isn't yet complete for the day, the daily segment created by the batch job can override recent segments created by -realtime processes. A portion of data will appear to be lost in this case. - -### Schema changes - -Standalone realtime processes require stopping a process to update a schema, and starting it up again for the schema to take effect. -This can be difficult to manage at scale, especially with multiple partitions. - -### Log management - -Each standalone realtime process has its own set of logs. Diagnosing errors across many partitions across many servers may be -difficult to manage and track at scale. 
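The spec constraints summarized earlier (`intermediatePersistPeriod ≤ windowPeriod < segmentGranularity` and `queryGranularity ≤ segmentGranularity`) can be expressed as a simple pre-flight check. The sketch below is hypothetical, modelling granularities as plain `Duration` values rather than Druid's granularity types, but it shows the intended ordering:

```java
import java.time.Duration;

// Hypothetical validation of the documented ordering constraints; not Druid code.
public class SpecConstraintCheck
{
  static void validate(
      Duration intermediatePersistPeriod,
      Duration windowPeriod,
      Duration segmentGranularity,
      Duration queryGranularity
  )
  {
    if (intermediatePersistPeriod.compareTo(windowPeriod) > 0) {
      throw new IllegalArgumentException("intermediatePersistPeriod must be <= windowPeriod");
    }
    if (windowPeriod.compareTo(segmentGranularity) >= 0) {
      throw new IllegalArgumentException("windowPeriod must be < segmentGranularity");
    }
    if (queryGranularity.compareTo(segmentGranularity) > 0) {
      throw new IllegalArgumentException("queryGranularity must be <= segmentGranularity");
    }
  }

  public static void main(String[] args)
  {
    // PT10M persists, PT10M window, hourly segments, minute rollup: satisfies the constraints.
    validate(Duration.ofMinutes(10), Duration.ofMinutes(10), Duration.ofHours(1), Duration.ofMinutes(1));
    System.out.println("constraints satisfied");
  }
}
```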
- -## Deployment Notes - -Stream ingestion may generate a large number of small segments because it's difficult to optimize the segment size at -ingestion time. The number of segments will increase over time, and this might cause the query performance issue. - -Details on how to optimize the segment size can be found on [Segment size optimization](../operations/segment-optimization.html). diff --git a/server/src/main/java/org/apache/druid/guice/FireDepartmentsProvider.java b/server/src/main/java/org/apache/druid/guice/FireDepartmentsProvider.java deleted file mode 100644 index feab7e0eeec..00000000000 --- a/server/src/main/java/org/apache/druid/guice/FireDepartmentsProvider.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.guice; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.inject.Inject; -import com.google.inject.Provider; -import org.apache.druid.segment.realtime.FireDepartment; - -import java.util.ArrayList; -import java.util.List; - -/** - */ -public class FireDepartmentsProvider implements Provider> -{ - private final List fireDepartments = new ArrayList<>(); - - @Inject - public FireDepartmentsProvider( - ObjectMapper jsonMapper, - RealtimeManagerConfig config - ) - { - try { - this.fireDepartments.addAll( - jsonMapper.readValue(config.getSpecFile(), new TypeReference>() {}) - ); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - - - @Override - public List get() - { - return fireDepartments; - } -} diff --git a/server/src/main/java/org/apache/druid/guice/RealtimeManagerConfig.java b/server/src/main/java/org/apache/druid/guice/RealtimeManagerConfig.java deleted file mode 100644 index 97da66d6fd7..00000000000 --- a/server/src/main/java/org/apache/druid/guice/RealtimeManagerConfig.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.guice; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.File; - -/** - */ -public class RealtimeManagerConfig -{ - @JsonProperty - private File specFile; - - public File getSpecFile() - { - return specFile; - } -} diff --git a/server/src/main/java/org/apache/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/org/apache/druid/segment/realtime/RealtimeManager.java deleted file mode 100644 index feaaeb0e983..00000000000 --- a/server/src/main/java/org/apache/druid/segment/realtime/RealtimeManager.java +++ /dev/null @@ -1,393 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.segment.realtime; - - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.collect.Iterables; -import com.google.inject.Inject; -import org.apache.druid.data.input.Committer; -import org.apache.druid.data.input.Firehose; -import org.apache.druid.data.input.FirehoseV2; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.lifecycle.LifecycleStart; -import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.query.FinalizeResultsQueryRunner; -import org.apache.druid.query.NoopQueryRunner; -import org.apache.druid.query.Query; -import org.apache.druid.query.QueryRunner; -import org.apache.druid.query.QueryRunnerFactory; -import org.apache.druid.query.QueryRunnerFactoryConglomerate; -import org.apache.druid.query.QuerySegmentWalker; -import org.apache.druid.query.QueryToolChest; -import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.query.spec.SpecificSegmentSpec; -import org.apache.druid.segment.incremental.IncrementalIndexAddResult; -import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.segment.indexing.RealtimeTuningConfig; -import org.apache.druid.segment.realtime.plumber.Committers; -import org.apache.druid.segment.realtime.plumber.Plumber; -import org.apache.druid.segment.realtime.plumber.Plumbers; -import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; -import org.joda.time.Interval; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * - */ -public class RealtimeManager implements QuerySegmentWalker -{ - private static 
final EmittingLogger log = new EmittingLogger(RealtimeManager.class); - - private final List fireDepartments; - private final QueryRunnerFactoryConglomerate conglomerate; - private final DataSegmentServerAnnouncer serverAnnouncer; - - /** - * key=data source name,value=mappings of partition number to FireChief - */ - private final Map> chiefs; - - private ExecutorService fireChiefExecutor; - private boolean stopping; - - @Inject - public RealtimeManager( - List fireDepartments, - QueryRunnerFactoryConglomerate conglomerate, - DataSegmentServerAnnouncer serverAnnouncer - ) - { - this(fireDepartments, conglomerate, serverAnnouncer, new HashMap<>()); - } - - @VisibleForTesting - RealtimeManager( - List fireDepartments, - QueryRunnerFactoryConglomerate conglomerate, - DataSegmentServerAnnouncer serverAnnouncer, - Map> chiefs - ) - { - this.fireDepartments = fireDepartments; - this.conglomerate = conglomerate; - this.serverAnnouncer = serverAnnouncer; - this.chiefs = chiefs == null ? new HashMap<>() : new HashMap<>(chiefs); - } - - @VisibleForTesting - Map getFireChiefs(String dataSource) - { - return chiefs.get(dataSource); - } - - @LifecycleStart - public void start() - { - serverAnnouncer.announce(); - - fireChiefExecutor = Execs.multiThreaded(fireDepartments.size(), "chief-%d"); - - for (final FireDepartment fireDepartment : fireDepartments) { - final DataSchema schema = fireDepartment.getDataSchema(); - - final FireChief chief = new FireChief(fireDepartment, conglomerate); - chiefs.computeIfAbsent(schema.getDataSource(), k -> new HashMap<>()) - .put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief); - - fireChiefExecutor.submit(chief); - } - } - - @LifecycleStop - public void stop() - { - stopping = true; - try { - if (fireChiefExecutor != null) { - fireChiefExecutor.shutdownNow(); - Preconditions.checkState( - fireChiefExecutor.awaitTermination(10, TimeUnit.SECONDS), - "persistExecutor not terminated" - ); - } - } - catch (InterruptedException e) { - throw new ISE(e, "Failed to shutdown fireChiefExecutor during stop()"); - } - serverAnnouncer.unannounce(); - } - - public FireDepartmentMetrics getMetrics(String datasource) - { - Map chiefs = this.chiefs.get(datasource); - if (chiefs == null) { - return null; - } - FireDepartmentMetrics snapshot = null; - for (FireChief chief : chiefs.values()) { - if (snapshot == null) { - snapshot = chief.getMetrics().snapshot(); - } else { - snapshot.merge(chief.getMetrics()); - } - } - return snapshot; - } - - @Override - public QueryRunner getQueryRunnerForIntervals(final Query query, Iterable intervals) - { - final QueryRunnerFactory> factory = conglomerate.findFactory(query); - final Map partitionChiefs = chiefs.get(Iterables.getOnlyElement(query.getDataSource() - .getNames())); - - return partitionChiefs == null ? 
new NoopQueryRunner() : factory.getToolchest().mergeResults( - factory.mergeRunners( - Execs.directExecutor(), - // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock - Iterables.transform( - partitionChiefs.values(), new Function>() - { - @Override - public QueryRunner apply(FireChief fireChief) - { - return fireChief.getQueryRunner(query); - } - } - ) - ) - ); - } - - @Override - public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable specs) - { - final QueryRunnerFactory> factory = conglomerate.findFactory(query); - final Map partitionChiefs = chiefs.get(Iterables.getOnlyElement(query.getDataSource() - .getNames())); - - return partitionChiefs == null - ? new NoopQueryRunner() - : factory.getToolchest().mergeResults( - factory.mergeRunners( - Execs.directExecutor(), - Iterables.transform( - specs, - new Function>() - { - @Override - public QueryRunner apply(SegmentDescriptor spec) - { - final FireChief retVal = partitionChiefs.get(spec.getPartitionNumber()); - return retVal == null - ? new NoopQueryRunner() - : retVal.getQueryRunner(query.withQuerySegmentSpec(new SpecificSegmentSpec(spec))); - } - } - ) - ) - ); - } - - class FireChief implements Runnable - { - private final FireDepartment fireDepartment; - private final FireDepartmentMetrics metrics; - private final RealtimeTuningConfig config; - private final QueryRunnerFactoryConglomerate conglomerate; - - private Plumber plumber; - - FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate conglomerate) - { - this.fireDepartment = fireDepartment; - this.conglomerate = conglomerate; - this.config = fireDepartment.getTuningConfig(); - this.metrics = fireDepartment.getMetrics(); - } - - private Firehose initFirehose() - { - try { - log.info("Calling the FireDepartment and getting a Firehose."); - return fireDepartment.connect(); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - - private FirehoseV2 initFirehoseV2(Object metaData) throws IOException - { - log.info("Calling the FireDepartment and getting a FirehoseV2."); - return fireDepartment.connect(metaData); - } - - private void initPlumber() - { - log.info("Someone get us a plumber!"); - plumber = fireDepartment.findPlumber(); - } - - @VisibleForTesting - Plumber getPlumber() - { - return plumber; - } - - public FireDepartmentMetrics getMetrics() - { - return metrics; - } - - @Override - public void run() - { - initPlumber(); - - try { - final Closer closer = Closer.create(); - - try { - Object metadata = plumber.startJob(); - - Firehose firehose; - FirehoseV2 firehoseV2; - final boolean success; - if (fireDepartment.checkFirehoseV2()) { - firehoseV2 = initFirehoseV2(metadata); - closer.register(firehoseV2); - success = runFirehoseV2(firehoseV2); - } else { - firehose = initFirehose(); - closer.register(firehose); - success = runFirehose(firehose); - } - if (success) { - // pluber.finishJob() is called only when every processing is successfully finished. 
- closer.register(() -> plumber.finishJob()); - } - } - catch (Exception e) { - log.makeAlert( - e, - "[%s] aborted realtime processing[%s]", - e.getClass().getSimpleName(), - fireDepartment.getDataSchema().getDataSource() - ).emit(); - throw closer.rethrow(e); - } - catch (Error e) { - log.makeAlert(e, "Error aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource()) - .emit(); - throw closer.rethrow(e); - } - finally { - closer.close(); - } - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - - private boolean runFirehoseV2(FirehoseV2 firehose) - { - firehose.start(); - - log.info("FirehoseV2 started"); - final Supplier committerSupplier = Committers.supplierFromFirehoseV2(firehose); - boolean haveRow = true; - while (haveRow) { - if (Thread.interrupted() || stopping) { - return false; - } - InputRow inputRow = null; - try { - inputRow = firehose.currRow(); - if (inputRow != null) { - IncrementalIndexAddResult addResult = plumber.add(inputRow, committerSupplier); - int numRows = addResult.getRowCount(); - if (numRows == -2) { - metrics.incrementDedup(); - log.debug("Throwing away duplicate event[%s]", inputRow); - } else if (numRows < 0) { - metrics.incrementThrownAway(); - log.debug("Throwing away event[%s] due to %s", inputRow, addResult.getReasonOfNotAdded()); - } else { - metrics.incrementProcessed(); - } - } else { - log.debug("thrown away null input row, considering unparseable"); - metrics.incrementUnparseable(); - } - } - catch (Exception e) { - log.makeAlert(e, "Unknown exception, Ignoring and continuing.") - .addData("inputRow", inputRow) - .emit(); - } - - try { - haveRow = firehose.advance(); - } - catch (Exception e) { - log.debug(e, "exception in firehose.advance(), considering unparseable row"); - metrics.incrementUnparseable(); - } - } - return true; - } - - private boolean runFirehose(Firehose firehose) - { - final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); - while (firehose.hasMore()) { - if (Thread.interrupted() || stopping) { - return false; - } - Plumbers.addNextRow(committerSupplier, firehose, plumber, config.isReportParseExceptions(), metrics); - } - return true; - } - - public QueryRunner getQueryRunner(Query query) - { - QueryRunnerFactory> factory = conglomerate.findFactory(query); - QueryToolChest> toolChest = factory.getToolchest(); - - return new FinalizeResultsQueryRunner(plumber.getQueryRunner(query), toolChest); - } - } -} diff --git a/server/src/test/java/org/apache/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/org/apache/druid/segment/realtime/RealtimeManagerTest.java deleted file mode 100644 index 7df4c14984f..00000000000 --- a/server/src/test/java/org/apache/druid/segment/realtime/RealtimeManagerTest.java +++ /dev/null @@ -1,1104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.segment.realtime; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.base.Stopwatch; -import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import org.apache.druid.data.input.Committer; -import org.apache.druid.data.input.Firehose; -import org.apache.druid.data.input.FirehoseFactory; -import org.apache.druid.data.input.FirehoseFactoryV2; -import org.apache.druid.data.input.FirehoseV2; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.data.input.Row; -import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.query.BaseQuery; -import org.apache.druid.query.Query; -import org.apache.druid.query.QueryRunner; -import org.apache.druid.query.QueryRunnerFactory; -import org.apache.druid.query.QueryRunnerFactoryConglomerate; -import org.apache.druid.query.QueryRunnerTestHelper; -import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.query.aggregation.CountAggregatorFactory; -import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.query.dimension.DefaultDimensionSpec; -import org.apache.druid.query.groupby.GroupByQuery; -import org.apache.druid.query.groupby.GroupByQueryConfig; -import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; -import org.apache.druid.query.groupby.GroupByQueryRunnerTest; -import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper; -import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; -import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; -import org.apache.druid.query.spec.SpecificSegmentQueryRunner; -import org.apache.druid.query.spec.SpecificSegmentSpec; -import org.apache.druid.segment.TestHelper; -import org.apache.druid.segment.incremental.IncrementalIndexAddResult; -import org.apache.druid.segment.incremental.IndexSizeExceededException; -import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.segment.indexing.RealtimeIOConfig; -import org.apache.druid.segment.indexing.RealtimeTuningConfig; -import org.apache.druid.segment.indexing.TuningConfigs; -import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; -import org.apache.druid.segment.realtime.plumber.Plumber; -import org.apache.druid.segment.realtime.plumber.Sink; -import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; -import org.apache.druid.timeline.partition.LinearShardSpec; -import org.apache.druid.utils.Runnables; -import org.easymock.EasyMock; -import org.joda.time.DateTime; -import org.joda.time.Interval; -import org.joda.time.Period; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import 
org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import javax.annotation.Nullable; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -/** - * - */ -public class RealtimeManagerTest -{ - private static QueryRunnerFactory factory; - private static Closer resourceCloser; - private static QueryRunnerFactoryConglomerate conglomerate; - - private static final List rows = Arrays.asList( - makeRow(DateTimes.of("9000-01-01").getMillis()), - makeRow(new ParseException("parse error")), - null, - makeRow(System.currentTimeMillis()) - ); - - private RealtimeManager realtimeManager; - private RealtimeManager realtimeManager2; - private RealtimeManager realtimeManager3; - private DataSchema schema; - private DataSchema schema2; - private TestPlumber plumber; - private TestPlumber plumber2; - private RealtimeTuningConfig tuningConfig_0; - private RealtimeTuningConfig tuningConfig_1; - private DataSchema schema3; - - @BeforeClass - public static void setupStatic() - { - final Pair factoryAndCloser = initFactory(); - factory = factoryAndCloser.lhs; - resourceCloser = factoryAndCloser.rhs; - conglomerate = new QueryRunnerFactoryConglomerate() - { - @Override - public > QueryRunnerFactory findFactory(QueryType query) - { - return factory; - } - }; - } - - @AfterClass - public static void teardownStatic() throws IOException - { - resourceCloser.close(); - } - - @Before - public void setUp() - { - ObjectMapper jsonMapper = new DefaultObjectMapper(); - - schema = new DataSchema( - "test", - null, - new AggregatorFactory[]{new CountAggregatorFactory("rows")}, - new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null), - null, - jsonMapper - ); - schema2 = new DataSchema( - "testV2", - null, - new AggregatorFactory[]{new CountAggregatorFactory("rows")}, - new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null), - null, - jsonMapper - ); - RealtimeIOConfig ioConfig = new RealtimeIOConfig( - new FirehoseFactory() - { - @Override - public Firehose connect(InputRowParser parser, File temporaryDirectory) - { - return new TestFirehose(rows.iterator()); - } - }, - (schema, config, metrics) -> plumber, - null - ); - RealtimeIOConfig ioConfig2 = new RealtimeIOConfig( - null, - (schema, config, metrics) -> plumber2, - new FirehoseFactoryV2() - { - @Override - public FirehoseV2 connect(InputRowParser parser, Object arg1) throws ParseException - { - return new TestFirehoseV2(rows.iterator()); - } - } - ); - RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( - 1, - null, - new Period("P1Y"), - null, - null, - null, - null, - null, - null, - null, - null, - 0, - 0, - null, - null, - null, - null, - null - ); - plumber = new TestPlumber(new Sink( - Intervals.of("0/P5000Y"), - schema, - tuningConfig.getShardSpec(), - DateTimes.nowUtc().toString(), - tuningConfig.getMaxRowsInMemory(), - TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), - tuningConfig.isReportParseExceptions(), - tuningConfig.getDedupColumn() - )); - - realtimeManager = new RealtimeManager( - Collections.singletonList( - new FireDepartment( - schema, - ioConfig, - tuningConfig - ) - ), - null, - EasyMock.createNiceMock(DataSegmentServerAnnouncer.class) - ); - plumber2 = new TestPlumber(new Sink( - Intervals.of("0/P5000Y"), - schema2, - tuningConfig.getShardSpec(), - 
DateTimes.nowUtc().toString(), - tuningConfig.getMaxRowsInMemory(), - TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), - tuningConfig.isReportParseExceptions(), - tuningConfig.getDedupColumn() - )); - - realtimeManager2 = new RealtimeManager( - Collections.singletonList( - new FireDepartment( - schema2, - ioConfig2, - tuningConfig - ) - ), - null, - EasyMock.createNiceMock(DataSegmentServerAnnouncer.class) - ); - - tuningConfig_0 = new RealtimeTuningConfig( - 1, - null, - new Period("P1Y"), - null, - null, - null, - null, - null, - new LinearShardSpec(0), - null, - null, - 0, - 0, - null, - null, - null, - null, - null - ); - - tuningConfig_1 = new RealtimeTuningConfig( - 1, - null, - new Period("P1Y"), - null, - null, - null, - null, - null, - new LinearShardSpec(1), - null, - null, - 0, - 0, - null, - null, - null, - null, - null - ); - - schema3 = new DataSchema( - "testing", - null, - new AggregatorFactory[]{new CountAggregatorFactory("ignore")}, - new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null), - null, - jsonMapper - ); - - FireDepartment department_0 = new FireDepartment(schema3, ioConfig, tuningConfig_0); - FireDepartment department_1 = new FireDepartment(schema3, ioConfig2, tuningConfig_1); - - realtimeManager3 = new RealtimeManager( - Arrays.asList(department_0, department_1), - conglomerate, - EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), - null - ); - } - - @After - public void tearDown() - { - realtimeManager.stop(); - realtimeManager2.stop(); - realtimeManager3.stop(); - } - - @Test - public void testRun() throws Exception - { - realtimeManager.start(); - - Stopwatch stopwatch = Stopwatch.createStarted(); - while (realtimeManager.getMetrics("test").processed() != 1) { - Thread.sleep(100); - if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { - throw new ISE("Realtime manager should have completed processing 2 events!"); - } - } - - Assert.assertEquals(1, realtimeManager.getMetrics("test").processed()); - Assert.assertEquals(2, realtimeManager.getMetrics("test").thrownAway()); - Assert.assertEquals(1, realtimeManager.getMetrics("test").unparseable()); - Assert.assertTrue(plumber.isStartedJob()); - Assert.assertTrue(plumber.isFinishedJob()); - Assert.assertEquals(0, plumber.getPersistCount()); - } - - @Test - public void testRunV2() throws Exception - { - realtimeManager2.start(); - - Stopwatch stopwatch = Stopwatch.createStarted(); - while (realtimeManager2.getMetrics("testV2").processed() != 1) { - Thread.sleep(100); - if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { - throw new ISE("Realtime manager should have completed processing 2 events!"); - } - } - - Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").processed()); - Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").thrownAway()); - Assert.assertEquals(2, realtimeManager2.getMetrics("testV2").unparseable()); - Assert.assertTrue(plumber2.isStartedJob()); - Assert.assertTrue(plumber2.isFinishedJob()); - Assert.assertEquals(0, plumber2.getPersistCount()); - } - - @Test(timeout = 60_000L) - public void testNormalStop() throws InterruptedException - { - final TestFirehose firehose = new TestFirehose(rows.iterator()); - final TestFirehoseV2 firehoseV2 = new TestFirehoseV2(rows.iterator()); - final RealtimeIOConfig ioConfig = new RealtimeIOConfig( - new FirehoseFactory() - { - @Override - public Firehose connect(InputRowParser parser, File temporaryDirectory) - { - return firehose; - } - }, - (schema, config, metrics) -> plumber, 
- null - ); - RealtimeIOConfig ioConfig2 = new RealtimeIOConfig( - null, - (schema, config, metrics) -> plumber2, - (parser, arg) -> firehoseV2 - ); - - final FireDepartment department_0 = new FireDepartment(schema3, ioConfig, tuningConfig_0); - final FireDepartment department_1 = new FireDepartment(schema3, ioConfig2, tuningConfig_1); - - final RealtimeManager realtimeManager = new RealtimeManager( - Arrays.asList(department_0, department_1), - conglomerate, - EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), - null - ); - - realtimeManager.start(); - while (realtimeManager.getMetrics("testing").processed() < 2) { - Thread.sleep(100); - } - realtimeManager.stop(); - - Assert.assertTrue(firehose.isClosed()); - Assert.assertTrue(firehoseV2.isClosed()); - Assert.assertTrue(plumber.isFinishedJob()); - Assert.assertTrue(plumber2.isFinishedJob()); - } - - @Test(timeout = 60_000L) - public void testStopByInterruption() - { - final SleepingFirehose firehose = new SleepingFirehose(); - final RealtimeIOConfig ioConfig = new RealtimeIOConfig( - new FirehoseFactory() - { - @Override - public Firehose connect(InputRowParser parser, File temporaryDirectory) - { - return firehose; - } - }, - (schema, config, metrics) -> plumber, - null - ); - - final FireDepartment department_0 = new FireDepartment(schema, ioConfig, tuningConfig_0); - - final RealtimeManager realtimeManager = new RealtimeManager( - Collections.singletonList(department_0), - conglomerate, - EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), - null - ); - - realtimeManager.start(); - realtimeManager.stop(); - - Assert.assertTrue(firehose.isClosed()); - Assert.assertFalse(plumber.isFinishedJob()); - } - - @Test(timeout = 60_000L) - public void testQueryWithInterval() throws InterruptedException - { - List expectedResults = Arrays.asList( - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 270L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 236L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 316L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 240L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 5740L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 242L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 5800L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 156L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 238L), - - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 2L, "idx", 294L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 2L, "idx", 224L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 332L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 2L, "idx", 226L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 6L, "idx", 4894L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 2L, "idx", 228L), - 
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 6L, "idx", 5010L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 2L, "idx", 194L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 252L) - ); - - realtimeManager3.start(); - - awaitStarted(); - - for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) { - GroupByQuery query = GroupByQuery - .builder() - .setDataSource(QueryRunnerTestHelper.dataSource) - .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) - .setDimensions(new DefaultDimensionSpec("quality", "alias")) - .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) - .setGranularity(QueryRunnerTestHelper.dayGran) - .build(); - plumber.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner)); - plumber2.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner)); - - Iterable results = GroupByQueryRunnerTestHelper.runQuery( - factory, - realtimeManager3.getQueryRunnerForIntervals( - query, - QueryRunnerTestHelper.firstToThird.getIntervals() - ), - query - ); - - TestHelper.assertExpectedObjects(expectedResults, results, "interval"); - } - - } - - private void awaitStarted() throws InterruptedException - { - while (true) { - boolean notAllStarted = realtimeManager3 - .getFireChiefs("testing").values().stream() - .anyMatch( - fireChief -> { - final Plumber plumber = fireChief.getPlumber(); - return plumber == null || !((TestPlumber) plumber).isStartedJob(); - } - ); - if (!notAllStarted) { - break; - } - Thread.sleep(10); - } - } - - @Test(timeout = 60_000L) - public void testQueryWithSegmentSpec() throws InterruptedException - { - List expectedResults = Arrays.asList( - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L), - - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L), - 
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) - ); - - realtimeManager3.start(); - - awaitStarted(); - - for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) { - GroupByQuery query = GroupByQuery - .builder() - .setDataSource(QueryRunnerTestHelper.dataSource) - .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) - .setDimensions(new DefaultDimensionSpec("quality", "alias")) - .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) - .setGranularity(QueryRunnerTestHelper.dayGran) - .build(); - plumber.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner)); - plumber2.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner)); - - Iterable results = GroupByQueryRunnerTestHelper.runQuery( - factory, - realtimeManager3.getQueryRunnerForSegments( - query, - ImmutableList.of( - new SegmentDescriptor( - Intervals.of("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"), - "ver", - 0 - )) - ), - query - ); - TestHelper.assertExpectedObjects(expectedResults, results, "segmentSpec"); - - results = GroupByQueryRunnerTestHelper.runQuery( - factory, - realtimeManager3.getQueryRunnerForSegments( - query, - ImmutableList.of( - new SegmentDescriptor( - Intervals.of("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"), - "ver", - 1 - )) - ), - query - ); - TestHelper.assertExpectedObjects(expectedResults, results, "segmentSpec"); - } - - } - - @Test(timeout = 60_000L) - public void testQueryWithMultipleSegmentSpec() throws InterruptedException - { - - List expectedResults_both_partitions = Arrays.asList( - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "business", "rows", 2L, "idx", 260L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "health", "rows", 2L, "idx", 236L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "mezzanine", "rows", 4L, "idx", 4556L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "news", "rows", 2L, "idx", 284L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "technology", "rows", 2L, "idx", 202L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "automotive", "rows", 2L, "idx", 288L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "entertainment", "rows", 2L, "idx", 326L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "automotive", "rows", 2L, "idx", 312L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "business", "rows", 2L, "idx", 248L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "entertainment", "rows", 2L, "idx", 326L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "health", "rows", 2L, "idx", 262L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "mezzanine", "rows", 6L, "idx", 5126L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "news", "rows", 2L, "idx", 254L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "premium", "rows", 6L, "idx", 5276L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "technology", "rows", 2L, 
"idx", 206L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "travel", "rows", 2L, "idx", 260L) - ); - - List expectedResults_single_partition_26_28 = Arrays.asList( - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "business", "rows", 1L, "idx", 130L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "health", "rows", 1L, "idx", 118L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "mezzanine", "rows", 2L, "idx", 2278L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "news", "rows", 1L, "idx", 142L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "technology", "rows", 1L, "idx", 101L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "automotive", "rows", 1L, "idx", 144L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "entertainment", "rows", 1L, "idx", 163L) - ); - - List expectedResults_single_partition_28_29 = Arrays.asList( - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "automotive", "rows", 1L, "idx", 156L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "business", "rows", 1L, "idx", 124L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "entertainment", "rows", 1L, "idx", 163L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "health", "rows", 1L, "idx", 131L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "mezzanine", "rows", 3L, "idx", 2563L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "news", "rows", 1L, "idx", 127L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "premium", "rows", 3L, "idx", 2638L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "technology", "rows", 1L, "idx", 103L), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "travel", "rows", 1L, "idx", 130L) - ); - - realtimeManager3.start(); - - awaitStarted(); - - final Interval interval_26_28 = Intervals.of("2011-03-26T00:00:00.000Z/2011-03-28T00:00:00.000Z"); - final Interval interval_28_29 = Intervals.of("2011-03-28T00:00:00.000Z/2011-03-29T00:00:00.000Z"); - final SegmentDescriptor descriptor_26_28_0 = new SegmentDescriptor(interval_26_28, "ver0", 0); - final SegmentDescriptor descriptor_28_29_0 = new SegmentDescriptor(interval_28_29, "ver1", 0); - final SegmentDescriptor descriptor_26_28_1 = new SegmentDescriptor(interval_26_28, "ver0", 1); - final SegmentDescriptor descriptor_28_29_1 = new SegmentDescriptor(interval_28_29, "ver1", 1); - - GroupByQuery query = GroupByQuery - .builder() - .setDataSource(QueryRunnerTestHelper.dataSource) - .setQuerySegmentSpec( - new MultipleSpecificSegmentSpec( - ImmutableList.of( - descriptor_26_28_0, - descriptor_28_29_0, - descriptor_26_28_1, - descriptor_28_29_1 - ))) - .setDimensions(new DefaultDimensionSpec("quality", "alias")) - .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) - .setGranularity(QueryRunnerTestHelper.dayGran) - .build(); - - final Map runnerMap = ImmutableMap.of( - interval_26_28, - QueryRunnerTestHelper.makeQueryRunner( - factory, - "druid.sample.numeric.tsv.top", - null - ), - interval_28_29, - QueryRunnerTestHelper.makeQueryRunner( - factory, - "druid.sample.numeric.tsv.bottom", - null - ) - ); - plumber.setRunners(runnerMap); - plumber2.setRunners(runnerMap); - - Iterable results = 
GroupByQueryRunnerTestHelper.runQuery( - factory, - query.getQuerySegmentSpec().lookup(query, realtimeManager3), - query - ); - TestHelper.assertExpectedObjects(expectedResults_both_partitions, results, "multi-segmentSpec"); - - results = GroupByQueryRunnerTestHelper.runQuery( - factory, - realtimeManager3.getQueryRunnerForSegments( - query, - ImmutableList.of( - descriptor_26_28_0) - ), - query - ); - TestHelper.assertExpectedObjects(expectedResults_single_partition_26_28, results, "multi-segmentSpec"); - - results = GroupByQueryRunnerTestHelper.runQuery( - factory, - realtimeManager3.getQueryRunnerForSegments( - query, - ImmutableList.of( - descriptor_28_29_0) - ), - query - ); - TestHelper.assertExpectedObjects(expectedResults_single_partition_28_29, results, "multi-segmentSpec"); - - results = GroupByQueryRunnerTestHelper.runQuery( - factory, - realtimeManager3.getQueryRunnerForSegments( - query, - ImmutableList.of( - descriptor_26_28_1) - ), - query - ); - TestHelper.assertExpectedObjects(expectedResults_single_partition_26_28, results, "multi-segmentSpec"); - - results = GroupByQueryRunnerTestHelper.runQuery( - factory, - realtimeManager3.getQueryRunnerForSegments( - query, - ImmutableList.of( - descriptor_28_29_1) - ), - query - ); - TestHelper.assertExpectedObjects(expectedResults_single_partition_28_29, results, "multi-segmentSpec"); - - } - - private static Pair initFactory() - { - final GroupByQueryConfig config = new GroupByQueryConfig(); - config.setMaxIntermediateRows(10000); - return GroupByQueryRunnerTest.makeQueryRunnerFactory(config); - } - - private static TestInputRowHolder makeRow(final long timestamp) - { - return new TestInputRowHolder(timestamp, null); - } - - private static TestInputRowHolder makeRow(final RuntimeException e) - { - return new TestInputRowHolder(0, e); - } - - private static class TestInputRowHolder - { - private long timestamp; - private RuntimeException exception; - - public TestInputRowHolder(long timestamp, RuntimeException exception) - { - this.timestamp = timestamp; - this.exception = exception; - } - - public InputRow getRow() - { - if (exception != null) { - throw exception; - } - - return new InputRow() - { - @Override - public List getDimensions() - { - return Collections.singletonList("testDim"); - } - - @Override - public long getTimestampFromEpoch() - { - return timestamp; - } - - @Override - public DateTime getTimestamp() - { - return DateTimes.utc(timestamp); - } - - @Override - public List getDimension(String dimension) - { - return new ArrayList<>(); - } - - @Override - public Number getMetric(String metric) - { - return 0; - } - - @Override - public Object getRaw(String dimension) - { - return null; - } - - @Override - public int compareTo(Row o) - { - return 0; - } - }; - } - } - - private static class TestFirehose implements Firehose - { - private final Iterator rows; - private boolean closed; - - private TestFirehose(Iterator rows) - { - this.rows = rows; - } - - @Override - public boolean hasMore() - { - return rows.hasNext(); - } - - @Nullable - @Override - public InputRow nextRow() - { - final TestInputRowHolder holder = rows.next(); - if (holder == null) { - return null; - } else { - return holder.getRow(); - } - } - - @Override - public Runnable commit() - { - return Runnables.getNoopRunnable(); - } - - public boolean isClosed() - { - return closed; - } - - @Override - public void close() - { - closed = true; - } - } - - private static class TestFirehoseV2 implements FirehoseV2 - { - private final Iterator rows; - private 
InputRow currRow; - private boolean stop; - private boolean closed; - - private TestFirehoseV2(Iterator rows) - { - this.rows = rows; - } - - private void nextMessage() - { - currRow = null; - while (currRow == null) { - final TestInputRowHolder holder = rows.next(); - currRow = holder == null ? null : holder.getRow(); - } - } - - @Override - public void close() - { - closed = true; - } - - public boolean isClosed() - { - return closed; - } - - @Override - public boolean advance() - { - stop = !rows.hasNext(); - if (stop) { - return false; - } - - nextMessage(); - return true; - } - - @Override - public InputRow currRow() - { - return currRow; - } - - @Override - public Committer makeCommitter() - { - return new Committer() - { - @Override - public Object getMetadata() - { - return null; - } - - @Override - public void run() - { - } - }; - } - - @Override - public void start() - { - nextMessage(); - } - } - - private static class SleepingFirehose implements Firehose - { - private boolean closed; - - @Override - public boolean hasMore() - { - try { - Thread.sleep(1000); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - return true; - } - - @Nullable - @Override - public InputRow nextRow() - { - return null; - } - - @Override - public Runnable commit() - { - return null; - } - - public boolean isClosed() - { - return closed; - } - - @Override - public void close() - { - closed = true; - } - } - - private static class TestPlumber implements Plumber - { - private final Sink sink; - - - private volatile boolean startedJob = false; - private volatile boolean finishedJob = false; - private volatile int persistCount = 0; - - private Map runners; - - private TestPlumber(Sink sink) - { - this.sink = sink; - } - - private boolean isStartedJob() - { - return startedJob; - } - - private boolean isFinishedJob() - { - return finishedJob; - } - - private int getPersistCount() - { - return persistCount; - } - - @Override - public Object startJob() - { - startedJob = true; - return null; - } - - @Override - public IncrementalIndexAddResult add(InputRow row, Supplier committerSupplier) - throws IndexSizeExceededException - { - if (row == null) { - return Plumber.THROWAWAY; - } - - Sink sink = getSink(row.getTimestampFromEpoch()); - - if (sink == null) { - return Plumber.THROWAWAY; - } - - return sink.add(row, false); - } - - public Sink getSink(long timestamp) - { - if (sink.getInterval().contains(timestamp)) { - return sink; - } - return null; - } - - @SuppressWarnings("unchecked") - @Override - public QueryRunner getQueryRunner(final Query query) - { - if (runners == null) { - throw new UnsupportedOperationException(); - } - - final BaseQuery baseQuery = (BaseQuery) query; - - if (baseQuery.getQuerySegmentSpec() instanceof MultipleIntervalSegmentSpec) { - return factory.getToolchest() - .mergeResults( - factory.mergeRunners( - Execs.directExecutor(), - Iterables.transform( - baseQuery.getIntervals(), - new Function>() - { - @Override - public QueryRunner apply(Interval input) - { - return runners.get(input); - } - } - ) - ) - ); - } - - Assert.assertEquals(1, query.getIntervals().size()); - - final SegmentDescriptor descriptor = - ((SpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptor(); - - return new SpecificSegmentQueryRunner( - runners.get(descriptor.getInterval()), - new SpecificSegmentSpec(descriptor) - ); - } - - @Override - public void persist(Committer committer) - { - persistCount++; - } - - @Override - public void finishJob() - { - finishedJob 
= true; - } - - public void setRunners(Map runners) - { - this.runners = runners; - } - } - -} diff --git a/server/src/test/java/org/apache/druid/realtime/firehose/CombiningFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java similarity index 97% rename from server/src/test/java/org/apache/druid/realtime/firehose/CombiningFirehoseFactoryTest.java rename to server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java index c81d309cd8a..c08111d8795 100644 --- a/server/src/test/java/org/apache/druid/realtime/firehose/CombiningFirehoseFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.realtime.firehose; +package org.apache.druid.segment.realtime.firehose; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; @@ -26,7 +26,6 @@ import org.apache.druid.data.input.Row; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory; import org.apache.druid.utils.Runnables; import org.joda.time.DateTime; import org.junit.Assert; diff --git a/services/src/main/java/org/apache/druid/cli/CliRealtime.java b/services/src/main/java/org/apache/druid/cli/CliRealtime.java deleted file mode 100644 index 3a5ee57b2de..00000000000 --- a/services/src/main/java/org/apache/druid/cli/CliRealtime.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.cli; - -import com.google.common.collect.ImmutableList; -import com.google.inject.Inject; -import com.google.inject.Module; -import com.google.inject.name.Names; -import io.airlift.airline.Command; -import org.apache.druid.guice.DruidProcessingModule; -import org.apache.druid.guice.QueryRunnerFactoryModule; -import org.apache.druid.guice.QueryableModule; -import org.apache.druid.guice.RealtimeModule; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.lookup.LookupModule; -import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule; - -import java.util.List; -import java.util.Properties; - -/** - */ -@Command( - name = "realtime", - description = "Runs a realtime node, see https://druid.apache.org/docs/latest/Realtime.html for a description" -) -public class CliRealtime extends ServerRunnable -{ - private static final Logger log = new Logger(CliRealtime.class); - - @Inject - private Properties properties; - - public CliRealtime() - { - super(log); - } - - @Override - protected List getModules() - { - return ImmutableList.of( - new DruidProcessingModule(), - new QueryableModule(), - new QueryRunnerFactoryModule(), - new RealtimeModule(), - binder -> { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/realtime"); - binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8084); - binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8284); - }, - new ChatHandlerServerModule(properties), - new LookupModule() - ); - } -} diff --git a/services/src/main/java/org/apache/druid/cli/CliRealtimeExample.java b/services/src/main/java/org/apache/druid/cli/CliRealtimeExample.java deleted file mode 100644 index a4029a7855d..00000000000 --- a/services/src/main/java/org/apache/druid/cli/CliRealtimeExample.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.cli; - -import com.google.common.collect.ImmutableList; -import com.google.inject.Inject; -import com.google.inject.Module; -import com.google.inject.name.Names; -import io.airlift.airline.Command; -import org.apache.druid.client.DruidServer; -import org.apache.druid.client.InventoryView; -import org.apache.druid.client.ServerView; -import org.apache.druid.guice.DruidProcessingModule; -import org.apache.druid.guice.LazySingleton; -import org.apache.druid.guice.QueryRunnerFactoryModule; -import org.apache.druid.guice.QueryableModule; -import org.apache.druid.guice.RealtimeModule; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.lookup.LookupModule; -import org.apache.druid.segment.loading.DataSegmentPusher; -import org.apache.druid.segment.loading.NoopDataSegmentPusher; -import org.apache.druid.server.coordination.DataSegmentAnnouncer; -import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; -import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule; -import org.apache.druid.timeline.DataSegment; - -import java.util.Collection; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.Executor; - -/** - */ -@Command( - name = "realtime", - description = "Runs a standalone realtime node for examples, see https://druid.apache.org/docs/latest/Realtime.html for a description" -) -public class CliRealtimeExample extends ServerRunnable -{ - private static final Logger log = new Logger(CliRealtimeExample.class); - - @Inject - private Properties properties; - - public CliRealtimeExample() - { - super(log); - } - - @Override - protected List getModules() - { - return ImmutableList.of( - new DruidProcessingModule(), - new QueryableModule(), - new QueryRunnerFactoryModule(), - new RealtimeModule(), - binder -> { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/realtime"); - binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8084); - binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8284); - - binder.bind(DataSegmentPusher.class).to(NoopDataSegmentPusher.class).in(LazySingleton.class); - binder.bind(DataSegmentAnnouncer.class).to(NoopDataSegmentAnnouncer.class).in(LazySingleton.class); - binder.bind(InventoryView.class).to(NoopInventoryView.class).in(LazySingleton.class); - binder.bind(ServerView.class).to(NoopServerView.class).in(LazySingleton.class); - }, - new ChatHandlerServerModule(properties), - new LookupModule() - ); - } - - private static class NoopServerView implements ServerView - { - @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) - { - // do nothing - } - - @Override - public void registerSegmentCallback(Executor exec, SegmentCallback callback) - { - // do nothing - } - } - - private static class NoopInventoryView implements InventoryView - { - @Override - public DruidServer getInventoryValue(String serverKey) - { - return null; - } - - @Override - public Collection getInventory() - { - return ImmutableList.of(); - } - - @Override - public boolean isStarted() - { - return true; - } - - @Override - public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) - { - return false; - } - } -} diff --git a/services/src/main/java/org/apache/druid/cli/Main.java b/services/src/main/java/org/apache/druid/cli/Main.java index 34f00a0fa7f..0cc44274a7b 100644 --- a/services/src/main/java/org/apache/druid/cli/Main.java +++ 
b/services/src/main/java/org/apache/druid/cli/Main.java @@ -59,7 +59,6 @@ public class Main CliCoordinator.class, CliHistorical.class, CliBroker.class, - CliRealtime.class, CliOverlord.class, CliMiddleManager.class, CliRouter.class @@ -69,11 +68,6 @@ public class Main .withDefaultCommand(Help.class) .withCommands(serverCommands); - builder.withGroup("example") - .withDescription("Run an example") - .withDefaultCommand(Help.class) - .withCommands(CliRealtimeExample.class); - List> toolCommands = Arrays.asList( DruidJsonValidator.class, PullDependencies.class, diff --git a/services/src/main/java/org/apache/druid/guice/RealtimeModule.java b/services/src/main/java/org/apache/druid/guice/RealtimeModule.java deleted file mode 100644 index 5285c5c5ec7..00000000000 --- a/services/src/main/java/org/apache/druid/guice/RealtimeModule.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.guice; - -import com.google.inject.Binder; -import com.google.inject.Key; -import com.google.inject.Module; -import com.google.inject.TypeLiteral; -import com.google.inject.multibindings.MapBinder; -import org.apache.druid.cli.QueryJettyServerInitializer; -import org.apache.druid.client.cache.CacheConfig; -import org.apache.druid.client.coordinator.CoordinatorClient; -import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; -import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; -import org.apache.druid.metadata.MetadataSegmentPublisher; -import org.apache.druid.query.QuerySegmentWalker; -import org.apache.druid.segment.realtime.FireDepartment; -import org.apache.druid.segment.realtime.NoopSegmentPublisher; -import org.apache.druid.segment.realtime.RealtimeManager; -import org.apache.druid.segment.realtime.SegmentPublisher; -import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; -import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; -import org.apache.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider; -import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig; -import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory; -import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; -import org.apache.druid.server.QueryResource; -import org.apache.druid.server.SegmentManager; -import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.coordination.ZkCoordinator; -import org.apache.druid.server.http.SegmentListerResource; -import org.apache.druid.server.initialization.jetty.JettyServerInitializer; -import org.apache.druid.server.metrics.QueryCountStatsProvider; -import 
org.eclipse.jetty.server.Server; - -import java.util.List; - -/** - */ -public class RealtimeModule implements Module -{ - - @Override - public void configure(Binder binder) - { - PolyBind.createChoiceWithDefault(binder, "druid.publish.type", Key.get(SegmentPublisher.class), "metadata"); - final MapBinder publisherBinder = PolyBind.optionBinder( - binder, - Key.get(SegmentPublisher.class) - ); - publisherBinder.addBinding("noop").to(NoopSegmentPublisher.class).in(LazySingleton.class); - publisherBinder.addBinding("metadata").to(MetadataSegmentPublisher.class).in(LazySingleton.class); - - PolyBind.createChoice( - binder, - "druid.realtime.rowIngestionMeters.type", - Key.get(RowIngestionMetersFactory.class), - Key.get(DropwizardRowIngestionMetersFactory.class) - ); - final MapBinder rowIngestionMetersHandlerProviderBinder = - PolyBind.optionBinder(binder, Key.get(RowIngestionMetersFactory.class)); - rowIngestionMetersHandlerProviderBinder - .addBinding("dropwizard") - .to(DropwizardRowIngestionMetersFactory.class) - .in(LazySingleton.class); - binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class); - - PolyBind.createChoice( - binder, - "druid.realtime.chathandler.type", - Key.get(ChatHandlerProvider.class), - Key.get(ServiceAnnouncingChatHandlerProvider.class) - ); - final MapBinder handlerProviderBinder = - PolyBind.optionBinder(binder, Key.get(ChatHandlerProvider.class)); - handlerProviderBinder - .addBinding("announce") - .to(ServiceAnnouncingChatHandlerProvider.class) - .in(LazySingleton.class); - handlerProviderBinder - .addBinding("noop") - .to(NoopChatHandlerProvider.class) - .in(LazySingleton.class); - - JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class); - binder.bind( - new TypeLiteral>() - { - } - ) - .toProvider(FireDepartmentsProvider.class) - .in(LazySingleton.class); - - JsonConfigProvider.bind(binder, "druid.segment.handoff", CoordinatorBasedSegmentHandoffNotifierConfig.class); - binder.bind(SegmentHandoffNotifierFactory.class) - .to(CoordinatorBasedSegmentHandoffNotifierFactory.class) - .in(LazySingleton.class); - binder.bind(CoordinatorClient.class).in(LazySingleton.class); - - JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class); - binder.install(new CacheModule()); - - binder.bind(QuerySegmentWalker.class).to(RealtimeManager.class).in(ManageLifecycle.class); - binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(ServerType.REALTIME)); - binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); - binder.bind(QueryCountStatsProvider.class).to(QueryResource.class).in(LazySingleton.class); - Jerseys.addResource(binder, QueryResource.class); - Jerseys.addResource(binder, SegmentListerResource.class); - LifecycleModule.register(binder, QueryResource.class); - LifecycleModule.register(binder, Server.class); - - binder.bind(SegmentManager.class).in(LazySingleton.class); - binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); - LifecycleModule.register(binder, ZkCoordinator.class); - } -} diff --git a/services/src/test/java/org/apache/druid/cli/MainTest.java b/services/src/test/java/org/apache/druid/cli/MainTest.java index 1a35b666487..3e960f69e44 100644 --- a/services/src/test/java/org/apache/druid/cli/MainTest.java +++ b/services/src/test/java/org/apache/druid/cli/MainTest.java @@ -50,8 +50,6 @@ public class MainTest //new Object[]{new CliInternalHadoopIndexer()}, new Object[]{new CliMiddleManager()}, - new Object[]{new CliRealtime()}, - new 
Object[]{new CliRealtimeExample()}, new Object[]{new CliRouter()} ); }