diff --git a/LICENSE.BINARY b/LICENSE.BINARY index 56016a74b17..ddd7492db44 100644 --- a/LICENSE.BINARY +++ b/LICENSE.BINARY @@ -654,40 +654,6 @@ BINARY/EXTENSIONS/druid-datasketches * com.yahoo.datasketches:memory -BINARY/EXTENSIONS/druid-examples - - This product bundles IRC API version 1.0-0014. - * com.ircclouds.irc:irc-api - - This product bundles MaxMind GeoIP2 API version 0.4.0. - * com.maxmind.geoip2:geoip2 - - This product bundles the following Apache Commons libraries: - * commons-beanutils 1.8.3 - * commons-validator 1.4.0 - - This product bundles Twitter4J version 3.0.3. - * org.twitter4j:twitter4j-async - * org.twitter4j:twitter4j-core - * org.twitter4j:twitter4j-stream - - -BINARY/EXTENSIONS/druid-kafka-eight - - This product bundles Apache Kafka version 0.8.2.1. - * org.apache.kafka:kafka_2.10 - * org.apache.kafka:kafka-clients - - This product bundles ZkClient version 0.3. - * com.101tec:zkclient - - This product bundles Yammer Metrics version 2.2.0. - * com.yammer.metrics:metrics-core - - This product bundles snappy-java version 1.1.1.6. - * org.xerial.snappy:snappy-java - - BINARY/EXTENSIONS/druid-kafka-indexing-service This product bundles Apache Kafka version 0.10.2.2. * org.apache.kafka:kafka-clients @@ -785,12 +751,6 @@ BINARY/HADOOP-CLIENT * org.slf4j:slf4j-log4j12 -BINARY/EXTENSIONS/druid-kafka-eight - This product bundles JOpt Simple version 3.2., copyright Paul R. Holser, Jr., - which is available under an MIT license. For details, see licenses/bin/jopt-simple.MIT. - * net.sf.jopt-simple:jopt-simple - - BINARY/WEB-CONSOLE The following dependency names are NPM package names (https://www.npmjs.com). @@ -1052,13 +1012,6 @@ BINARY/EXTENSIONS/druid-kerberos which is available under a BSD-3-Clause License. For details, see licenses/bin/jsch.BSD3. * com.jcraft:jsch - -BINARY/EXTENSIONS/druid-kafka-eight - This product bundles Scala Library version 2.10.4, copyright EPFL, Lightbend Inc., - which is available under a BSD-3-Clause License. For details, see licenses/bin/scala-lang.BSD3. - * org.scala-lang:scala-library - - BINARY/EXTENSIONS/druid-lookups-cached-single This product bundles StringTemplate version 3.2, copyright Terrence Parr, which is available under a BSD-3-Clause License. For details, see licenses/bin/antlr-stringtemplate.BSD3. diff --git a/NOTICE.BINARY b/NOTICE.BINARY index fba82ceaa0a..8b3e99f7383 100644 --- a/NOTICE.BINARY +++ b/NOTICE.BINARY @@ -1719,44 +1719,6 @@ http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html -############ BINARY/EXTENSIONS/druid-kafka-eight ############ - -================= metrics-core-2.2.0.jar ================= -Metrics -Copyright 2010-2012 Coda Hale and Yammer, Inc. - -This product includes software developed by Coda Hale and Yammer, Inc. - -This product includes code derived from the JSR-166 project (ThreadLocalRandom), which was released -with the following comments: - - Written by Doug Lea with assistance from members of JCP JSR-166 - Expert Group and released to the public domain, as explained at - http://creativecommons.org/publicdomain/zero/1.0/ - - - - -================= snappy-1.1.1.6.jar ================= -This product includes software developed by Google - Snappy: http://code.google.com/p/snappy/ (New BSD License) - -This product includes software developed by Apache - PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/ - (Apache 2.0 license) - -This library containd statically linked libstdc++. 
This inclusion is allowed by -"GCC RUntime Library Exception" -http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html - -== Contributors == - * Tatu Saloranta - * Providing benchmark suite - * Alec Wysoker - * Performance and memory usage improvement - - - ############ BINARY/EXTENSIONS/druid-kafka-indexing-service ############ diff --git a/core/src/main/java/org/apache/druid/data/input/FirehoseFactoryV2.java b/core/src/main/java/org/apache/druid/data/input/FirehoseFactoryV2.java deleted file mode 100644 index 08259b4f69c..00000000000 --- a/core/src/main/java/org/apache/druid/data/input/FirehoseFactoryV2.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.data.input; - -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.guice.annotations.ExtensionPoint; -import org.apache.druid.java.util.common.parsers.ParseException; - -import java.io.IOException; - -/** - * Initialization method that connects up the FirehoseV2. If this method returns successfully it should be safe to - * call start() on the returned FirehoseV2 (which might subsequently block). - * - * In contrast to V1 version, FirehoseFactoryV2 is able to pass an additional json-serialized object to FirehoseV2, - * which contains last commit metadata - * - *

- * If this method returns null, then any attempt to call start(), advance(), currRow(), makeCommitter() and close() on the return - * value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on - * invalid configuration is preferred over returning null. - */ -@ExtensionPoint -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") -public interface FirehoseFactoryV2 -{ - /** - * This method is declared to throw {@link IOException}, although it's not thrown in the implementations in Druid - * code, for compatibility with third-party extensions. - */ - @SuppressWarnings("RedundantThrows") - FirehoseV2 connect(T parser, Object lastCommit) throws IOException, ParseException; -} diff --git a/core/src/main/java/org/apache/druid/data/input/FirehoseV2.java b/core/src/main/java/org/apache/druid/data/input/FirehoseV2.java deleted file mode 100644 index c6aa33f95d2..00000000000 --- a/core/src/main/java/org/apache/druid/data/input/FirehoseV2.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.data.input; - -import org.apache.druid.guice.annotations.ExtensionPoint; - -import java.io.Closeable; - -/** - * This is an interface that holds onto the stream of incoming data. Realtime data ingestion is built around this - * abstraction. In order to add a new type of source for realtime data ingestion, all you need to do is implement - * one of these and register it with the Main. - * - * In contrast to Firehose v1 version, FirehoseV2 will always operate in a "peek, then advance" manner. - * And the intended usage patttern is - * 1. Call start() - * 2. Read currRow() - * 3. Call advance() - * 4. If index should be committed: commit() - * 5. GOTO 2 - * - * Note that commit() is being called *after* advance. - * - * This object acts a lot like an Iterator, but it doesn't extend the Iterator interface because it extends - * Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this - * gets passed around as an Iterator. - * - * The implementation of this interface only needs to be minimally thread-safe. The methods {@link #start()}, {@link - * #advance()}, {@link #currRow()} and {@link #makeCommitter()} are all called from the same thread. {@link - * #makeCommitter()}, however, returns a callback which will be called on another thread, so the operations inside of - * that callback must be thread-safe. - */ -@ExtensionPoint -public interface FirehoseV2 extends Closeable -{ - /** - * For initial start - */ - void start(); - - /** - * Advance the firehose to the next offset. 
Implementations of this interface should make sure that - * if advance() is called and throws out an exception, the next call to currRow() should return a - * null value. - * - * @return true if and when there is another row available, false if the stream has dried up - */ - boolean advance(); - - /** - * @return The current row - */ - InputRow currRow(); - - /** - * Returns a Committer that will "commit" everything read up to the point at which makeCommitter() is called. - * - * This method is called when the main processing loop starts to persist its current batch of things to process. - * The returned committer will be run when the current batch has been successfully persisted - * and the metadata the committer carries can also be persisted along with segment data. There is usually - * some time lag between when this method is called and when the runnable is run. The Runnable is also run on - * a separate thread so its operation should be thread-safe. - * - * Note that "correct" usage of this interface will always call advance() before commit() if the current row - * is considered in the commit. - * - * The Runnable is essentially just a lambda/closure that is run() after data supplied by this instance has - * been committed on the writer side of this interface protocol. - * - * A simple implementation of this interface might do nothing when run() is called, - * and save proper commit information in metadata - */ - Committer makeCommitter(); -} diff --git a/distribution/pom.xml b/distribution/pom.xml index d74d15547ee..e9c23d6f5de 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -168,8 +168,6 @@ -c org.apache.druid.extensions:druid-histogram -c - org.apache.druid.extensions:druid-kafka-eight - -c org.apache.druid.extensions:druid-kafka-extraction-namespace -c org.apache.druid.extensions:druid-kafka-indexing-service @@ -200,8 +198,6 @@ -c org.apache.druid.extensions:druid-stats -c - org.apache.druid.extensions:druid-examples - -c org.apache.druid.extensions:simple-client-sslcontext -c org.apache.druid.extensions:druid-basic-security @@ -318,16 +314,12 @@ -c org.apache.druid.extensions.contrib:druid-distinctcount -c - org.apache.druid.extensions.contrib:druid-rocketmq - -c org.apache.druid.extensions.contrib:graphite-emitter -c org.apache.druid.extensions.contrib:druid-influx-extensions -c org.apache.druid.extensions.contrib:druid-influxdb-emitter -c - org.apache.druid.extensions.contrib:druid-kafka-eight-simple-consumer - -c org.apache.druid.extensions.contrib:kafka-emitter -c org.apache.druid.extensions.contrib:materialized-view-maintenance @@ -336,8 +328,6 @@ -c org.apache.druid.extensions.contrib:druid-opentsdb-emitter -c - org.apache.druid.extensions.contrib:druid-rabbitmq - -c org.apache.druid.extensions.contrib:druid-redis-cache -c org.apache.druid.extensions.contrib:sqlserver-metadata-storage diff --git a/docs/_redirects.json b/docs/_redirects.json index 508aedf2d5f..88f94ce702d 100644 --- a/docs/_redirects.json +++ b/docs/_redirects.json @@ -166,9 +166,13 @@ {"source": "development/community-extensions/rabbitmq.html", "target": "../extensions-contrib/rabbitmq.html"}, {"source": "development/extensions-core/namespaced-lookup.html", "target": "lookups-cached-global.html"}, {"source": "operations/performance-faq.html", "target": "../operations/basic-cluster-tuning.html"}, - {"source": "development/extensions-contrib/orc.html", "target": "../extensions-core/orc.html"} + {"source": "development/extensions-contrib/orc.html", "target": "../extensions-core/orc.html"}, 
{"source": "operations/performance-faq.html", "target": "../operations/basic-cluster-tuning.html"}, {"source": "configuration/realtime.md", "target": "../ingestion/standalone-realtime.html"}, {"source": "design/realtime.md", "target": "../ingestion/standalone-realtime.html"}, - {"source": "ingestion/stream-pull.md", "target": "../ingestion/standalone-realtime.html"} + {"source": "ingestion/stream-pull.md", "target": "../ingestion/standalone-realtime.html"}, + {"source": "development/extensions-core/kafka-eight-firehose.md", "target": "../../ingestion/standalone-realtime.html"}, + {"source": "development/extensions-contrib/kafka-simple.md", "target": "../../ingestion/standalone-realtime.html"}, + {"source": "development/extensions-contrib/rabbitmq.md", "target": "../../ingestion/standalone-realtime.html"}, + {"source": "development/extensions-contrib/rocketmq.md", "target": "../../ingestion/standalone-realtime.html"}, ] diff --git a/docs/content/development/extensions-contrib/kafka-simple.md b/docs/content/development/extensions-contrib/kafka-simple.md deleted file mode 100644 index 3211efe90b3..00000000000 --- a/docs/content/development/extensions-contrib/kafka-simple.md +++ /dev/null @@ -1,56 +0,0 @@ ---- -layout: doc_page -title: "Kafka Simple Consumer" ---- - - - -# Kafka Simple Consumer - -To use this Apache Druid (incubating) extension, make sure to [include](../../operations/including-extensions.html) `druid-kafka-eight-simpleConsumer` extension. - -## Firehose - -This is an experimental firehose to ingest data from Apache Kafka using the Kafka simple consumer api. Currently, this firehose would only work inside standalone realtime processes. -The configuration for KafkaSimpleConsumerFirehose is similar to the Kafka Eight Firehose , except `firehose` should be replaced with `firehoseV2` like this: - -```json -"firehoseV2": { - "type" : "kafka-0.8-v2", - "brokerList" : ["localhost:4443"], - "queueBufferLength":10001, - "resetOffsetToEarliest":"true", - "partitionIdList" : ["0"], - "clientId" : "localclient", - "feed": "wikipedia" -} -``` - -|property|description|required?| -|--------|-----------|---------| -|type|kafka-0.8-v2|yes| -|brokerList|list of the kafka brokers|yes| -|queueBufferLength|the buffer length for kafka message queue|no default(20000)| -|resetOffsetToEarliest|in case of kafkaOffsetOutOfRange error happens, consumer should starts from the earliest or latest message available|true| -|partitionIdList|list of kafka partition ids|yes| -|clientId|the clientId for kafka SimpleConsumer|yes| -|feed|kafka topic|yes| - -For using this firehose at scale and possibly in production, it is recommended to set replication factor to at least three, which means at least three Kafka brokers in the `brokerList`. For a 1*10^4 events per second kafka topic, keeping one partition can work properly, but more partitions could be added if higher throughput is required. diff --git a/docs/content/development/extensions-contrib/rabbitmq.md b/docs/content/development/extensions-contrib/rabbitmq.md deleted file mode 100644 index e9eefc556bc..00000000000 --- a/docs/content/development/extensions-contrib/rabbitmq.md +++ /dev/null @@ -1,81 +0,0 @@ ---- -layout: doc_page -title: "RabbitMQ" ---- - - - -# RabbitMQ - -To use this Apache Druid (incubating) extension, make sure to [include](../../operations/including-extensions.html) `druid-rabbitmq` extension. - -## Firehose - -#### RabbitMQFirehose - -This firehose ingests events from a define rabbit-mq queue. 
- -**Note:** Add **amqp-client-3.2.1.jar** to lib directory of druid to use this firehose. - -A sample spec for rabbitmq firehose: - -```json -"firehose" : { - "type" : "rabbitmq", - "connection" : { - "host": "localhost", - "port": "5672", - "username": "test-dude", - "password": "test-word", - "virtualHost": "test-vhost", - "uri": "amqp://mqserver:1234/vhost" - }, - "config" : { - "exchange": "test-exchange", - "queue" : "druidtest", - "routingKey": "#", - "durable": "true", - "exclusive": "false", - "autoDelete": "false", - "maxRetries": "10", - "retryIntervalSeconds": "1", - "maxDurationSeconds": "300" - } -} -``` - -|property|description|default|required?| -|--------|-----------|-------|---------| -|type|This should be "rabbitmq"|N/A|yes| -|host|The hostname of the RabbitMQ broker to connect to|localhost|no| -|port|The port number to connect to on the RabbitMQ broker|5672|no| -|username|The username to use to connect to RabbitMQ|guest|no| -|password|The password to use to connect to RabbitMQ|guest|no| -|virtualHost|The virtual host to connect to|/|no| -|uri|The URI string to use to connect to RabbitMQ| |no| -|exchange|The exchange to connect to| |yes| -|queue|The queue to connect to or create| |yes| -|routingKey|The routing key to use to bind the queue to the exchange| |yes| -|durable|Whether the queue should be durable|false|no| -|exclusive|Whether the queue should be exclusive|false|no| -|autoDelete|Whether the queue should auto-delete on disconnect|false|no| -|maxRetries|The max number of reconnection retry attempts| |yes| -|retryIntervalSeconds|The reconnection interval| |yes| -|maxDurationSeconds|The max duration of trying to reconnect| |yes| diff --git a/docs/content/development/extensions-contrib/rocketmq.md b/docs/content/development/extensions-contrib/rocketmq.md deleted file mode 100644 index 4dd0eeab0e2..00000000000 --- a/docs/content/development/extensions-contrib/rocketmq.md +++ /dev/null @@ -1,29 +0,0 @@ ---- -layout: doc_page -title: "RocketMQ" ---- - - - -# RocketMQ - -To use this Apache Druid (incubating) extension, make sure to [include](../../operations/including-extensions.html) `druid-rocketmq` extension. - -Original author: [https://github.com/lizhanhui](https://github.com/lizhanhui). diff --git a/docs/content/development/extensions-core/kafka-eight-firehose.md b/docs/content/development/extensions-core/kafka-eight-firehose.md deleted file mode 100644 index 740e5fa65f8..00000000000 --- a/docs/content/development/extensions-core/kafka-eight-firehose.md +++ /dev/null @@ -1,54 +0,0 @@ ---- -layout: doc_page -title: "Apache Kafka Eight Firehose" ---- - - - -# Kafka Eight Firehose - -To use this Apache Druid (incubating) extension, make sure to [include](../../operations/including-extensions.html) `druid-kafka-eight` as an extension. - -This firehose acts as a Kafka 0.8.x consumer and ingests data from Kafka. 
- -Sample spec: - -```json -"firehose": { - "type": "kafka-0.8", - "consumerProps": { - "zookeeper.connect": "localhost:2181", - "zookeeper.connection.timeout.ms" : "15000", - "zookeeper.session.timeout.ms" : "15000", - "zookeeper.sync.time.ms" : "5000", - "group.id": "druid-example", - "fetch.message.max.bytes" : "1048586", - "auto.offset.reset": "largest", - "auto.commit.enable": "false" - }, - "feed": "wikipedia" -} -``` - -|property|description|required?| -|--------|-----------|---------| -|type|This should be "kafka-0.8"|yes| -|consumerProps|The full list of consumer configs can be [here](https://kafka.apache.org/08/configuration.html).|yes| -|feed|Kafka maintains feeds of messages in categories called topics. This is the topic name.|yes| diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index 41a7aa1766d..7b51b7ab056 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -48,7 +48,6 @@ Core extensions are maintained by Druid committers. |druid-datasketches|Support for approximate counts and set operations with [DataSketches](https://datasketches.github.io/).|[link](../development/extensions-core/datasketches-extension.html)| |druid-hdfs-storage|HDFS deep storage.|[link](../development/extensions-core/hdfs.html)| |druid-histogram|Approximate histograms and quantiles aggregator. Deprecated, please use the [DataSketches quantiles aggregator](../development/extensions-core/datasketches-quantiles.html) from the `druid-datasketches` extension instead.|[link](../development/extensions-core/approximate-histograms.html)| -|druid-kafka-eight|Kafka ingest firehose (high level consumer) for realtime nodes(deprecated).|[link](../development/extensions-core/kafka-eight-firehose.html)| |druid-kafka-extraction-namespace|Kafka-based namespaced lookup. 
Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)| |druid-kafka-indexing-service|Supervised exactly-once Kafka ingestion for the indexing service.|[link](../development/extensions-core/kafka-ingestion.html)| |druid-kinesis-indexing-service|Supervised exactly-once Kinesis ingestion for the indexing service.|[link](../development/extensions-core/kinesis-ingestion.html)| @@ -83,10 +82,7 @@ All of these community extensions can be downloaded using [pull-deps](../operati |druid-cassandra-storage|Apache Cassandra deep storage.|[link](../development/extensions-contrib/cassandra.html)| |druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)| |druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)| -|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer)(deprecated).|[link](../development/extensions-contrib/kafka-simple.html)| -|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)| |druid-redis-cache|A cache implementation for Druid based on Redis.|[link](../development/extensions-contrib/redis-cache.html)| -|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)| |druid-time-min-max|Min/Max aggregator for timestamp.|[link](../development/extensions-contrib/time-min-max.html)| |druid-google-extensions|Google Cloud Storage deep storage.|[link](../development/extensions-contrib/google.html)| |sqlserver-metadata-storage|Microsoft SqlServer deep storage.|[link](../development/extensions-contrib/sqlserver.html)| diff --git a/docs/content/ingestion/standalone-realtime.md b/docs/content/ingestion/standalone-realtime.md index 81ce89d6f09..27065c5e500 100644 --- a/docs/content/ingestion/standalone-realtime.md +++ b/docs/content/ingestion/standalone-realtime.md @@ -39,5 +39,8 @@ removed completely in Druid 0.16.0. Operationally, realtime nodes were difficult each node required an unique configuration. The design of the stream pull ingestion system for realtime nodes also suffered from limitations which made it not possible to achieve exactly once ingestion. +The extensions `druid-kafka-eight`, `druid-kafka-eight-simpleConsumer`, `druid-rabbitmq`, and `druid-rocketmq` were also +removed at this time, since they were built to operate on the realtime nodes. + Please consider using the [Kafka Indexing Service](../development/extensions-core/kafka-ingestion.html) or [Kinesis Indexing Service](../development/extensions-core/kinesis-ingestion.md) for stream pull ingestion instead. 
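+
+For reference, a minimal supervisor spec for the Kafka indexing service looks roughly like the sketch below. The datasource name, topic, and broker address (`wikipedia`, `localhost:9092`) are illustrative placeholders, and the tuning config is omitted:
+
+```json
+{
+  "type": "kafka",
+  "dataSchema": {
+    "dataSource": "wikipedia",
+    "parser": {
+      "type": "string",
+      "parseSpec": {
+        "format": "json",
+        "timestampSpec": {"column": "timestamp", "format": "auto"},
+        "dimensionsSpec": {"dimensions": []}
+      }
+    },
+    "metricsSpec": [],
+    "granularitySpec": {"type": "uniform", "segmentGranularity": "HOUR", "queryGranularity": "NONE"}
+  },
+  "ioConfig": {
+    "topic": "wikipedia",
+    "consumerProperties": {"bootstrap.servers": "localhost:9092"}
+  }
+}
+```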
diff --git a/docs/content/operations/pull-deps.md b/docs/content/operations/pull-deps.md index 2af9a7d93d6..63cd5504418 100644 --- a/docs/content/operations/pull-deps.md +++ b/docs/content/operations/pull-deps.md @@ -92,10 +92,10 @@ To run `pull-deps`, you should Example: -Suppose you want to download ```druid-rabbitmq```, ```mysql-metadata-storage``` and ```hadoop-client```(both 2.3.0 and 2.4.0) with a specific version, you can run `pull-deps` command with `-c org.apache.druid.extensions:druid-examples:#{DRUIDVERSION}`, `-c org.apache.druid.extensions:mysql-metadata-storage:#{DRUIDVERSION}`, `-h org.apache.hadoop:hadoop-client:2.3.0` and `-h org.apache.hadoop:hadoop-client:2.4.0`, an example command would be: +Suppose you want to download ```mysql-metadata-storage``` and ```hadoop-client``` (both 2.3.0 and 2.4.0) at specific versions. You can run the `pull-deps` command with `-c org.apache.druid.extensions:mysql-metadata-storage:#{DRUIDVERSION}`, `-h org.apache.hadoop:hadoop-client:2.3.0`, and `-h org.apache.hadoop:hadoop-client:2.4.0`. An example command would be: ``` -java -classpath "/my/druid/lib/*" org.apache.druid.cli.Main tools pull-deps --clean -c org.apache.druid.extensions:mysql-metadata-storage:#{DRUIDVERSION} -c org.apache.druid.extensions.contrib:druid-rabbitmq:#{DRUIDVERSION} -h org.apache.hadoop:hadoop-client:2.3.0 -h org.apache.hadoop:hadoop-client:2.4.0 +java -classpath "/my/druid/lib/*" org.apache.druid.cli.Main tools pull-deps --clean -c org.apache.druid.extensions:mysql-metadata-storage:#{DRUIDVERSION} -h org.apache.hadoop:hadoop-client:2.3.0 -h org.apache.hadoop:hadoop-client:2.4.0 ``` Because `--clean` is supplied, this command will first remove the directories specified at `druid.extensions.directory` and `druid.extensions.hadoopDependenciesDir`, then recreate them and start downloading the extensions there. After finishing downloading, if you go to the extension directories you specified, you will see @@ -103,15 +103,6 @@ Because `--clean` is supplied, this command will first remove the directories sp ``` tree extensions extensions -├── druid-examples -│   ├── commons-beanutils-1.8.3.jar -│   ├── commons-digester-1.8.jar -│   ├── commons-logging-1.1.1.jar -│   ├── commons-validator-1.4.0.jar -│   ├── druid-examples-#{DRUIDVERSION}.jar -│   ├── twitter4j-async-3.0.3.jar -│   ├── twitter4j-core-3.0.3.jar -│   └── twitter4j-stream-3.0.3.jar └── mysql-metadata-storage └── mysql-metadata-storage-#{DRUIDVERSION}.jar ``` @@ -138,10 +129,10 @@ hadoop-dependencies/ ..... lots of jars ``` -Note that if you specify `--defaultVersion`, you don't have to put version information in the coordinate.
For example, if you want `mysql-metadata-storage` to use version `#{DRUIDVERSION}`, you can change the command above to ``` -java -classpath "/my/druid/lib/*" org.apache.druid.cli.Main tools pull-deps --defaultVersion #{DRUIDVERSION} --clean -c org.apache.druid.extensions:mysql-metadata-storage -c org.apache.druid.extensions.contrib:druid-rabbitmq -h org.apache.hadoop:hadoop-client:2.3.0 -h org.apache.hadoop:hadoop-client:2.4.0 +java -classpath "/my/druid/lib/*" org.apache.druid.cli.Main tools pull-deps --defaultVersion #{DRUIDVERSION} --clean -c org.apache.druid.extensions:mysql-metadata-storage -h org.apache.hadoop:hadoop-client:2.3.0 -h org.apache.hadoop:hadoop-client:2.4.0 ```

diff --git a/examples/pom.xml b/examples/pom.xml deleted file mode 100644 index 37aded9b8a0..00000000000 --- a/examples/pom.xml +++ /dev/null @@ -1,134 +0,0 @@ - - - - - 4.0.0 - - org.apache.druid.extensions - druid-examples - druid-examples - druid-examples - - - org.apache.druid - druid - 0.16.0-incubating-SNAPSHOT - - - - - org.apache.druid - druid-server - ${project.parent.version} - provided - - - org.apache.druid - druid-core - ${project.parent.version} - provided - - - - org.twitter4j - twitter4j-core - 3.0.3 - - - org.twitter4j - twitter4j-async - 3.0.3 - - - org.twitter4j - twitter4j-stream - 3.0.3 - - - commons-validator - commons-validator - 1.5.1 - - - com.ircclouds.irc - irc-api - - - com.maxmind.geoip2 - geoip2 - - - com.google.guava - guava - - - - - - - junit - junit - test - - - - - - - org.apache.maven.plugins - maven-shade-plugin - - - package - - shade - - - - ${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - - test-jar - - - - - - - diff --git a/examples/src/main/java/org/apache/druid/examples/ExamplesDruidModule.java b/examples/src/main/java/org/apache/druid/examples/ExamplesDruidModule.java deleted file mode 100644 index 8bf13d0cf9c..00000000000 --- a/examples/src/main/java/org/apache/druid/examples/ExamplesDruidModule.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.examples; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.inject.Binder; -import org.apache.druid.examples.twitter.TwitterSpritzerFirehoseFactory; -import org.apache.druid.examples.wikipedia.IrcFirehoseFactory; -import org.apache.druid.examples.wikipedia.IrcInputRowParser; -import org.apache.druid.initialization.DruidModule; - -import java.util.Collections; -import java.util.List; - -/** - */ -public class ExamplesDruidModule implements DruidModule -{ - @Override - public List getJacksonModules() - { - return Collections.singletonList( - new SimpleModule("ExamplesModule") - .registerSubtypes( - new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), - new NamedType(IrcFirehoseFactory.class, "irc"), - new NamedType(IrcInputRowParser.class, "irc") - ) - ); - } - - @Override - public void configure(Binder binder) - { - - } -} diff --git a/examples/src/main/java/org/apache/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java b/examples/src/main/java/org/apache/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java deleted file mode 100644 index 300db0d8313..00000000000 --- a/examples/src/main/java/org/apache/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java +++ /dev/null @@ -1,387 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.examples.twitter; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.druid.data.input.Firehose; -import org.apache.druid.data.input.FirehoseFactory; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.data.input.MapBasedInputRow; -import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.logger.Logger; -import twitter4j.ConnectionLifeCycleListener; -import twitter4j.GeoLocation; -import twitter4j.HashtagEntity; -import twitter4j.StallWarning; -import twitter4j.Status; -import twitter4j.StatusDeletionNotice; -import twitter4j.StatusListener; -import twitter4j.TwitterStream; -import twitter4j.TwitterStreamFactory; -import twitter4j.User; - -import javax.annotation.Nullable; -import java.io.File; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Twitter "spritzer" Firehose Factory named "twitzer". - * Builds a Firehose that emits a stream of - * ?? - * with timestamps along with ??. - * The generated tuples have the form (timestamp, ????) - * where the timestamp is from the twitter event. - *

- * Example spec file: - *

- * Example query using POST to /druid/v2/?w (where w is an arbitrary parameter and the date and time - * is UTC): - *

- * Notes on twitter.com HTTP (REST) API: v1.0 will be disabled around 2013-03 so v1.1 should be used; - * twitter4j 3.0 (not yet released) will support the v1.1 api. - * Specifically, we should be using https://stream.twitter.com/1.1/statuses/sample.json - * See: http://jira.twitter4j.org/browse/TFJ-186 - *

- * Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString() - * can have number format exceptions), so it might be necessary to extract raw json and process it - * separately. If so, set twitter4.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON(); - * com.fasterxml.jackson.databind.ObjectMapper should be used to parse. - */ -@JsonTypeName("twitzer") -public class TwitterSpritzerFirehoseFactory implements FirehoseFactory -{ - private static final Logger log = new Logger(TwitterSpritzerFirehoseFactory.class); - private static final Pattern sourcePattern = Pattern.compile("]*>(.*?)", Pattern.CASE_INSENSITIVE); - private static final int DEFAULT_QUEUE_SIZE = 2000; - - /** - * max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent - * infinite space consumption or to prevent getting throttled at an inconvenient time - * or to see what happens when a Firehose stops delivering - * values, or to have hasMore() return false. The Twitter Spritzer can deliver about - * 1000 events per minute. - */ - private final int maxEventCount; - /** - * maximum number of minutes to fetch Twitter events. Use this to prevent getting - * throttled at an inconvenient time. If zero or less, no time limit for run. - */ - private final int maxRunMinutes; - - @JsonCreator - public TwitterSpritzerFirehoseFactory( - @JsonProperty("maxEventCount") Integer maxEventCount, - @JsonProperty("maxRunMinutes") Integer maxRunMinutes - ) - { - this.maxEventCount = maxEventCount; - this.maxRunMinutes = maxRunMinutes; - log.info("maxEventCount=" + ((maxEventCount <= 0) ? "no limit" : maxEventCount)); - log.info("maxRunMinutes=" + ((maxRunMinutes <= 0) ? "no limit" : maxRunMinutes)); - } - - @Override - public Firehose connect(InputRowParser parser, File temporaryDirectory) - { - final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener() - { - @Override - public void onConnect() - { - log.info("Connected_to_Twitter"); - } - - @Override - public void onDisconnect() - { - log.info("Disconnect_from_Twitter"); - } - - /** - * called before thread gets cleaned up - */ - @Override - public void onCleanUp() - { - log.info("Cleanup_twitter_stream"); - } - }; // ConnectionLifeCycleListener - - final TwitterStream twitterStream; - final StatusListener statusListener; - /** This queue is used to move twitter events from the twitter4j thread to the druid ingest thread. */ - final BlockingQueue queue = new ArrayBlockingQueue(DEFAULT_QUEUE_SIZE); - final long startMsec = System.currentTimeMillis(); - - // - // set up Twitter Spritzer - // - twitterStream = new TwitterStreamFactory().getInstance(); - twitterStream.addConnectionLifeCycleListener(connectionLifeCycleListener); - statusListener = new StatusListener() - { // This is what really gets called to deliver stuff from twitter4j - @Override - public void onStatus(Status status) - { - // time to stop? 
- if (Thread.currentThread().isInterrupted()) { - throw new RuntimeException("Interrupted, time to stop"); - } - try { - boolean success = queue.offer(status, 15L, TimeUnit.SECONDS); - if (!success) { - log.warn("queue too slow!"); - } - } - catch (InterruptedException e) { - throw new RuntimeException("InterruptedException", e); - } - } - - @Override - public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) - { - //log.info("Got a status deletion notice id:" + statusDeletionNotice.getStatusId()); - } - - @Override - public void onTrackLimitationNotice(int numberOfLimitedStatuses) - { - // This notice will be sent each time a limited stream becomes unlimited. - // If this number is high and or rapidly increasing, it is an indication that your predicate is too broad, and you should consider a predicate with higher selectivity. - log.warn("Got track limitation notice:" + numberOfLimitedStatuses); - } - - @Override - public void onScrubGeo(long userId, long upToStatusId) - { - //log.info("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId); - } - - @Override - public void onException(Exception ex) - { - log.error(ex, "Got exception"); - } - - @Override - public void onStallWarning(StallWarning warning) - { - log.warn("Got stall warning: %s", warning); - } - }; - - twitterStream.addListener(statusListener); - twitterStream.sample(); // creates a generic StatusStream - log.info("returned from sample()"); - - return new Firehose() - { - - private final Runnable doNothingRunnable = new Runnable() - { - @Override - public void run() - { - } - }; - - private long rowCount = 0L; - private boolean waitIfmax = (getMaxEventCount() < 0L); - private final Map theMap = new TreeMap<>(); - // DIY json parsing // private final ObjectMapper omapper = new ObjectMapper(); - - private boolean maxTimeReached() - { - if (getMaxRunMinutes() <= 0) { - return false; - } else { - return (System.currentTimeMillis() - startMsec) / 60000L >= getMaxRunMinutes(); - } - } - - private boolean maxCountReached() - { - return getMaxEventCount() >= 0 && rowCount >= getMaxEventCount(); - } - - @Override - public boolean hasMore() - { - if (maxCountReached() || maxTimeReached()) { - return waitIfmax; - } else { - return true; - } - } - - @Nullable - @Override - public InputRow nextRow() - { - // Interrupted to stop? - if (Thread.currentThread().isInterrupted()) { - throw new RuntimeException("Interrupted, time to stop"); - } - - // all done? - if (maxCountReached() || maxTimeReached()) { - if (waitIfmax) { - // sleep a long time instead of terminating - try { - log.info("reached limit, sleeping a long time..."); - Thread.sleep(2000000000L); - } - catch (InterruptedException e) { - throw new RuntimeException("InterruptedException", e); - } - } else { - // allow this event through, and the next hasMore() call will be false - } - } - if (++rowCount % 1000 == 0) { - log.info("nextRow() has returned %,d InputRows", rowCount); - } - - Status status; - try { - status = queue.take(); - } - catch (InterruptedException e) { - throw new RuntimeException("InterruptedException", e); - } - - theMap.clear(); - - HashtagEntity[] hts = status.getHashtagEntities(); - String text = status.getText(); - theMap.put("text", (null == text) ? "" : text); - theMap.put( - "htags", (hts.length > 0) ? 
Lists.transform( - Arrays.asList(hts), new Function() - { - @Nullable - @Override - public String apply(HashtagEntity input) - { - return input.getText(); - } - } - ) : ImmutableList.of() - ); - - long[] lcontrobutors = status.getContributors(); - List contributors = new ArrayList<>(); - for (long contrib : lcontrobutors) { - contributors.add(StringUtils.format("%d", contrib)); - } - theMap.put("contributors", contributors); - - GeoLocation geoLocation = status.getGeoLocation(); - if (null != geoLocation) { - double lat = status.getGeoLocation().getLatitude(); - double lon = status.getGeoLocation().getLongitude(); - theMap.put("lat", lat); - theMap.put("lon", lon); - } else { - theMap.put("lat", null); - theMap.put("lon", null); - } - - if (status.getSource() != null) { - Matcher m = sourcePattern.matcher(status.getSource()); - theMap.put("source", m.find() ? m.group(1) : status.getSource()); - } - - theMap.put("retweet", status.isRetweet()); - - if (status.isRetweet()) { - Status original = status.getRetweetedStatus(); - theMap.put("retweet_count", original.getRetweetCount()); - - User originator = original.getUser(); - theMap.put("originator_screen_name", originator != null ? originator.getScreenName() : ""); - theMap.put("originator_follower_count", originator != null ? originator.getFollowersCount() : ""); - theMap.put("originator_friends_count", originator != null ? originator.getFriendsCount() : ""); - theMap.put("originator_verified", originator != null ? originator.isVerified() : ""); - } - - User user = status.getUser(); - final boolean hasUser = (null != user); - theMap.put("follower_count", hasUser ? user.getFollowersCount() : 0); - theMap.put("friends_count", hasUser ? user.getFriendsCount() : 0); - theMap.put("lang", hasUser ? user.getLang() : ""); - theMap.put("utc_offset", hasUser ? user.getUtcOffset() : -1); // resolution in seconds, -1 if not available? - theMap.put("statuses_count", hasUser ? user.getStatusesCount() : 0); - theMap.put("user_id", hasUser ? StringUtils.format("%d", user.getId()) : ""); - theMap.put("screen_name", hasUser ? user.getScreenName() : ""); - theMap.put("location", hasUser ? user.getLocation() : ""); - theMap.put("verified", hasUser ? user.isVerified() : ""); - - theMap.put("ts", status.getCreatedAt().getTime()); - - List dimensions = Lists.newArrayList(theMap.keySet()); - - return new MapBasedInputRow(status.getCreatedAt().getTime(), dimensions, theMap); - } - - @Override - public Runnable commit() - { - // ephemera in, ephemera out. - return doNothingRunnable; // reuse the same object each time - } - - @Override - public void close() - { - log.info("CLOSE twitterstream"); - twitterStream.shutdown(); // invokes twitterStream.cleanUp() - } - }; - } - - @JsonProperty - public int getMaxEventCount() - { - return maxEventCount; - } - - @JsonProperty - public int getMaxRunMinutes() - { - return maxRunMinutes; - } -} diff --git a/examples/src/main/java/org/apache/druid/examples/wikipedia/IrcDecoder.java b/examples/src/main/java/org/apache/druid/examples/wikipedia/IrcDecoder.java deleted file mode 100644 index e4adb1c1f42..00000000000 --- a/examples/src/main/java/org/apache/druid/examples/wikipedia/IrcDecoder.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.examples.wikipedia; - -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import org.apache.druid.data.input.InputRow; -import org.joda.time.DateTime; - -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") -@JsonSubTypes({ - @JsonSubTypes.Type(name = "wikipedia", value = WikipediaIrcDecoder.class) -}) -public interface IrcDecoder -{ - InputRow decodeMessage(DateTime timestamp, String channel, String msg); -} diff --git a/examples/src/main/java/org/apache/druid/examples/wikipedia/IrcFirehoseFactory.java b/examples/src/main/java/org/apache/druid/examples/wikipedia/IrcFirehoseFactory.java deleted file mode 100644 index ab3f8069a46..00000000000 --- a/examples/src/main/java/org/apache/druid/examples/wikipedia/IrcFirehoseFactory.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.examples.wikipedia; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Lists; -import com.ircclouds.irc.api.Callback; -import com.ircclouds.irc.api.IRCApi; -import com.ircclouds.irc.api.IRCApiImpl; -import com.ircclouds.irc.api.IServerParameters; -import com.ircclouds.irc.api.domain.IRCServer; -import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg; -import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter; -import com.ircclouds.irc.api.state.IIRCState; -import org.apache.druid.data.input.Firehose; -import org.apache.druid.data.input.FirehoseFactory; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.logger.Logger; -import org.joda.time.DateTime; - -import javax.annotation.Nullable; -import java.io.File; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - *

Example code:

- *
{@code
- * 

- * IrcFirehoseFactory factory = new IrcFirehoseFactory( - * "wiki123", - * "irc.wikimedia.org", - * Lists.newArrayList( - * "#en.wikipedia", - * "#fr.wikipedia", - * "#de.wikipedia", - * "#ja.wikipedia" - * ) - * ); - * }

- */ -public class IrcFirehoseFactory implements FirehoseFactory>> -{ - private static final Logger log = new Logger(IrcFirehoseFactory.class); - - private final String nick; - private final String host; - private final List channels; - private volatile boolean closed = false; - - @JsonCreator - public IrcFirehoseFactory( - @JsonProperty("nick") String nick, - @JsonProperty("host") String host, - @JsonProperty("channels") List channels - ) - { - this.nick = nick; - this.host = host; - this.channels = channels; - } - - @JsonProperty - public String getNick() - { - return nick; - } - - @JsonProperty - public String getHost() - { - return host; - } - - @JsonProperty - public List getChannels() - { - return channels; - } - - @Override - public Firehose connect( - final InputRowParser> firehoseParser, - final File temporaryDirectory - ) - { - final IRCApi irc = new IRCApiImpl(false); - final LinkedBlockingQueue> queue = new LinkedBlockingQueue>(); - - irc.addListener( - new VariousMessageListenerAdapter() - { - @Override - public void onChannelMessage(ChannelPrivMsg aMsg) - { - try { - queue.put(Pair.of(DateTimes.nowUtc(), aMsg)); - } - catch (InterruptedException e) { - throw new RuntimeException("interrupted adding message to queue", e); - } - } - } - ); - - log.info("connecting to irc server [%s]", host); - irc.connect( - new IServerParameters() - { - @Override - public String getNickname() - { - return nick; - } - - @Override - public List getAlternativeNicknames() - { - return Lists.newArrayList(nick + UUID.randomUUID(), nick + UUID.randomUUID(), nick + UUID.randomUUID()); - } - - @Override - public String getIdent() - { - return "druid"; - } - - @Override - public String getRealname() - { - return nick; - } - - @Override - public IRCServer getServer() - { - return new IRCServer(host, false); - } - }, - new Callback() - { - @Override - public void onSuccess(IIRCState aObject) - { - log.info("irc connection to server [%s] established", host); - for (String chan : channels) { - log.info("Joining channel %s", chan); - irc.joinChannel(chan); - } - } - - @Override - public void onFailure(Exception e) - { - log.error(e, "Unable to connect to irc server [%s]", host); - throw new RuntimeException("Unable to connect to server", e); - } - } - ); - - closed = false; - - return new Firehose() - { - InputRow nextRow = null; - Iterator nextIterator = Collections.emptyIterator(); - - @Override - public boolean hasMore() - { - try { - while (true) { - if (closed) { - return false; - } - - if (nextIterator.hasNext()) { - nextRow = nextIterator.next(); - if (nextRow != null) { - return true; - } - } else { - Pair nextMsg = queue.poll(100, TimeUnit.MILLISECONDS); - if (nextMsg == null) { - continue; - } - try { - nextIterator = firehoseParser.parseBatch(nextMsg).iterator(); - } - catch (IllegalArgumentException iae) { - log.debug("ignoring invalid message in channel [%s]", nextMsg.rhs.getChannelName()); - } - } - } - } - catch (InterruptedException e) { - Thread.interrupted(); - throw new RuntimeException("interrupted retrieving elements from queue", e); - } - } - - @Nullable - @Override - public InputRow nextRow() - { - return nextRow; - } - - @Override - public Runnable commit() - { - return () -> { - // nothing to see here - }; - } - - @Override - public void close() - { - try { - log.info("disconnecting from irc server [%s]", host); - irc.disconnect(""); - } - finally { - closed = true; - } - } - }; - } -} - diff --git a/examples/src/main/java/org/apache/druid/examples/wikipedia/IrcInputRowParser.java 
b/examples/src/main/java/org/apache/druid/examples/wikipedia/IrcInputRowParser.java deleted file mode 100644 index b4a9fbbdd1d..00000000000 --- a/examples/src/main/java/org/apache/druid/examples/wikipedia/IrcInputRowParser.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.examples.wikipedia; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.collect.ImmutableList; -import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.data.input.impl.ParseSpec; -import org.apache.druid.java.util.common.Pair; -import org.joda.time.DateTime; - -import java.util.List; - -/** - *

Example Usage

- *

- *

Decoder definition: wikipedia-decoder.json

- *
{@code
- * 

- * { - * "type": "wikipedia", - * "namespaces": { - * "#en.wikipedia": { - * "": "main", - * "Category": "category", - * "Template talk": "template talk", - * "Help talk": "help talk", - * "Media": "media", - * "MediaWiki talk": "mediawiki talk", - * "File talk": "file talk", - * "MediaWiki": "mediawiki", - * "User": "user", - * "File": "file", - * "User talk": "user talk", - * "Template": "template", - * "Help": "help", - * "Special": "special", - * "Talk": "talk", - * "Category talk": "category talk" - * } - * }, - * "geoIpDatabase": "path/to/GeoLite2-City.mmdb" - * } - * }

- */ -@JsonTypeName("irc") -public class IrcInputRowParser implements InputRowParser> -{ - private final ParseSpec parseSpec; - private final IrcDecoder decoder; - - @JsonCreator - public IrcInputRowParser( - @JsonProperty("parseSpec") ParseSpec parseSpec, - @JsonProperty("decoder") IrcDecoder decoder - ) - { - this.parseSpec = parseSpec; - this.decoder = decoder; - } - - @JsonProperty - public IrcDecoder getDecoder() - { - return decoder; - } - - @Override - public List parseBatch(Pair msg) - { - return ImmutableList.of(decoder.decodeMessage(msg.lhs, msg.rhs.getChannelName(), msg.rhs.getText())); - } - - @JsonProperty - @Override - public ParseSpec getParseSpec() - { - return parseSpec; - } - - @Override - public InputRowParser withParseSpec(ParseSpec parseSpec) - { - return new IrcInputRowParser(parseSpec, decoder); - } -} diff --git a/examples/src/main/java/org/apache/druid/examples/wikipedia/WikipediaIrcDecoder.java b/examples/src/main/java/org/apache/druid/examples/wikipedia/WikipediaIrcDecoder.java deleted file mode 100644 index edefc049383..00000000000 --- a/examples/src/main/java/org/apache/druid/examples/wikipedia/WikipediaIrcDecoder.java +++ /dev/null @@ -1,307 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.examples.wikipedia; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.maxmind.geoip2.DatabaseReader; -import com.maxmind.geoip2.exception.GeoIp2Exception; -import com.maxmind.geoip2.model.Omni; -import org.apache.commons.io.FileUtils; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.data.input.Row; -import org.apache.druid.java.util.common.logger.Logger; -import org.joda.time.DateTime; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.URL; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.zip.GZIPInputStream; - -class WikipediaIrcDecoder implements IrcDecoder -{ - static final Logger LOG = new Logger(WikipediaIrcDecoder.class); - - private static final Pattern PATTERN = Pattern.compile( - ".*\\x0314\\[\\[\\x0307(.+?)\\x0314\\]\\]\\x034 (.*?)\\x0310.*\\x0302(http.+?)" + - "\\x03.+\\x0303(.+?)\\x03.+\\x03 (\\(([+-]\\d+)\\).*|.+) \\x0310(.+)\\x03.*" - ); - - private static final Pattern IP_PATTERN = Pattern.compile("\\d+.\\d+.\\d+.\\d+"); - private static final Pattern SHORTNAME_PATTERN = Pattern.compile("#(\\w\\w)\\..*"); - private static final Pattern SINGLE_SPACE_PATTERN = Pattern.compile("\\s"); - - static final List dimensionList = Lists.newArrayList( - "page", - "language", - "user", - "unpatrolled", - "newPage", - "robot", - "anonymous", - "namespace", - "continent", - "country", - "region", - "city" - ); - - final DatabaseReader geoLookup; - final Map> namespaces; - final String geoIpDatabase; - - public WikipediaIrcDecoder(Map> namespaces) - { - this(namespaces, null); - } - - @JsonCreator - public WikipediaIrcDecoder( - @JsonProperty("namespaces") Map> namespaces, - @JsonProperty("geoIpDatabase") String geoIpDatabase - ) - { - if (namespaces == null) { - namespaces = new HashMap<>(); - } - this.namespaces = namespaces; - this.geoIpDatabase = geoIpDatabase; - - if (geoIpDatabase != null) { - this.geoLookup = openGeoIpDb(new File(geoIpDatabase)); - } else { - this.geoLookup = openDefaultGeoIpDb(); - } - } - - private DatabaseReader openDefaultGeoIpDb() - { - File geoDb = new File(System.getProperty("java.io.tmpdir"), - this.getClass().getCanonicalName() + ".GeoLite2-City.mmdb"); - try { - return openDefaultGeoIpDb(geoDb); - } - catch (RuntimeException e) { - LOG.warn(e.getMessage() + " Attempting to re-download.", e); - if (geoDb.exists() && !geoDb.delete()) { - throw new RuntimeException("Could not delete geo db file [" + geoDb.getAbsolutePath() + "]."); - } - // local download may be corrupt, will retry once. - return openDefaultGeoIpDb(geoDb); - } - } - - private DatabaseReader openDefaultGeoIpDb(File geoDb) - { - downloadGeoLiteDbToFile(geoDb); - return openGeoIpDb(geoDb); - } - - private DatabaseReader openGeoIpDb(File geoDb) - { - try { - DatabaseReader reader = new DatabaseReader(geoDb); - LOG.info("Using geo ip database at [%s].", geoDb); - return reader; - } - catch (IOException e) { - throw new RuntimeException("Could not open geo db at [" + geoDb.getAbsolutePath() + "].", e); - } - } - - private void downloadGeoLiteDbToFile(File geoDb) - { - if (geoDb.exists()) { - return; - } - - try { - LOG.info("Downloading geo ip database to [%s]. 
This may take a few minutes.", geoDb.getAbsolutePath()); - - File tmpFile = File.createTempFile("druid", "geo"); - - FileUtils.copyInputStreamToFile( - new GZIPInputStream( - new URL("http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz").openStream() - ), - tmpFile - ); - - if (!tmpFile.renameTo(geoDb)) { - throw new RuntimeException("Unable to move geo file to [" + geoDb.getAbsolutePath() + "]!"); - } - } - catch (IOException e) { - throw new RuntimeException("Unable to download geo ip database.", e); - } - } - - @JsonProperty - public Map> getNamespaces() - { - return namespaces; - } - - @JsonProperty - public String getGeoIpDatabase() - { - return geoIpDatabase; - } - - @Override - public InputRow decodeMessage(final DateTime timestamp, String channel, String msg) - { - final Map dimensions = new HashMap<>(); - final Map metrics = new HashMap<>(); - - Matcher m = PATTERN.matcher(msg); - if (!m.matches()) { - throw new IllegalArgumentException("Invalid input format"); - } - - Matcher shortname = SHORTNAME_PATTERN.matcher(channel); - if (shortname.matches()) { - dimensions.put("language", shortname.group(1)); - } - - String page = m.group(1); - String pageUrl = SINGLE_SPACE_PATTERN.matcher(page).replaceAll("_"); - - dimensions.put("page", pageUrl); - - String user = m.group(4); - Matcher ipMatch = IP_PATTERN.matcher(user); - boolean anonymous = ipMatch.matches(); - if (anonymous) { - try { - final InetAddress ip = InetAddress.getByName(ipMatch.group()); - final Omni lookup = geoLookup.omni(ip); - - dimensions.put("continent", lookup.getContinent().getName()); - dimensions.put("country", lookup.getCountry().getName()); - dimensions.put("region", lookup.getMostSpecificSubdivision().getName()); - dimensions.put("city", lookup.getCity().getName()); - } - catch (UnknownHostException e) { - LOG.error(e, "invalid ip [%s]", ipMatch.group()); - } - catch (IOException e) { - LOG.error(e, "error looking up geo ip"); - } - catch (GeoIp2Exception e) { - LOG.error(e, "error looking up geo ip"); - } - } - dimensions.put("user", user); - - final String flags = m.group(2); - dimensions.put("unpatrolled", Boolean.toString(flags.contains("!"))); - dimensions.put("newPage", Boolean.toString(flags.contains("N"))); - dimensions.put("robot", Boolean.toString(flags.contains("B"))); - - dimensions.put("anonymous", Boolean.toString(anonymous)); - - String[] parts = page.split(":"); - if (parts.length > 1 && !parts[1].startsWith(" ")) { - Map channelNamespaces = namespaces.get(channel); - if (channelNamespaces != null && channelNamespaces.containsKey(parts[0])) { - dimensions.put("namespace", channelNamespaces.get(parts[0])); - } else { - dimensions.put("namespace", "wikipedia"); - } - } else { - dimensions.put("namespace", "article"); - } - - float delta = m.group(6) != null ? 
Float.parseFloat(m.group(6)) : 0; - metrics.put("delta", delta); - metrics.put("added", Math.max(delta, 0)); - metrics.put("deleted", Math.min(delta, 0)); - - return new InputRow() - { - @Override - public List getDimensions() - { - return dimensionList; - } - - @Override - public long getTimestampFromEpoch() - { - return timestamp.getMillis(); - } - - @Override - public DateTime getTimestamp() - { - return timestamp; - } - - @Override - public List getDimension(String dimension) - { - final String value = dimensions.get(dimension); - if (value != null) { - return ImmutableList.of(value); - } else { - return ImmutableList.of(); - } - } - - @Override - public Object getRaw(String dimension) - { - return dimensions.get(dimension); - } - - @Override - public Number getMetric(String metric) - { - return metrics.get(metric); - } - - @Override - public int compareTo(Row o) - { - return timestamp.compareTo(o.getTimestamp()); - } - - @Override - public String toString() - { - return "WikipediaRow{" + - "timestamp=" + timestamp + - ", dimensions=" + dimensions + - ", metrics=" + metrics + - '}'; - } - }; - } -} diff --git a/examples/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/examples/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule deleted file mode 100644 index 76e74d4860b..00000000000 --- a/examples/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.druid.examples.ExamplesDruidModule diff --git a/extensions-contrib/druid-rocketmq/pom.xml b/extensions-contrib/druid-rocketmq/pom.xml deleted file mode 100644 index 73091926af2..00000000000 --- a/extensions-contrib/druid-rocketmq/pom.xml +++ /dev/null @@ -1,58 +0,0 @@ - - - - 4.0.0 - - druid - org.apache.druid - 0.16.0-incubating-SNAPSHOT - ../../pom.xml - - - org.apache.druid.extensions.contrib - druid-rocketmq - - - 3.2.6 - - - - - com.alibaba.rocketmq - rocketmq-client - ${rocketmq.version} - - - - io.netty - netty-all - - - - - org.apache.druid - druid-core - ${project.parent.version} - provided - - - diff --git a/extensions-contrib/druid-rocketmq/src/main/java/org/apache/druid/firehose/rocketmq/RocketMQDruidModule.java b/extensions-contrib/druid-rocketmq/src/main/java/org/apache/druid/firehose/rocketmq/RocketMQDruidModule.java deleted file mode 100644 index 0724a890ac4..00000000000 --- a/extensions-contrib/druid-rocketmq/src/main/java/org/apache/druid/firehose/rocketmq/RocketMQDruidModule.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.firehose.rocketmq; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import org.apache.druid.initialization.DruidModule; - -import java.util.List; - -public class RocketMQDruidModule implements DruidModule -{ - - @Override - public List getJacksonModules() - { - return ImmutableList.of( - new SimpleModule("RocketMQFirehoseModule") - .registerSubtypes( - new NamedType(RocketMQFirehoseFactory.class, "rocketMQ") - ) - ); - } - - @Override - public void configure(Binder binder) - { - - } -} diff --git a/extensions-contrib/druid-rocketmq/src/main/java/org/apache/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions-contrib/druid-rocketmq/src/main/java/org/apache/druid/firehose/rocketmq/RocketMQFirehoseFactory.java deleted file mode 100644 index 377a98e365c..00000000000 --- a/extensions-contrib/druid-rocketmq/src/main/java/org/apache/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ /dev/null @@ -1,584 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.firehose.rocketmq; - -import com.alibaba.rocketmq.client.Validators; -import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; -import com.alibaba.rocketmq.client.consumer.MessageQueueListener; -import com.alibaba.rocketmq.client.consumer.PullResult; -import com.alibaba.rocketmq.client.consumer.store.OffsetStore; -import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType; -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.ServiceThread; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import com.alibaba.rocketmq.remoting.exception.RemotingException; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Sets; -import org.apache.druid.data.input.Firehose; -import org.apache.druid.data.input.FirehoseFactory; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.common.parsers.ParseException; - -import javax.annotation.Nullable; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.CountDownLatch; - -public class RocketMQFirehoseFactory implements FirehoseFactory> -{ - - private static final Logger LOGGER = new Logger(RocketMQFirehoseFactory.class); - - /** - * Passed in configuration for consumer client. - * This provides an approach to override default values defined in {@link com.alibaba.rocketmq.common.MixAll}. - */ - @JsonProperty - private final Properties consumerProps; - - /** - * Consumer group. It's required. - */ - @JsonProperty(required = true) - private final String consumerGroup; - - /** - * Topics to consume. It's required. - */ - @JsonProperty(required = true) - private final List feed; - - /** - * Pull batch size. It's optional. - */ - @JsonProperty - private final String pullBatchSize; - - /** - * Store messages that are fetched from brokers but not yet delivered to druid via fire hose. - */ - private final ConcurrentHashMap> messageQueueTreeSetMap = - new ConcurrentHashMap<>(); - - /** - * Store message consuming status. - */ - private final ConcurrentHashMap> windows = new ConcurrentHashMap<>(); - - /** - * Default pull batch size. 
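
The four JSON-mapped fields above are all a spec had to supply, and every consumerProps entry is copied into the JVM's system properties so the RocketMQ client picks it up. A hedged sketch of constructing the deleted factory directly; the group and topic names are invented, and the broker address is hypothetical:

// Sketch only: "druid_group" and "wiki_topic" are made-up names.
Properties props = new Properties();
props.setProperty("rocketmq.namesrv.addr", "localhost:9876"); // standard RocketMQ client property
RocketMQFirehoseFactory factory = new RocketMQFirehoseFactory(
    props,
    "druid_group",
    Collections.singletonList("wiki_topic"),
    null // null pullBatchSize falls back to DEFAULT_PULL_BATCH_SIZE (32)
);
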
- */ - private static final int DEFAULT_PULL_BATCH_SIZE = 32; - - @JsonCreator - public RocketMQFirehoseFactory( - @JsonProperty("consumerProps") Properties consumerProps, - @JsonProperty("consumerGroup") String consumerGroup, - @JsonProperty("feed") List feed, - @JsonProperty("pullBatchSize") String pullBatchSize - ) - { - this.consumerProps = consumerProps; - this.pullBatchSize = pullBatchSize; - for (Map.Entry configItem : this.consumerProps.entrySet()) { - System.setProperty(configItem.getKey().toString(), configItem.getValue().toString()); - } - this.consumerGroup = consumerGroup; - this.feed = feed; - } - - /** - * Check if there are locally pending messages to consume. - * - * @return true if there are some; false otherwise. - */ - private boolean hasMessagesPending() - { - - for (Map.Entry> entry : messageQueueTreeSetMap.entrySet()) { - if (!entry.getValue().isEmpty()) { - return true; - } - } - - return false; - } - - @Override - public Firehose connect( - InputRowParser byteBufferInputRowParser, - File temporaryDirectory - ) throws IOException, ParseException - { - - Set newDimExclus = Sets.union( - byteBufferInputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), - Sets.newHashSet("feed") - ); - - final InputRowParser theParser = byteBufferInputRowParser.withParseSpec( - byteBufferInputRowParser.getParseSpec() - .withDimensionsSpec( - byteBufferInputRowParser.getParseSpec() - .getDimensionsSpec() - .withDimensionExclusions( - newDimExclus - ) - ) - ); - - /** - * Topic-Queue mapping. - */ - final ConcurrentHashMap> topicQueueMap; - - /** - * Default Pull-style client for RocketMQ. - */ - final DefaultMQPullConsumer defaultMQPullConsumer; - final DruidPullMessageService pullMessageService; - - messageQueueTreeSetMap.clear(); - windows.clear(); - - try { - defaultMQPullConsumer = new DefaultMQPullConsumer(this.consumerGroup); - defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING); - topicQueueMap = new ConcurrentHashMap<>(); - - pullMessageService = new DruidPullMessageService(defaultMQPullConsumer); - for (String topic : feed) { - Validators.checkTopic(topic); - topicQueueMap.put(topic, defaultMQPullConsumer.fetchSubscribeMessageQueues(topic)); - } - DruidMessageQueueListener druidMessageQueueListener = - new DruidMessageQueueListener(Sets.newHashSet(feed), topicQueueMap, defaultMQPullConsumer); - defaultMQPullConsumer.setMessageQueueListener(druidMessageQueueListener); - defaultMQPullConsumer.start(); - pullMessageService.start(); - } - catch (MQClientException e) { - LOGGER.error(e, "Failed to start DefaultMQPullConsumer"); - throw new IOException("Failed to start RocketMQ client", e); - } - - return new Firehose() - { - private Iterator nextIterator = Collections.emptyIterator(); - - @Override - public boolean hasMore() - { - if (nextIterator.hasNext()) { - return true; - } - boolean hasMore = false; - DruidPullRequest earliestPullRequest = null; - - for (Map.Entry> entry : topicQueueMap.entrySet()) { - for (MessageQueue messageQueue : entry.getValue()) { - ConcurrentSkipListSet messages = messageQueueTreeSetMap.get(messageQueue); - if (messages != null && !messages.isEmpty()) { - hasMore = true; - } else { - try { - long offset = defaultMQPullConsumer.fetchConsumeOffset(messageQueue, false); - int batchSize = (null == pullBatchSize || pullBatchSize.isEmpty()) ? 
- DEFAULT_PULL_BATCH_SIZE : Integer.parseInt(pullBatchSize); - - DruidPullRequest newPullRequest = new DruidPullRequest(messageQueue, null, offset, - batchSize, !hasMessagesPending() - ); - - // notify pull message service to pull messages from brokers. - pullMessageService.putRequest(newPullRequest); - - // set the earliest pull in case we need to block. - if (null == earliestPullRequest) { - earliestPullRequest = newPullRequest; - } - } - catch (MQClientException e) { - LOGGER.error("Failed to fetch consume offset for queue: %s", entry.getKey()); - } - } - } - } - - // Block only when there is no locally pending messages. - if (!hasMore && null != earliestPullRequest) { - try { - earliestPullRequest.getCountDownLatch().await(); - hasMore = true; - } - catch (InterruptedException e) { - LOGGER.error(e, "CountDownLatch await got interrupted"); - } - } - return hasMore; - } - - @Nullable - @Override - public InputRow nextRow() - { - if (nextIterator.hasNext()) { - return nextIterator.next(); - } - - for (Map.Entry> entry : messageQueueTreeSetMap.entrySet()) { - if (!entry.getValue().isEmpty()) { - MessageExt message = entry.getValue().pollFirst(); - nextIterator = theParser.parseBatch(ByteBuffer.wrap(message.getBody())).iterator(); - - windows - .computeIfAbsent(entry.getKey(), k -> new ConcurrentSkipListSet<>()) - .add(message.getQueueOffset()); - return nextIterator.next(); - } - } - - // should never happen. - throw new RuntimeException("Unexpected Fatal Error! There should have been one row available."); - } - - @Override - public Runnable commit() - { - return new Runnable() - { - @Override - public void run() - { - OffsetStore offsetStore = defaultMQPullConsumer.getOffsetStore(); - Set updated = new HashSet<>(); - // calculate offsets according to consuming windows. - for (Map.Entry> entry : windows.entrySet()) { - while (!entry.getValue().isEmpty()) { - - long offset = offsetStore.readOffset(entry.getKey(), ReadOffsetType.MEMORY_FIRST_THEN_STORE); - if (offset + 1 > entry.getValue().first()) { - entry.getValue().pollFirst(); - } else if (offset + 1 == entry.getValue().first()) { - entry.getValue().pollFirst(); - offsetStore.updateOffset(entry.getKey(), offset + 1, true); - updated.add(entry.getKey()); - } else { - break; - } - - } - } - offsetStore.persistAll(updated); - } - }; - } - - @Override - public void close() - { - defaultMQPullConsumer.shutdown(); - pullMessageService.shutdown(false); - } - }; - } - - - /** - * Pull request. - */ - static final class DruidPullRequest - { - private final MessageQueue messageQueue; - private final String tag; - private final long nextBeginOffset; - private final int pullBatchSize; - private final boolean longPull; - private final CountDownLatch countDownLatch; - - public DruidPullRequest( - final MessageQueue messageQueue, - final String tag, - final long nextBeginOffset, - final int pullBatchSize, - final boolean useLongPull - ) - { - this.messageQueue = messageQueue; - this.tag = (null == tag ? 
"*" : tag); - this.nextBeginOffset = nextBeginOffset; - this.pullBatchSize = pullBatchSize; - this.longPull = useLongPull; - countDownLatch = new CountDownLatch(1); - } - - public MessageQueue getMessageQueue() - { - return messageQueue; - } - - public long getNextBeginOffset() - { - return nextBeginOffset; - } - - public String getTag() - { - return tag; - } - - public int getPullBatchSize() - { - return pullBatchSize; - } - - public boolean isLongPull() - { - return longPull; - } - - public CountDownLatch getCountDownLatch() - { - return countDownLatch; - } - } - - - /** - * Pull message service for druid. - *
- * Note: this is a single thread service. - */ - final class DruidPullMessageService extends ServiceThread - { - - private volatile List requestsWrite = new ArrayList<>(); - private volatile List requestsRead = new ArrayList<>(); - - private final DefaultMQPullConsumer defaultMQPullConsumer; - - public DruidPullMessageService(final DefaultMQPullConsumer defaultMQPullConsumer) - { - this.defaultMQPullConsumer = defaultMQPullConsumer; - } - - public void putRequest(final DruidPullRequest request) - { - synchronized (this) { - this.requestsWrite.add(request); - if (!hasNotified) { - hasNotified = true; - // No need to use notifyAll here since extended class com.alibaba.rocketmq.common.ServiceThread handles it - notify(); - } - } - } - - private void swapRequests() - { - List tmp = requestsWrite; - requestsWrite = requestsRead; - requestsRead = tmp; - } - - @Override - public String getServiceName() - { - return getClass().getSimpleName(); - } - - /** - * Core message pulling logic code goes here. - */ - private void doPull() - { - for (DruidPullRequest pullRequest : requestsRead) { - PullResult pullResult; - try { - if (!pullRequest.isLongPull()) { - pullResult = defaultMQPullConsumer.pull( - pullRequest.getMessageQueue(), - pullRequest.getTag(), - pullRequest.getNextBeginOffset(), - pullRequest.getPullBatchSize() - ); - } else { - pullResult = defaultMQPullConsumer.pullBlockIfNotFound( - pullRequest.getMessageQueue(), - pullRequest.getTag(), - pullRequest.getNextBeginOffset(), - pullRequest.getPullBatchSize() - ); - } - - switch (pullResult.getPullStatus()) { - case FOUND: - // Handle pull result. - messageQueueTreeSetMap - .computeIfAbsent(pullRequest.getMessageQueue(), k -> new ConcurrentSkipListSet<>(MESSAGE_COMPARATOR)) - .addAll(pullResult.getMsgFoundList()); - break; - - case NO_NEW_MSG: - case NO_MATCHED_MSG: - break; - - case OFFSET_ILLEGAL: - LOGGER.error( - "Bad Pull Request: Offset is illegal. Offset used: %d", - pullRequest.getNextBeginOffset() - ); - break; - - default: - break; - } - } - catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { - LOGGER.error(e, "Failed to pull message from broker."); - } - finally { - pullRequest.getCountDownLatch().countDown(); - } - - } - requestsRead.clear(); - } - - /** - * Thread looping entry. - */ - @Override - public void run() - { - LOGGER.info(getServiceName() + " starts."); - while (!isStoped()) { - waitForRunning(0); - doPull(); - } - - // in case this service is shutdown gracefully without interruption. - try { - Thread.sleep(10); - } - catch (InterruptedException e) { - LOGGER.error(e, ""); - } - - synchronized (this) { - swapRequests(); - } - - doPull(); - LOGGER.info(getServiceName() + " terminated."); - } - - @Override - protected void onWaitEnd() - { - swapRequests(); - } - } - - - /** - * Compare messages pulled from same message queue according to queue offset. - */ - private static final Comparator MESSAGE_COMPARATOR = Comparator.comparingLong(MessageExt::getQueueOffset); - - - /** - * Handle message queues re-balance operations. 
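
The pull service above relies on a two-list hand-off: callers append to requestsWrite under the monitor, and the single service thread swaps the lists before draining requestsRead, so the producer-side lock is held only for an add, never across pull I/O. A self-contained sketch of that pattern (class and method names are mine, not the patch's):

import java.util.ArrayList;
import java.util.List;

// Minimal double-buffer hand-off in the style of the deleted DruidPullMessageService.
class RequestBuffer<T>
{
  private List<T> write = new ArrayList<>();
  private List<T> read = new ArrayList<>();

  public synchronized void put(T request)
  {
    write.add(request);
    notifyAll(); // wake the draining thread
  }

  // Called only from the single service thread.
  public synchronized List<T> swapAndTake()
  {
    List<T> tmp = write;
    write = read;
    read = tmp;
    return read; // drain, then clear, before the next swap
  }
}
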
- */ - final class DruidMessageQueueListener implements MessageQueueListener - { - - private final Set topics; - - private final ConcurrentHashMap> topicQueueMap; - - private final DefaultMQPullConsumer defaultMQPullConsumer; - - public DruidMessageQueueListener( - final Set topics, - final ConcurrentHashMap> topicQueueMap, - final DefaultMQPullConsumer defaultMQPullConsumer - ) - { - this.topics = topics; - this.topicQueueMap = topicQueueMap; - this.defaultMQPullConsumer = defaultMQPullConsumer; - } - - @Override - public void messageQueueChanged(String topic, Set mqAll, Set mqDivided) - { - if (topics.contains(topic)) { - topicQueueMap.put(topic, mqDivided); - - // Remove message queues that are re-assigned to other clients. - Iterator>> it = - messageQueueTreeSetMap.entrySet().iterator(); - while (it.hasNext()) { - if (!mqDivided.contains(it.next().getKey())) { - it.remove(); - } - } - - StringBuilder stringBuilder = new StringBuilder(); - for (MessageQueue messageQueue : mqDivided) { - stringBuilder.append(messageQueue.getBrokerName()) - .append("#") - .append(messageQueue.getQueueId()) - .append(", "); - } - - if (LOGGER.isDebugEnabled() && stringBuilder.length() > 2) { - LOGGER.debug(StringUtils.format( - "%s@%s is consuming the following message queues: %s", - defaultMQPullConsumer.getClientIP(), - defaultMQPullConsumer.getInstanceName(), - stringBuilder.substring(0, stringBuilder.length() - 2) /*Remove the trailing comma*/ - )); - } - } - - } - } -} diff --git a/extensions-contrib/kafka-eight-simpleConsumer/pom.xml b/extensions-contrib/kafka-eight-simpleConsumer/pom.xml deleted file mode 100644 index 7106f507f2f..00000000000 --- a/extensions-contrib/kafka-eight-simpleConsumer/pom.xml +++ /dev/null @@ -1,89 +0,0 @@ - - - - - 4.0.0 - org.apache.druid.extensions.contrib - druid-kafka-eight-simple-consumer - druid-kafka-eight-simple-consumer - druid-kafka-eight-simple-consumer - - - org.apache.druid - druid - 0.16.0-incubating-SNAPSHOT - ../../pom.xml - - - - - org.apache.druid - druid-core - ${project.parent.version} - provided - - - org.apache.kafka - kafka_2.10 - 0.8.2.1 - - - log4j - log4j - - - org.apache.zookeeper - zookeeper - - - org.slf4j - slf4j-api - - - net.jpountz.lz4 - lz4 - - - - - - junit - junit - test - - - - - - - maven-jar-plugin - - - - true - true - - - - - - - diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java deleted file mode 100644 index 8fad815177d..00000000000 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.firehose.kafka; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import org.apache.druid.initialization.DruidModule; - -import java.util.List; - -@Deprecated -public class KafkaEightSimpleConsumerDruidModule implements DruidModule -{ - @Override - public List getJacksonModules() - { - return ImmutableList.of( - new SimpleModule("KafkaEightSimpleConsumerFirehoseModule").registerSubtypes( - new NamedType(KafkaEightSimpleConsumerFirehoseFactory.class, "kafka-0.8-v2") - ) - ); - } - - @Override - public void configure(Binder binder) - { - - } -} diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java deleted file mode 100644 index ca34e55d6a7..00000000000 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java +++ /dev/null @@ -1,350 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.firehose.kafka; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; -import com.google.common.io.Closeables; -import org.apache.druid.data.input.ByteBufferInputRowParser; -import org.apache.druid.data.input.Committer; -import org.apache.druid.data.input.FirehoseFactoryV2; -import org.apache.druid.data.input.FirehoseV2; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.firehose.kafka.KafkaSimpleConsumer.BytesMessageWithOffset; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.java.util.emitter.EmittingLogger; - -import java.io.Closeable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -@Deprecated -public class KafkaEightSimpleConsumerFirehoseFactory implements - FirehoseFactoryV2 -{ - private static final EmittingLogger log = new EmittingLogger( - KafkaEightSimpleConsumerFirehoseFactory.class - ); - - @JsonProperty - private final List brokerList; - - @JsonProperty - private final List partitionIdList; - - @JsonProperty - private final String clientId; - - @JsonProperty - private final String feed; - - @JsonProperty - private final int queueBufferLength; - - @JsonProperty - private final boolean earliest; - - private final List consumerWorkers = new CopyOnWriteArrayList<>(); - private static final int DEFAULT_QUEUE_BUFFER_LENGTH = 20000; - private static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(10); - - @JsonCreator - public KafkaEightSimpleConsumerFirehoseFactory( - @JsonProperty("brokerList") List brokerList, - @JsonProperty("partitionIdList") List partitionIdList, - @JsonProperty("clientId") String clientId, - @JsonProperty("feed") String feed, - @JsonProperty("queueBufferLength") Integer queueBufferLength, - @JsonProperty("resetOffsetToEarliest") Boolean resetOffsetToEarliest - ) - { - this.brokerList = brokerList; - Preconditions.checkArgument( - brokerList != null && brokerList.size() > 0, - "brokerList is null/empty" - ); - - this.partitionIdList = partitionIdList; - Preconditions.checkArgument( - partitionIdList != null && partitionIdList.size() > 0, - "partitionIdList is null/empty" - ); - - this.clientId = clientId; - Preconditions.checkArgument( - clientId != null && !clientId.isEmpty(), - "clientId is null/empty" - ); - - this.feed = feed; - Preconditions.checkArgument( - feed != null && !feed.isEmpty(), - "feed is null/empty" - ); - - this.queueBufferLength = queueBufferLength == null ? DEFAULT_QUEUE_BUFFER_LENGTH : queueBufferLength; - Preconditions.checkArgument(this.queueBufferLength > 0, "queueBufferLength must be positive number"); - log.info("queueBufferLength loaded as[%s]", this.queueBufferLength); - - this.earliest = resetOffsetToEarliest == null ? true : resetOffsetToEarliest.booleanValue(); - log.info( - "if old offsets are not known, data from partition will be read from [%s] available offset.", - this.earliest ? 
"earliest" : "latest" - ); - } - - private Map loadOffsetFromPreviousMetaData(Object lastCommit) - { - Map offsetMap = new HashMap<>(); - if (lastCommit == null) { - return offsetMap; - } - if (lastCommit instanceof Map) { - Map lastCommitMap = (Map) lastCommit; - for (Map.Entry entry : lastCommitMap.entrySet()) { - try { - int partitionId = Integer.parseInt(entry.getKey().toString()); - long offset = Long.parseLong(entry.getValue().toString()); - log.debug("Recover last commit information partitionId [%s], offset [%s]", partitionId, offset); - offsetMap.put(partitionId, offset); - } - catch (NumberFormatException e) { - log.error(e, "Fail to load offset from previous meta data [%s]", entry); - } - } - log.info("Loaded offset map[%s]", offsetMap); - } else { - log.makeAlert("Unable to cast lastCommit to Map for feed [%s]", feed).emit(); - } - return offsetMap; - } - - @Override - public FirehoseV2 connect(final ByteBufferInputRowParser firehoseParser, Object lastCommit) - { - final Map lastOffsets = loadOffsetFromPreviousMetaData(lastCommit); - - for (Integer partition : partitionIdList) { - final KafkaSimpleConsumer kafkaSimpleConsumer = - new KafkaSimpleConsumer(feed, partition, clientId, brokerList, earliest); - Long startOffset = lastOffsets.get(partition); - PartitionConsumerWorker worker = new PartitionConsumerWorker( - feed, - kafkaSimpleConsumer, - partition, - startOffset == null ? -1 : startOffset - ); - consumerWorkers.add(worker); - } - - final LinkedBlockingQueue messageQueue = new LinkedBlockingQueue( - queueBufferLength - ); - log.info("Kicking off all consumers"); - for (PartitionConsumerWorker worker : consumerWorkers) { - worker.go(messageQueue); - } - log.info("All consumer started"); - - return new FirehoseV2() - { - private Map lastOffsetPartitions; - private volatile boolean stopped; - private volatile BytesMessageWithOffset msg = null; - private volatile InputRow row = null; - private volatile Iterator nextIterator = Collections.emptyIterator(); - - { - lastOffsetPartitions = new HashMap<>(); - lastOffsetPartitions.putAll(lastOffsets); - } - - @Override - public void start() - { - } - - @Override - public boolean advance() - { - if (stopped) { - return false; - } - - nextMessage(); - return true; - } - - private void nextMessage() - { - try { - row = null; - while (row == null) { - if (!nextIterator.hasNext()) { - if (msg != null) { - lastOffsetPartitions.put(msg.getPartition(), msg.offset()); - } - msg = messageQueue.take(); - final byte[] message = msg.message(); - nextIterator = message == null - ? Collections.emptyIterator() - : firehoseParser.parseBatch(ByteBuffer.wrap(message)).iterator(); - continue; - } - row = nextIterator.next(); - } - } - catch (InterruptedException e) { - //Let the caller decide whether to stop or continue when thread is interrupted. 
- log.warn(e, "Thread Interrupted while taking from queue, propagating the interrupt"); - Thread.currentThread().interrupt(); - } - } - - @Override - public InputRow currRow() - { - if (stopped) { - return null; - } - // currRow will be called before the first advance - if (row == null) { - try { - nextMessage(); - } - catch (ParseException e) { - return null; - } - } - return row; - } - - @Override - public Committer makeCommitter() - { - final Map offsets = new HashMap<>(lastOffsetPartitions); - - return new Committer() - { - @Override - public Object getMetadata() - { - return offsets; - } - - @Override - public void run() - { - - } - }; - } - - @Override - public void close() throws IOException - { - log.info("Stopping kafka 0.8 simple firehose"); - stopped = true; - for (PartitionConsumerWorker t : consumerWorkers) { - Closeables.close(t, true); - } - } - }; - } - - private static class PartitionConsumerWorker implements Closeable - { - private final String topic; - private final KafkaSimpleConsumer consumer; - private final int partitionId; - private final long startOffset; - - private final AtomicBoolean stopped = new AtomicBoolean(false); - private volatile Thread thread = null; - - PartitionConsumerWorker(String topic, KafkaSimpleConsumer consumer, int partitionId, long startOffset) - { - this.topic = topic; - this.consumer = consumer; - this.partitionId = partitionId; - this.startOffset = startOffset; - } - - public void go(final LinkedBlockingQueue messageQueue) - { - thread = new Thread() - { - @Override - public void run() - { - long offset = startOffset; - log.info("Start running parition[%s], offset[%s]", partitionId, offset); - try { - while (!stopped.get()) { - try { - Iterable msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT_MILLIS); - int count = 0; - for (BytesMessageWithOffset msgWithOffset : msgs) { - offset = msgWithOffset.offset(); - messageQueue.put(msgWithOffset); - count++; - } - log.debug("fetch [%s] msgs for partition [%s] in one time ", count, partitionId); - } - catch (InterruptedException e) { - log.info("Interrupted when fetching data, shutting down."); - return; - } - catch (Exception e) { - log.error(e, "Exception happened in fetching data, but will continue consuming"); - } - } - } - finally { - consumer.stop(); - } - } - }; - thread.setDaemon(true); - thread.setName(StringUtils.format("kafka-%s-%s", topic, partitionId)); - thread.start(); - } - - @Override - public synchronized void close() - { - if (stopped.compareAndSet(false, true)) { - thread.interrupt(); - thread = null; - } - } - } -} diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java deleted file mode 100644 index ce3028ff113..00000000000 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java +++ /dev/null @@ -1,390 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
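
The commit protocol implemented above is worth spelling out: makeCommitter snapshots lastOffsetPartitions, and whatever map getMetadata returns is handed back verbatim as lastCommit on the next connect, where loadOffsetFromPreviousMetaData parses it back into per-partition offsets. A hedged sketch of that round trip, assuming firehose, factory, and parser variables:

// Sketch of the FirehoseV2 commit round trip used by the deleted factory.
Committer committer = firehose.makeCommitter();
committer.run();                             // persist the snapshot
Object lastCommit = committer.getMetadata(); // a Map<Integer, Long>: partition -> offset
// ...after a task restart...
FirehoseV2 resumed = factory.connect(parser, lastCommit); // offsets restored per partition
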
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.firehose.kafka; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.net.HostAndPort; -import kafka.api.FetchRequest; -import kafka.api.FetchRequestBuilder; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.cluster.Broker; -import kafka.common.ErrorMapping; -import kafka.common.TopicAndPartition; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.OffsetRequest; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.PartitionMetadata; -import kafka.javaapi.TopicMetadata; -import kafka.javaapi.TopicMetadataRequest; -import kafka.javaapi.TopicMetadataResponse; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.message.MessageAndOffset; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.FunctionalIterable; -import org.apache.druid.java.util.common.logger.Logger; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -/** - * refer @{link https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example} - *
- * This class is not thread safe, the caller must ensure all the methods be - * called from single thread - */ -@Deprecated -public class KafkaSimpleConsumer -{ - - private static final Logger log = new Logger(KafkaSimpleConsumer.class); - - private final List allBrokers; - private final String topic; - private final int partitionId; - private final String clientId; - private final String leaderLookupClientId; - private final boolean earliest; - - private volatile Broker leaderBroker; - private List replicaBrokers; - private SimpleConsumer consumer = null; - - private static final int SO_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30); - private static final int BUFFER_SIZE = 65536; - private static final long RETRY_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(1); - private static final int FETCH_SIZE = 100_000_000; - - public KafkaSimpleConsumer(String topic, int partitionId, String clientId, List brokers, boolean earliest) - { - List brokerList = new ArrayList<>(); - for (String broker : brokers) { - HostAndPort brokerHostAndPort = HostAndPort.fromString(broker); - Preconditions.checkArgument( - brokerHostAndPort.getHostText() != null && - !brokerHostAndPort.getHostText().isEmpty() && - brokerHostAndPort.hasPort(), - "kafka broker [%s] is not valid, must be :", - broker - ); - brokerList.add(brokerHostAndPort); - } - - this.allBrokers = Collections.unmodifiableList(brokerList); - this.topic = topic; - this.partitionId = partitionId; - this.clientId = StringUtils.format("%s_%d_%s", topic, partitionId, clientId); - this.leaderLookupClientId = clientId + "leaderLookup"; - this.replicaBrokers = new ArrayList<>(); - this.replicaBrokers.addAll(this.allBrokers); - this.earliest = earliest; - log.info( - "KafkaSimpleConsumer initialized with clientId [%s] for message consumption and clientId [%s] for leader lookup", - this.clientId, - this.leaderLookupClientId - ); - } - - private void ensureConsumer(Broker leader) throws InterruptedException - { - if (consumer == null) { - while (leaderBroker == null) { - leaderBroker = findNewLeader(leader); - } - log.info( - "making SimpleConsumer[%s][%s], leader broker[%s:%s]", - topic, partitionId, leaderBroker.host(), leaderBroker.port() - ); - - consumer = new SimpleConsumer( - leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, clientId - ); - } - } - - public static class BytesMessageWithOffset - { - final byte[] msg; - final long offset; - final int partition; - - public BytesMessageWithOffset(byte[] msg, long offset, int partition) - { - this.msg = msg; - this.offset = offset; - this.partition = partition; - } - - public int getPartition() - { - return partition; - } - - public byte[] message() - { - return msg; - } - - public long offset() - { - return offset; - } - } - - private Iterable filterAndDecode(Iterable kafkaMessages, final long offset) - { - return FunctionalIterable - .create(kafkaMessages) - .filter( - new Predicate() - { - @Override - public boolean apply(MessageAndOffset msgAndOffset) - { - return msgAndOffset.offset() >= offset; - } - } - ) - .transform( - new Function() - { - - @Override - public BytesMessageWithOffset apply(MessageAndOffset msgAndOffset) - { - ByteBuffer bb = msgAndOffset.message().payload(); - byte[] payload = new byte[bb.remaining()]; - bb.get(payload); - // add nextOffset here, thus next fetch will use nextOffset instead of current offset - return new BytesMessageWithOffset(payload, msgAndOffset.nextOffset(), partitionId); - } - } - ); - } - - private long getOffset(boolean earliest) 
throws InterruptedException - { - TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId); - Map requestInfo = new HashMap(); - requestInfo.put( - topicAndPartition, - new PartitionOffsetRequestInfo( - earliest ? kafka.api.OffsetRequest.EarliestTime() : kafka.api.OffsetRequest.LatestTime(), 1 - ) - ); - OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId); - OffsetResponse response; - try { - response = consumer.getOffsetsBefore(request); - } - catch (Exception e) { - ensureNotInterrupted(e); - log.error(e, "caught exception in getOffsetsBefore [%s] - [%s]", topic, partitionId); - return -1; - } - if (response.hasError()) { - log.error( - "error fetching data Offset from the Broker [%s]. reason: [%s]", leaderBroker.host(), - response.errorCode(topic, partitionId) - ); - return -1; - } - long[] offsets = response.offsets(topic, partitionId); - return earliest ? offsets[0] : offsets[offsets.length - 1]; - } - - public Iterable fetch(long offset, int timeoutMs) throws InterruptedException - { - FetchResponse response; - Broker previousLeader = leaderBroker; - while (true) { - ensureConsumer(previousLeader); - - FetchRequest request = new FetchRequestBuilder() - .clientId(clientId) - .addFetch(topic, partitionId, offset, FETCH_SIZE) - .maxWait(timeoutMs) - .minBytes(1) - .build(); - - log.debug("fetch offset %s", offset); - - try { - response = consumer.fetch(request); - } - catch (Exception e) { - ensureNotInterrupted(e); - log.warn(e, "caught exception in fetch %s - %d", topic, partitionId); - response = null; - } - - if (response == null || response.hasError()) { - short errorCode = response != null ? response.errorCode(topic, partitionId) : ErrorMapping.UnknownCode(); - log.warn("fetch %s - %s with offset %s encounters error: [%s]", topic, partitionId, offset, errorCode); - - boolean needNewLeader = false; - if (errorCode == ErrorMapping.RequestTimedOutCode()) { - log.info("kafka request timed out, response[%s]", response); - } else if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) { - long newOffset = getOffset(earliest); - log.info("got [%s] offset[%s] for [%s][%s]", earliest ? 
"earliest" : "latest", newOffset, topic, partitionId); - if (newOffset < 0) { - needNewLeader = true; - } else { - offset = newOffset; - continue; - } - } else { - needNewLeader = true; - } - - if (needNewLeader) { - stopConsumer(); - previousLeader = leaderBroker; - leaderBroker = null; - continue; - } - } else { - break; - } - } - - return filterAndDecode(response.messageSet(topic, partitionId), offset); - } - - private void stopConsumer() - { - if (consumer != null) { - try { - consumer.close(); - log.info("stop consumer[%s][%s], leaderBroker[%s]", topic, partitionId, leaderBroker); - } - catch (Exception e) { - log.warn(e, "stop consumer[%s][%s] failed", topic, partitionId); - } - finally { - consumer = null; - } - } - } - - public void stop() - { - stopConsumer(); - log.info("KafkaSimpleConsumer[%s][%s] stopped", topic, partitionId); - } - - private PartitionMetadata findLeader() throws InterruptedException - { - for (HostAndPort broker : replicaBrokers) { - SimpleConsumer consumer = null; - try { - log.info("Finding new leader from Kafka brokers, try broker [%s]", broker.toString()); - consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, leaderLookupClientId); - TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(Collections.singletonList(topic))); - - List metaData = resp.topicsMetadata(); - for (TopicMetadata item : metaData) { - if (topic.equals(item.topic())) { - for (PartitionMetadata part : item.partitionsMetadata()) { - if (part.partitionId() == partitionId) { - return part; - } - } - } - } - } - catch (Exception e) { - ensureNotInterrupted(e); - log.warn( - e, - "error communicating with Kafka Broker [%s] to find leader for [%s] - [%s]", - broker, - topic, - partitionId - ); - } - finally { - if (consumer != null) { - consumer.close(); - } - } - } - - return null; - } - - private Broker findNewLeader(Broker oldLeader) throws InterruptedException - { - long retryCnt = 0; - while (true) { - PartitionMetadata metadata = findLeader(); - if (metadata != null) { - replicaBrokers.clear(); - for (Broker replica : metadata.replicas()) { - replicaBrokers.add( - HostAndPort.fromParts(replica.host(), replica.port()) - ); - } - - log.debug("Got new Kafka leader metadata : [%s], previous leader : [%s]", metadata, oldLeader); - Broker newLeader = metadata.leader(); - if (newLeader != null) { - // We check the retryCnt here as well to make sure that we have slept a little bit - // if we don't notice a change in leadership - // just in case if Zookeeper doesn't get updated fast enough - if (oldLeader == null || isValidNewLeader(newLeader) || retryCnt != 0) { - return newLeader; - } - } - } - - Thread.sleep(RETRY_INTERVAL_MILLIS); - retryCnt++; - // if could not find the leader for current replicaBrokers, let's try to - // find one via allBrokers - if (retryCnt >= 3 && (retryCnt - 3) % 5 == 0) { - log.warn("cannot find leader for [%s] - [%s] after [%s] retries", topic, partitionId, retryCnt); - replicaBrokers.clear(); - replicaBrokers.addAll(allBrokers); - } - } - } - - private boolean isValidNewLeader(Broker broker) - { - // broker is considered valid new leader if it is not the same as old leaderBroker - return !(leaderBroker.host().equalsIgnoreCase(broker.host()) && leaderBroker.port() == broker.port()); - } - - private void ensureNotInterrupted(Exception e) throws InterruptedException - { - if (Thread.interrupted()) { - log.error(e, "Interrupted during fetching for %s - %s", topic, partitionId); - throw new 
InterruptedException(); - } - } -} diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/kafka-eight-simpleConsumer/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule deleted file mode 100644 index eada087524f..00000000000 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.druid.firehose.kafka.KafkaEightSimpleConsumerDruidModule diff --git a/extensions-contrib/rabbitmq/pom.xml b/extensions-contrib/rabbitmq/pom.xml deleted file mode 100644 index 65508a1a55e..00000000000 --- a/extensions-contrib/rabbitmq/pom.xml +++ /dev/null @@ -1,83 +0,0 @@ - - - - - 4.0.0 - - org.apache.druid.extensions.contrib - druid-rabbitmq - druid-rabbitmq - druid-rabbitmq - - - org.apache.druid - druid - 0.16.0-incubating-SNAPSHOT - ../../pom.xml - - - - - org.apache.druid - druid-core - ${project.parent.version} - provided - - - com.rabbitmq - amqp-client - 3.2.1 - - - net.jodah - lyra - 0.3.1 - - - org.slf4j - slf4j-api - - - com.rabbitmq - amqp-client - - - - - - - junit - junit - test - - - commons-cli - commons-cli - test - - - org.apache.druid - druid-processing - ${project.parent.version} - test - - - - diff --git a/extensions-contrib/rabbitmq/src/main/java/org/apache/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java b/extensions-contrib/rabbitmq/src/main/java/org/apache/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java deleted file mode 100644 index 14cf24524a0..00000000000 --- a/extensions-contrib/rabbitmq/src/main/java/org/apache/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
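
Stepping back from the KafkaSimpleConsumer just removed: fetch returns messages already tagged with nextOffset (set in filterAndDecode), so a caller advances simply by feeding each returned offset into the next fetch. A condensed sketch of the poll loop the deleted PartitionConsumerWorker ran around it; startOffset, stopped, consumer, and queue are assumed, and the 10-second timeout mirrors CONSUMER_FETCH_TIMEOUT_MILLIS:

// Sketch only: mirrors the deleted worker's consume loop.
long offset = startOffset;
try {
  while (!stopped.get()) {
    for (BytesMessageWithOffset msg : consumer.fetch(offset, 10_000)) {
      offset = msg.offset(); // already the *next* offset to request
      queue.put(msg);
    }
  }
}
catch (InterruptedException e) {
  // shutting down
}
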
- */ - -package org.apache.druid.firehose.rabbitmq; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Maps; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.LongString; - -import java.net.URI; -import java.net.URISyntaxException; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.util.Map; - -/** - * A Jacksonified version of the RabbitMQ ConnectionFactory for better integration - * into the realtime.spec configuration file format. - */ -public class JacksonifiedConnectionFactory extends ConnectionFactory -{ - public static JacksonifiedConnectionFactory makeDefaultConnectionFactory() throws Exception - { - return new JacksonifiedConnectionFactory(null, 0, null, null, null, null, 0, 0, 0, 0, null); - } - - private static Map getSerializableClientProperties(final Map clientProperties) - { - return Maps.transformEntries( - clientProperties, - new Maps.EntryTransformer() - { - @Override - public Object transformEntry(String key, Object value) - { - if (value instanceof LongString) { - return value.toString(); - } - return value; - } - } - ); - } - - private final String host; - private final int port; - private final String username; - private final String password; - private final String virtualHost; - private final String uri; - private final int requestedChannelMax; - private final int requestedFrameMax; - private final int requestedHeartbeat; - private final int connectionTimeout; - private final Map clientProperties; - - @JsonCreator - public JacksonifiedConnectionFactory( - @JsonProperty("host") String host, - @JsonProperty("port") int port, - @JsonProperty("username") String username, - @JsonProperty("password") String password, - @JsonProperty("virtualHost") String virtualHost, - @JsonProperty("uri") String uri, - @JsonProperty("requestedChannelMax") int requestedChannelMax, - @JsonProperty("requestedFrameMax") int requestedFrameMax, - @JsonProperty("requestedHeartbeat") int requestedHeartbeat, - @JsonProperty("connectionTimeout") int connectionTimeout, - @JsonProperty("clientProperties") Map clientProperties - ) throws Exception - { - super(); - - this.host = host == null ? super.getHost() : host; - this.port = port == 0 ? super.getPort() : port; - this.username = username == null ? super.getUsername() : username; - this.password = password == null ? super.getPassword() : password; - this.virtualHost = virtualHost == null ? super.getVirtualHost() : virtualHost; - this.uri = uri; - this.requestedChannelMax = requestedChannelMax == 0 ? super.getRequestedChannelMax() : requestedChannelMax; - this.requestedFrameMax = requestedFrameMax == 0 ? super.getRequestedFrameMax() : requestedFrameMax; - this.requestedHeartbeat = requestedHeartbeat == 0 ? super.getRequestedHeartbeat() : requestedHeartbeat; - this.connectionTimeout = connectionTimeout == 0 ? super.getConnectionTimeout() : connectionTimeout; - this.clientProperties = clientProperties == null ? 
super.getClientProperties() : clientProperties; - - super.setHost(this.host); - super.setPort(this.port); - super.setUsername(this.username); - super.setPassword(this.password); - super.setVirtualHost(this.virtualHost); - if (this.uri != null) { - super.setUri(this.uri); - } - super.setRequestedChannelMax(this.requestedChannelMax); - super.setRequestedFrameMax(this.requestedFrameMax); - super.setRequestedHeartbeat(this.requestedHeartbeat); - super.setConnectionTimeout(this.connectionTimeout); - super.setClientProperties(this.clientProperties); - } - - @Override - @JsonProperty - public String getHost() - { - return host; - } - - @Override - @JsonProperty - public int getPort() - { - return port; - } - - - @Override - @JsonProperty - public String getUsername() - { - return username; - } - - @Override - @JsonProperty - public String getPassword() - { - return password; - } - - @Override - @JsonProperty - public String getVirtualHost() - { - return virtualHost; - } - - @JsonProperty - public String getUri() - { - return uri; - } - - // we are only overriding this to help Jackson not be confused about the two setURI methods - @JsonIgnore - @Override - public void setUri(URI uri) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException - { - super.setUri(uri); - } - - @Override - public void setUri(String uriString) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException - { - super.setUri(uriString); - } - - @Override - @JsonProperty - public int getRequestedChannelMax() - { - return requestedChannelMax; - } - - @Override - @JsonProperty - public int getRequestedFrameMax() - { - return requestedFrameMax; - } - - @Override - @JsonProperty - public int getRequestedHeartbeat() - { - return requestedHeartbeat; - } - - @Override - @JsonProperty - public int getConnectionTimeout() - { - return connectionTimeout; - } - - @JsonProperty("clientProperties") - public Map getSerializableClientProperties() - { - return getSerializableClientProperties(clientProperties); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - JacksonifiedConnectionFactory that = (JacksonifiedConnectionFactory) o; - - if (connectionTimeout != that.connectionTimeout) { - return false; - } - if (port != that.port) { - return false; - } - if (requestedChannelMax != that.requestedChannelMax) { - return false; - } - if (requestedFrameMax != that.requestedFrameMax) { - return false; - } - if (requestedHeartbeat != that.requestedHeartbeat) { - return false; - } - if (clientProperties != null - ? !Maps.difference( - getSerializableClientProperties(clientProperties), - getSerializableClientProperties(that.clientProperties) - ).areEqual() - : that.clientProperties != null) { - return false; - } - if (host != null ? !host.equals(that.host) : that.host != null) { - return false; - } - if (password != null ? !password.equals(that.password) : that.password != null) { - return false; - } - if (uri != null ? !uri.equals(that.uri) : that.uri != null) { - return false; - } - if (username != null ? !username.equals(that.username) : that.username != null) { - return false; - } - if (virtualHost != null ? !virtualHost.equals(that.virtualHost) : that.virtualHost != null) { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - int result = host != null ? host.hashCode() : 0; - result = 31 * result + port; - result = 31 * result + (username != null ? 
username.hashCode() : 0); - result = 31 * result + (password != null ? password.hashCode() : 0); - result = 31 * result + (virtualHost != null ? virtualHost.hashCode() : 0); - result = 31 * result + (uri != null ? uri.hashCode() : 0); - result = 31 * result + requestedChannelMax; - result = 31 * result + requestedFrameMax; - result = 31 * result + requestedHeartbeat; - result = 31 * result + connectionTimeout; - result = 31 * result + (clientProperties != null ? clientProperties.hashCode() : 0); - return result; - } -} diff --git a/extensions-contrib/rabbitmq/src/main/java/org/apache/druid/firehose/rabbitmq/RabbitMQDruidModule.java b/extensions-contrib/rabbitmq/src/main/java/org/apache/druid/firehose/rabbitmq/RabbitMQDruidModule.java deleted file mode 100644 index 0038c537387..00000000000 --- a/extensions-contrib/rabbitmq/src/main/java/org/apache/druid/firehose/rabbitmq/RabbitMQDruidModule.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.firehose.rabbitmq; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import org.apache.druid.initialization.DruidModule; - -import java.util.List; - -/** - */ -public class RabbitMQDruidModule implements DruidModule -{ - @Override - public List getJacksonModules() - { - return ImmutableList.of( - new SimpleModule("RabbitMQFirehoseModule") - .registerSubtypes( - new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq") - ) - ); - } - - @Override - public void configure(Binder binder) - { - - } -} diff --git a/extensions-contrib/rabbitmq/src/main/java/org/apache/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java b/extensions-contrib/rabbitmq/src/main/java/org/apache/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java deleted file mode 100644 index 08dee24615e..00000000000 --- a/extensions-contrib/rabbitmq/src/main/java/org/apache/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.firehose.rabbitmq; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -/** - * A configuration object for a RabbitMQ connection. - */ -public class RabbitMQFirehoseConfig -{ - // Lyra (auto reconnect) properties - private static final int defaultMaxRetries = 100; - private static final int defaultRetryIntervalSeconds = 2; - private static final long defaultMaxDurationSeconds = 5 * 60; - - public static RabbitMQFirehoseConfig makeDefaultConfig() - { - return new RabbitMQFirehoseConfig(null, null, null, false, false, false, 0, 0, 0); - } - - private final String queue; - private final String exchange; - private final String routingKey; - private final boolean durable; - private final boolean exclusive; - private final boolean autoDelete; - private final int maxRetries; - private final int retryIntervalSeconds; - private final long maxDurationSeconds; - - @JsonCreator - public RabbitMQFirehoseConfig( - @JsonProperty("queue") String queue, - @JsonProperty("exchange") String exchange, - @JsonProperty("routingKey") String routingKey, - @JsonProperty("durable") boolean durable, - @JsonProperty("exclusive") boolean exclusive, - @JsonProperty("autoDelete") boolean autoDelete, - @JsonProperty("maxRetries") int maxRetries, - @JsonProperty("retryIntervalSeconds") int retryIntervalSeconds, - @JsonProperty("maxDurationSeconds") long maxDurationSeconds - ) - { - this.queue = queue; - this.exchange = exchange; - this.routingKey = routingKey; - this.durable = durable; - this.exclusive = exclusive; - this.autoDelete = autoDelete; - - this.maxRetries = maxRetries == 0 ? defaultMaxRetries : maxRetries; - this.retryIntervalSeconds = retryIntervalSeconds == 0 ? defaultRetryIntervalSeconds : retryIntervalSeconds; - this.maxDurationSeconds = maxDurationSeconds == 0 ? 
defaultMaxDurationSeconds : maxDurationSeconds; - } - - @JsonProperty - public String getQueue() - { - return queue; - } - - @JsonProperty - public String getExchange() - { - return exchange; - } - - @JsonProperty - public String getRoutingKey() - { - return routingKey; - } - - @JsonProperty - public boolean isDurable() - { - return durable; - } - - @JsonProperty - public boolean isExclusive() - { - return exclusive; - } - - @JsonProperty - public boolean isAutoDelete() - { - return autoDelete; - } - - @JsonProperty - public int getMaxRetries() - { - return maxRetries; - } - - @JsonProperty - public int getRetryIntervalSeconds() - { - return retryIntervalSeconds; - } - - @JsonProperty - public long getMaxDurationSeconds() - { - return maxDurationSeconds; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - RabbitMQFirehoseConfig that = (RabbitMQFirehoseConfig) o; - - if (autoDelete != that.autoDelete) { - return false; - } - if (durable != that.durable) { - return false; - } - if (exclusive != that.exclusive) { - return false; - } - if (maxDurationSeconds != that.maxDurationSeconds) { - return false; - } - if (maxRetries != that.maxRetries) { - return false; - } - if (retryIntervalSeconds != that.retryIntervalSeconds) { - return false; - } - if (exchange != null ? !exchange.equals(that.exchange) : that.exchange != null) { - return false; - } - if (queue != null ? !queue.equals(that.queue) : that.queue != null) { - return false; - } - if (routingKey != null ? !routingKey.equals(that.routingKey) : that.routingKey != null) { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - int result = queue != null ? queue.hashCode() : 0; - result = 31 * result + (exchange != null ? exchange.hashCode() : 0); - result = 31 * result + (routingKey != null ? routingKey.hashCode() : 0); - result = 31 * result + (durable ? 1 : 0); - result = 31 * result + (exclusive ? 1 : 0); - result = 31 * result + (autoDelete ? 1 : 0); - result = 31 * result + maxRetries; - result = 31 * result + retryIntervalSeconds; - result = 31 * result + (int) (maxDurationSeconds ^ (maxDurationSeconds >>> 32)); - return result; - } -} diff --git a/extensions-contrib/rabbitmq/src/main/java/org/apache/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java b/extensions-contrib/rabbitmq/src/main/java/org/apache/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java deleted file mode 100644 index f2b3bda48ad..00000000000 --- a/extensions-contrib/rabbitmq/src/main/java/org/apache/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.firehose.rabbitmq; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConsumerCancelledException; -import com.rabbitmq.client.DefaultConsumer; -import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.QueueingConsumer.Delivery; -import com.rabbitmq.client.ShutdownListener; -import com.rabbitmq.client.ShutdownSignalException; -import net.jodah.lyra.ConnectionOptions; -import net.jodah.lyra.Connections; -import net.jodah.lyra.config.Config; -import net.jodah.lyra.retry.RetryPolicy; -import net.jodah.lyra.util.Duration; -import org.apache.druid.data.input.Firehose; -import org.apache.druid.data.input.FirehoseFactory; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.java.util.common.logger.Logger; - -import javax.annotation.Nullable; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Iterator; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -/** - * A FirehoseFactory for RabbitMQ. - *
<p/>
- * It will receive its configuration through the realtime.spec file and expects to find a
- * consumerProps element in the firehose definition with values for a number of configuration options.
- * Below is a complete example of a RabbitMQ firehose configuration with some explanation. Options
- * that have defaults can be skipped, but options with no defaults must be specified, with the
- * exception of the URI property. If the URI property is set, it will override any other property
- * that was also set.
- * <p/>
- * File: realtime.spec
- * <pre>
- *   "firehose" : {
- *     "type" : "rabbitmq",
- *     "connection" : {
- *       "host": "localhost",                 # The hostname of the RabbitMQ broker to connect to. Default: 'localhost'
- *       "port": "5672",                      # The port number to connect to on the RabbitMQ broker. Default: '5672'
- *       "username": "test-dude",             # The username to use to connect to RabbitMQ. Default: 'guest'
- *       "password": "test-word",             # The password to use to connect to RabbitMQ. Default: 'guest'
- *       "virtualHost": "test-vhost",         # The virtual host to connect to. Default: '/'
- *       "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed
- *     },
- *     "config" : {
- *       "exchange": "test-exchange",         # The exchange to connect to. No default
- *       "queue" : "druidtest",               # The queue to connect to or create. No default
- *       "routingKey": "#",                   # The routing key to use to bind the queue to the exchange. No default
- *       "durable": "true",                   # Whether the queue should be durable. Default: 'false'
- *       "exclusive": "false",                # Whether the queue should be exclusive. Default: 'false'
- *       "autoDelete": "false",               # Whether the queue should auto-delete on disconnect. Default: 'false'
- *
- *       "maxRetries": "10",                  # The max number of reconnection retry attempts
- *       "retryIntervalSeconds": "1",         # The reconnection interval
- *       "maxDurationSeconds": "300"          # The max duration of trying to reconnect
- *     },
- *     "parser" : {
- *       "timestampSpec" : { "column" : "utcdt", "format" : "iso" },
- *       "data" : { "format" : "json" },
- *       "dimensionExclusions" : ["wp"]
- *     }
- *   },
- * 
- * </pre>
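The "config" block in the example above maps one-to-one onto the RabbitMQFirehoseConfig constructor shown earlier in this diff. A minimal sketch of the programmatic equivalent follows (a fragment with the example's values, not part of the deleted sources); zero-valued numeric arguments fall back to the class defaults of 100 retries, a 2-second retry interval, and a 300-second maximum duration.

// Sketch: the programmatic equivalent of the "config" block above, using the
// constructor of the removed RabbitMQFirehoseConfig. Values come from the example.
RabbitMQFirehoseConfig config = new RabbitMQFirehoseConfig(
    "druidtest",      // queue
    "test-exchange",  // exchange
    "#",              // routingKey
    true,             // durable
    false,            // exclusive
    false,            // autoDelete
    10,               // maxRetries
    1,                // retryIntervalSeconds
    300               // maxDurationSeconds
);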
- * Limitations: This implementation will not attempt to reconnect to the MQ broker if the
- * connection to it is lost. Furthermore, it does not support any automatic failover on high
- * availability RabbitMQ clusters. This is not supported by the underlying AMQP client library,
- * and while the behavior could be "faked" to some extent, we haven't implemented that yet.
- * However, if a policy is defined in the RabbitMQ cluster that sets the "ha-mode" and
- * "ha-sync-mode" properly on the queue that this Firehose connects to, messages should survive
- * an MQ broker node failure and be delivered once a connection to another node is set up.
- * <p/>
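Because RabbitMQDruidModule (removed below) registers this factory under the type name "rabbitmq", a spec like the one above deserializes directly into RabbitMQFirehoseFactory. A rough sketch of that round trip, mirroring the serde test deleted later in this diff:

// Sketch, mirroring RabbitMQFirehoseFactoryTest.testSerde(): serialization with
// Druid's DefaultObjectMapper is symmetric, so a factory can be written out as
// JSON and read back into an equal instance.
ObjectMapper mapper = new DefaultObjectMapper();
RabbitMQFirehoseFactory factory = new RabbitMQFirehoseFactory(
    JacksonifiedConnectionFactory.makeDefaultConnectionFactory(),
    RabbitMQFirehoseConfig.makeDefaultConfig(),
    null  // legacy "connectionFactory" property; see apache/incubator-druid#1922
);
byte[] bytes = mapper.writeValueAsBytes(factory);
RabbitMQFirehoseFactory roundTripped = mapper.readValue(bytes, RabbitMQFirehoseFactory.class);
// roundTripped.getConfig() and roundTripped.getConnectionFactory() equal the originals.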
- * For more information on RabbitMQ high availability please see: - * http://www.rabbitmq.com/ha.html. - */ -public class RabbitMQFirehoseFactory implements FirehoseFactory> -{ - private static final Logger log = new Logger(RabbitMQFirehoseFactory.class); - - private final RabbitMQFirehoseConfig config; - private final JacksonifiedConnectionFactory connectionFactory; - - @JsonCreator - public RabbitMQFirehoseFactory( - @JsonProperty("connection") JacksonifiedConnectionFactory connectionFactory, - @JsonProperty("config") RabbitMQFirehoseConfig config, - // See https://github.com/apache/incubator-druid/pull/1922 - @JsonProperty("connectionFactory") JacksonifiedConnectionFactory connectionFactoryCOMPAT - ) throws Exception - { - this.connectionFactory = connectionFactory == null - ? connectionFactoryCOMPAT == null - ? JacksonifiedConnectionFactory.makeDefaultConnectionFactory() - : connectionFactoryCOMPAT - : connectionFactory; - this.config = config == null ? RabbitMQFirehoseConfig.makeDefaultConfig() : config; - - } - - @JsonProperty - public RabbitMQFirehoseConfig getConfig() - { - return config; - } - - @JsonProperty("connection") - public JacksonifiedConnectionFactory getConnectionFactory() - { - return connectionFactory; - } - - @Override - public Firehose connect(final InputRowParser firehoseParser, File temporaryDirectory) throws IOException - { - ConnectionOptions lyraOptions = new ConnectionOptions(this.connectionFactory); - Config lyraConfig = new Config() - .withRecoveryPolicy( - new RetryPolicy() - .withMaxRetries(config.getMaxRetries()) - .withRetryInterval(Duration.seconds(config.getRetryIntervalSeconds())) - .withMaxDuration(Duration.seconds(config.getMaxDurationSeconds())) - ); - - String queue = config.getQueue(); - String exchange = config.getExchange(); - String routingKey = config.getRoutingKey(); - - boolean durable = config.isDurable(); - boolean exclusive = config.isExclusive(); - boolean autoDelete = config.isAutoDelete(); - - final Connection connection = Connections.create(lyraOptions, lyraConfig); - - connection.addShutdownListener( - new ShutdownListener() - { - @Override - public void shutdownCompleted(ShutdownSignalException cause) - { - log.warn(cause, "Connection closed!"); - } - } - ); - - final Channel channel = connection.createChannel(); - channel.queueDeclare(queue, durable, exclusive, autoDelete, null); - channel.queueBind(queue, exchange, routingKey); - channel.addShutdownListener( - new ShutdownListener() - { - @Override - public void shutdownCompleted(ShutdownSignalException cause) - { - log.warn(cause, "Channel closed!"); - } - } - ); - - // We create a QueueingConsumer that will not auto-acknowledge messages since that - // happens on commit(). - final QueueingConsumer consumer = new QueueingConsumer(channel); - channel.basicConsume(queue, false, consumer); - - return new Firehose() - { - /** - * Storing the latest row as a member variable should be safe since this will only be run - * by a single thread. - */ - private InputRow nextRow; - - /** - * Store the latest delivery tag to be able to commit (acknowledge) the message delivery up to - * and including this tag. See commit() for more detail. - */ - private long lastDeliveryTag; - - private Iterator nextIterator = Collections.emptyIterator(); - - @Override - public boolean hasMore() - { - nextRow = null; - try { - if (nextIterator.hasNext()) { - nextRow = nextIterator.next(); - return true; - } - // Wait for the next delivery. This will block until something is available. 
- final Delivery delivery = consumer.nextDelivery(); - if (delivery != null) { - lastDeliveryTag = delivery.getEnvelope().getDeliveryTag(); - nextIterator = firehoseParser.parseBatch(ByteBuffer.wrap(delivery.getBody())).iterator(); - if (nextIterator.hasNext()) { - nextRow = nextIterator.next(); - // If delivery is non-null, we report that there is something more to process. - return true; - } - } - } - catch (InterruptedException e) { - // A little unclear on how we should handle this. - - // At any rate, we're in an unknown state now so let's log something and return false. - log.wtf(e, "Got interrupted while waiting for next delivery. Doubt this should ever happen."); - } - - // This means that delivery is null or we caught the exception above so we report that we have - // nothing more to process. - return false; - } - - @Nullable - @Override - public InputRow nextRow() - { - if (nextRow == null) { - //Just making sure. - log.wtf("I have nothing in delivery. Method hasMore() should have returned false."); - return null; - } - - return nextRow; - } - - @Override - public Runnable commit() - { - // This method will be called from the same thread that calls the other methods of - // this Firehose. However, the returned Runnable will be called by a different thread. - // - // It should be (thread) safe to copy the lastDeliveryTag like we do below and then - // acknowledge values up to and including that value. - return new Runnable() - { - // Store (copy) the last delivery tag to "become" thread safe. - final long deliveryTag = lastDeliveryTag; - - @Override - public void run() - { - try { - log.info("Acknowledging delivery of messages up to tag: " + deliveryTag); - - // Acknowledge all messages up to and including the stored delivery tag. - channel.basicAck(deliveryTag, true); - } - catch (IOException e) { - log.error(e, "Unable to acknowledge message reception to message queue."); - } - } - }; - } - - @Override - public void close() throws IOException - { - log.info("Closing connection to RabbitMQ"); - channel.close(); - connection.close(); - } - }; - } - - private static class QueueingConsumer extends DefaultConsumer - { - private final BlockingQueue _queue; - - public QueueingConsumer(Channel ch) - { - this(ch, new LinkedBlockingQueue()); - } - - public QueueingConsumer(Channel ch, BlockingQueue q) - { - super(ch); - this._queue = q; - } - - @Override - public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) - { - _queue.clear(); - } - - @Override - public void handleCancel(String consumerTag) - { - _queue.clear(); - } - - @Override - public void handleDelivery( - String consumerTag, - Envelope envelope, - AMQP.BasicProperties properties, - byte[] body - ) - { - this._queue.add(new Delivery(envelope, properties, body)); - } - - public Delivery nextDelivery() - throws InterruptedException, ShutdownSignalException, ConsumerCancelledException - { - return _queue.take(); - } - } -} diff --git a/extensions-contrib/rabbitmq/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/rabbitmq/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule deleted file mode 100644 index 3161f969acc..00000000000 --- a/extensions-contrib/rabbitmq/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.druid.firehose.rabbitmq.RabbitMQDruidModule diff --git a/extensions-contrib/rabbitmq/src/test/java/org/apache/druid/examples/rabbitmq/RabbitMQFirehoseFactoryTest.java b/extensions-contrib/rabbitmq/src/test/java/org/apache/druid/examples/rabbitmq/RabbitMQFirehoseFactoryTest.java deleted file mode 100644 index e9c07918f30..00000000000 --- a/extensions-contrib/rabbitmq/src/test/java/org/apache/druid/examples/rabbitmq/RabbitMQFirehoseFactoryTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.examples.rabbitmq; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; -import com.rabbitmq.client.ConnectionFactory; -import org.apache.druid.firehose.rabbitmq.JacksonifiedConnectionFactory; -import org.apache.druid.firehose.rabbitmq.RabbitMQFirehoseConfig; -import org.apache.druid.firehose.rabbitmq.RabbitMQFirehoseFactory; -import org.apache.druid.jackson.DefaultObjectMapper; -import org.junit.Assert; -import org.junit.Test; - -/** - */ -public class RabbitMQFirehoseFactoryTest -{ - private static final ObjectMapper mapper = new DefaultObjectMapper(); - - @Test - public void testSerde() throws Exception - { - RabbitMQFirehoseConfig config = new RabbitMQFirehoseConfig( - "test", - "test2", - "test3", - true, - true, - true, - 5, - 10, - 20 - ); - - JacksonifiedConnectionFactory connectionFactory = new JacksonifiedConnectionFactory( - "foo", - 9978, - "user", - "pw", - "host", - null, - 5, - 10, - 11, - 12, - ImmutableMap.of("hi", "bye") - ); - - RabbitMQFirehoseFactory factory = new RabbitMQFirehoseFactory( - connectionFactory, - config, - null - ); - - byte[] bytes = mapper.writeValueAsBytes(factory); - RabbitMQFirehoseFactory factory2 = mapper.readValue(bytes, RabbitMQFirehoseFactory.class); - byte[] bytes2 = mapper.writeValueAsBytes(factory2); - - Assert.assertArrayEquals(bytes, bytes2); - - Assert.assertEquals(factory.getConfig(), factory2.getConfig()); - Assert.assertEquals(factory.getConnectionFactory(), factory2.getConnectionFactory()); - } - - @Test - public void testDefaultSerde() throws Exception - { - RabbitMQFirehoseConfig config = RabbitMQFirehoseConfig.makeDefaultConfig(); - - JacksonifiedConnectionFactory connectionFactory = JacksonifiedConnectionFactory.makeDefaultConnectionFactory(); - - RabbitMQFirehoseFactory factory = new RabbitMQFirehoseFactory( - connectionFactory, - config, - null - ); - - byte[] bytes = mapper.writeValueAsBytes(factory); - RabbitMQFirehoseFactory factory2 = mapper.readValue(bytes, RabbitMQFirehoseFactory.class); - byte[] bytes2 = mapper.writeValueAsBytes(factory2); - - Assert.assertArrayEquals(bytes, bytes2); - - Assert.assertEquals(factory.getConfig(), factory2.getConfig()); - Assert.assertEquals(factory.getConnectionFactory(), factory2.getConnectionFactory()); - - Assert.assertEquals(300, factory2.getConfig().getMaxDurationSeconds()); - - Assert.assertEquals(ConnectionFactory.DEFAULT_HOST, factory2.getConnectionFactory().getHost()); - Assert.assertEquals(ConnectionFactory.DEFAULT_USER, factory2.getConnectionFactory().getUsername()); - Assert.assertEquals(ConnectionFactory.DEFAULT_AMQP_PORT, factory2.getConnectionFactory().getPort()); - } -} diff --git a/extensions-contrib/rabbitmq/src/test/java/org/apache/druid/examples/rabbitmq/RabbitMQProducerMain.java b/extensions-contrib/rabbitmq/src/test/java/org/apache/druid/examples/rabbitmq/RabbitMQProducerMain.java deleted file mode 100644 index 78c4c220995..00000000000 --- a/extensions-contrib/rabbitmq/src/test/java/org/apache/druid/examples/rabbitmq/RabbitMQProducerMain.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.examples.rabbitmq; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import org.apache.commons.cli.BasicParser; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.druid.java.util.common.StringUtils; - -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Date; -import java.util.List; -import java.util.Locale; -import java.util.Random; -import java.util.TimeZone; -import java.util.concurrent.ThreadLocalRandom; - -/** - * - */ -public class RabbitMQProducerMain -{ - public static void main(String[] args) - throws Exception - { - // We use a List to keep track of option insertion order. See below. - final List