From 012fff66160df8bbb5684304f2518598b796ac6f Mon Sep 17 00:00:00 2001 From: fjy Date: Sun, 2 Aug 2015 10:37:07 -0700 Subject: [PATCH] fix firehose docs --- docs/content/ingestion/firehose.md | 144 +++++++++++++----- extensions/s3-extensions/pom.xml | 9 +- .../firehose/s3/StaticS3FirehoseFactory.java | 22 +++ .../s3/StaticS3FirehoseFactoryTest.java | 70 +++++++++ 4 files changed, 204 insertions(+), 41 deletions(-) create mode 100644 extensions/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md index 818f15df7c2..106ee43e155 100644 --- a/docs/content/ingestion/firehose.md +++ b/docs/content/ingestion/firehose.md @@ -19,14 +19,53 @@ Available Firehoses There are several firehoses readily available in Druid, some are meant for examples, others can be used directly in a production environment. -#### KafkaFirehose +#### KafkaEightFirehose -This firehose acts as a Kafka consumer and ingests data from Kafka. +Please note that the [druid-kafka-eight module](../operations/including-extensions.html) is required for this firehose. This firehose acts as a Kafka 0.8.x consumer and ingests data from Kafka. + +Sample spec: + +```json +"firehose": { + "type": "kafka-0.8", + "consumerProps": { + "zookeeper.connect": "localhost:2181", + "zookeeper.connection.timeout.ms" : "15000", + "zookeeper.session.timeout.ms" : "15000", + "zookeeper.sync.time.ms" : "5000", + "group.id": "druid-example", + "fetch.message.max.bytes" : "1048586", + "auto.offset.reset": "largest", + "auto.commit.enable": "false" + }, + "feed": "wikipedia" +} +``` + +|property|description|required?| +|--------|-----------|---------| +|type|This should be "kafka-0.8"|yes| +|consumerProps|The full list of consumer configs can be [here](https://kafka.apache.org/08/configuration.html).|yes| +|feed|Kafka maintains feeds of messages in categories called topics. This is the topic name.|yes| #### StaticS3Firehose This firehose ingests events from a predefined list of S3 objects. +Sample spec: + +```json +"firehose" : { + "type" : "static-s3", + "uris": ["s3://foo/bar/file.gz", "s3://bar/foo/file2.gz"] +} +``` + +|property|description|default|required?| +|--------|-----------|-------|---------| +|type|This should be "static-s3"|N/A|yes| +|uris|JSON array of URIs where s3 files to be ingested are located.|N/A|yes| + #### StaticAzureBlobStoreFirehose This firehose ingests events, similar to the StaticS3Firehose, but from an Azure Blob Store. @@ -38,6 +77,7 @@ The storage account is shared with the one used for Azure deep storage functiona As with the S3 blobstore, it is assumed to be gzipped if the extension ends in .gz Sample spec: + ```json "firehose" : { "type" : "static-azure-blobstore", @@ -54,54 +94,74 @@ Sample spec: } ``` +|property|description|default|required?| +|--------|-----------|-------|---------| +|type|This should be "static-azure-blobstore".|N/A|yes| +|blobs|JSON array of [Azure blobs](https://msdn.microsoft.com/en-us/library/azure/ee691964.aspx).|N/A|yes| + +Azure Blobs: + +|property|description|default|required?| +|--------|-----------|-------|---------| +|container|Name of the azure [container](https://azure.microsoft.com/en-us/documentation/articles/storage-dotnet-how-to-use-blobs/#create-a-container)|N/A|yes| +|path|The path where data is located.|N/A|yes| + #### TwitterSpritzerFirehose -See [Examples](../tutorials/examples.html). This firehose connects directly to the twitter spritzer data stream. +This firehose connects directly to the twitter spritzer data stream. -#### RandomFirehose +Sample spec: -See [Examples](../tutorials/examples.html). This firehose creates a stream of random numbers. +```json +"firehose" : { + "type" : "twitzer", + "maxEventCount": -1, + "maxRunMinutes": 0 +} +``` -#### RabbitMqFirehose +|property|description|default|required?| +|--------|-----------|-------|---------| +|type|This should be "twitzer"|N/A|yes| +|maxEventCount|max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent infinite space consumption or to prevent getting throttled at an inconvenient time.|N/A|yes| +|maxRunMinutes|maximum number of minutes to fetch Twitter events. Use this to prevent getting throttled at an inconvenient time. If zero or less, no time limit for run.|N/A|yes| + +#### RabbitMQFirehose This firehose ingests events from a define rabbit-mq queue. -
+ **Note:** Add **amqp-client-3.2.1.jar** to lib directory of druid to use this firehose. -
+ A sample spec for rabbitmq firehose: ```json "firehose" : { - "type" : "rabbitmq", - "connection" : { - "host": "localhost", - "port": "5672", - "username": "test-dude", - "password": "test-word", - "virtualHost": "test-vhost", - "uri": "amqp://mqserver:1234/vhost", - }, - "config" : { - "exchange": "test-exchange", - "queue" : "druidtest", - "routingKey": "#", - "durable": "true", - "exclusive": "false", - "autoDelete": "false", - - "maxRetries": "10", - "retryIntervalSeconds": "1", - "maxDurationSeconds": "300" - }, - "parser" : { - "timestampSpec" : { "column" : "utcdt", "format" : "iso" }, - "data" : { "format" : "json" }, - "dimensionExclusions" : ["wp"] - } + "type" : "rabbitmq", + "connection" : { + "host": "localhost", + "port": "5672", + "username": "test-dude", + "password": "test-word", + "virtualHost": "test-vhost", + "uri": "amqp://mqserver:1234/vhost" + }, + "config" : { + "exchange": "test-exchange", + "queue" : "druidtest", + "routingKey": "#", + "durable": "true", + "exclusive": "false", + "autoDelete": "false", + "maxRetries": "10", + "retryIntervalSeconds": "1", + "maxDurationSeconds": "300" } +} ``` -|property|description|Default|required?| -|--------|-----------|---------| + +|property|description|default|required?| +|--------|-----------|-------|---------| +|type|This should be "rabbitmq"|N/A|yes| |host|The hostname of the RabbitMQ broker to connect to|localhost|no| |port|The port number to connect to on the RabbitMQ broker|5672|no| |username|The username to use to connect to RabbitMQ|guest|no| @@ -117,6 +177,7 @@ A sample spec for rabbitmq firehose: |maxRetries|The max number of reconnection retry attempts| |yes| |retryIntervalSeconds|The reconnection interval| |yes| |maxDurationSeconds|The max duration of trying to reconnect| |yes| + #### LocalFirehose This Firehose can be used to read the data from files on local disk. @@ -153,7 +214,7 @@ A sample ingest firehose spec is shown below - |property|description|required?| |--------|-----------|---------| -|type|ingestSegment. Type of firehose|yes| +|type|This should be "ingestSegment".|yes| |dataSource|A String defining the data source to fetch rows from, very similar to a table in a relational database|yes| |interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over.|yes| |dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no| @@ -161,6 +222,7 @@ A sample ingest firehose spec is shown below - |filter| See [Filters](../querying/filters.html)|yes| #### CombiningFirehose + This firehose can be used to combine and merge data from a list of different firehoses. This can be used to merge data from more than one firehoses. @@ -173,11 +235,12 @@ This can be used to merge data from more than one firehoses. |property|description|required?| |--------|-----------|---------| -|type|combining|yes| +|type|This should be "combining"|yes| |delegates|list of firehoses to combine data from|yes| #### EventReceiverFirehose + EventReceiverFirehoseFactory can be used to ingest events using an http endpoint. ```json @@ -187,11 +250,12 @@ EventReceiverFirehoseFactory can be used to ingest events using an http endpoint "bufferSize": 10000 } ``` -When using this firehose, events can be sent by submitting a POST request to the http endpoint - +When using this firehose, events can be sent by submitting a POST request to the http endpoint: + `http://:/druid/worker/v1/chat//push-events/` |property|description|required?| |--------|-----------|---------| -|type|receiver|yes| +|type|This should be "receiver"|yes| |serviceName|name used to announce the event receiver service endpoint|yes| |bufferSize| size of buffer used by firehose to store events|no default(100000)| diff --git a/extensions/s3-extensions/pom.xml b/extensions/s3-extensions/pom.xml index 09a08aa831c..a5b1fbd91e3 100644 --- a/extensions/s3-extensions/pom.xml +++ b/extensions/s3-extensions/pom.xml @@ -16,7 +16,8 @@ ~ limitations under the License. --> - + 4.0.0 io.druid.extensions @@ -71,6 +72,12 @@ + + io.druid + druid-server + ${project.parent.version} + test + junit junit diff --git a/extensions/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java b/extensions/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java index 400fe77481e..32a75a3e84a 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java +++ b/extensions/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java @@ -144,4 +144,26 @@ public class StaticS3FirehoseFactory implements FirehoseFactory uris = Arrays.asList( + new URI("s3://foo/bar/file.gz"), + new URI("s3://bar/foo/file2.gz") + ); + + TestStaticS3FirehoseFactory factory = new TestStaticS3FirehoseFactory( + uris + ); + + TestStaticS3FirehoseFactory outputFact = mapper.readValue( + mapper.writeValueAsString(factory), + TestStaticS3FirehoseFactory.class + ); + + Assert.assertEquals(factory, outputFact); + } + + // This class is a workaround for the injectable value that StaticS3FirehoseFactory requires + private static class TestStaticS3FirehoseFactory extends StaticS3FirehoseFactory + { + @JsonCreator + public TestStaticS3FirehoseFactory( + @JsonProperty("uris") List uris + ) + { + super(null, uris); + } + } +}