Merge pull request #1589 from druid-io/fix-firehose-doc

Add a lot more docs for firehoses
This commit is contained in:
Charles Allen 2015-08-06 12:45:24 -07:00
commit e6226968a6
4 changed files with 204 additions and 41 deletions

View File

@ -19,14 +19,53 @@ Available Firehoses
There are several firehoses readily available in Druid, some are meant for examples, others can be used directly in a production environment.
#### KafkaFirehose
#### KafkaEightFirehose
This firehose acts as a Kafka consumer and ingests data from Kafka.
Please note that the [druid-kafka-eight module](../operations/including-extensions.html) is required for this firehose. This firehose acts as a Kafka 0.8.x consumer and ingests data from Kafka.
Sample spec:
```json
"firehose": {
"type": "kafka-0.8",
"consumerProps": {
"zookeeper.connect": "localhost:2181",
"zookeeper.connection.timeout.ms" : "15000",
"zookeeper.session.timeout.ms" : "15000",
"zookeeper.sync.time.ms" : "5000",
"group.id": "druid-example",
"fetch.message.max.bytes" : "1048586",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"feed": "wikipedia"
}
```
|property|description|required?|
|--------|-----------|---------|
|type|This should be "kafka-0.8"|yes|
|consumerProps|The full list of consumer configs can be [here](https://kafka.apache.org/08/configuration.html).|yes|
|feed|Kafka maintains feeds of messages in categories called topics. This is the topic name.|yes|
#### StaticS3Firehose
This firehose ingests events from a predefined list of S3 objects.
Sample spec:
```json
"firehose" : {
"type" : "static-s3",
"uris": ["s3://foo/bar/file.gz", "s3://bar/foo/file2.gz"]
}
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be "static-s3"|N/A|yes|
|uris|JSON array of URIs where s3 files to be ingested are located.|N/A|yes|
#### StaticAzureBlobStoreFirehose
This firehose ingests events, similar to the StaticS3Firehose, but from an Azure Blob Store.
@ -38,6 +77,7 @@ The storage account is shared with the one used for Azure deep storage functiona
As with the S3 blobstore, it is assumed to be gzipped if the extension ends in .gz
Sample spec:
```json
"firehose" : {
"type" : "static-azure-blobstore",
@ -54,54 +94,74 @@ Sample spec:
}
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be "static-azure-blobstore".|N/A|yes|
|blobs|JSON array of [Azure blobs](https://msdn.microsoft.com/en-us/library/azure/ee691964.aspx).|N/A|yes|
Azure Blobs:
|property|description|default|required?|
|--------|-----------|-------|---------|
|container|Name of the azure [container](https://azure.microsoft.com/en-us/documentation/articles/storage-dotnet-how-to-use-blobs/#create-a-container)|N/A|yes|
|path|The path where data is located.|N/A|yes|
#### TwitterSpritzerFirehose
See [Examples](../tutorials/examples.html). This firehose connects directly to the twitter spritzer data stream.
This firehose connects directly to the twitter spritzer data stream.
#### RandomFirehose
Sample spec:
See [Examples](../tutorials/examples.html). This firehose creates a stream of random numbers.
```json
"firehose" : {
"type" : "twitzer",
"maxEventCount": -1,
"maxRunMinutes": 0
}
```
#### RabbitMqFirehose
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be "twitzer"|N/A|yes|
|maxEventCount|max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent infinite space consumption or to prevent getting throttled at an inconvenient time.|N/A|yes|
|maxRunMinutes|maximum number of minutes to fetch Twitter events. Use this to prevent getting throttled at an inconvenient time. If zero or less, no time limit for run.|N/A|yes|
#### RabbitMQFirehose
This firehose ingests events from a define rabbit-mq queue.
<br>
**Note:** Add **amqp-client-3.2.1.jar** to lib directory of druid to use this firehose.
<br>
A sample spec for rabbitmq firehose:
```json
"firehose" : {
"type" : "rabbitmq",
"connection" : {
"host": "localhost",
"port": "5672",
"username": "test-dude",
"password": "test-word",
"virtualHost": "test-vhost",
"uri": "amqp://mqserver:1234/vhost",
},
"config" : {
"exchange": "test-exchange",
"queue" : "druidtest",
"routingKey": "#",
"durable": "true",
"exclusive": "false",
"autoDelete": "false",
"maxRetries": "10",
"retryIntervalSeconds": "1",
"maxDurationSeconds": "300"
},
"parser" : {
"timestampSpec" : { "column" : "utcdt", "format" : "iso" },
"data" : { "format" : "json" },
"dimensionExclusions" : ["wp"]
}
"type" : "rabbitmq",
"connection" : {
"host": "localhost",
"port": "5672",
"username": "test-dude",
"password": "test-word",
"virtualHost": "test-vhost",
"uri": "amqp://mqserver:1234/vhost"
},
"config" : {
"exchange": "test-exchange",
"queue" : "druidtest",
"routingKey": "#",
"durable": "true",
"exclusive": "false",
"autoDelete": "false",
"maxRetries": "10",
"retryIntervalSeconds": "1",
"maxDurationSeconds": "300"
}
}
```
|property|description|Default|required?|
|--------|-----------|---------|
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be "rabbitmq"|N/A|yes|
|host|The hostname of the RabbitMQ broker to connect to|localhost|no|
|port|The port number to connect to on the RabbitMQ broker|5672|no|
|username|The username to use to connect to RabbitMQ|guest|no|
@ -117,6 +177,7 @@ A sample spec for rabbitmq firehose:
|maxRetries|The max number of reconnection retry attempts| |yes|
|retryIntervalSeconds|The reconnection interval| |yes|
|maxDurationSeconds|The max duration of trying to reconnect| |yes|
#### LocalFirehose
This Firehose can be used to read the data from files on local disk.
@ -153,7 +214,7 @@ A sample ingest firehose spec is shown below -
|property|description|required?|
|--------|-----------|---------|
|type|ingestSegment. Type of firehose|yes|
|type|This should be "ingestSegment".|yes|
|dataSource|A String defining the data source to fetch rows from, very similar to a table in a relational database|yes|
|interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over.|yes|
|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
@ -161,6 +222,7 @@ A sample ingest firehose spec is shown below -
|filter| See [Filters](../querying/filters.html)|yes|
#### CombiningFirehose
This firehose can be used to combine and merge data from a list of different firehoses.
This can be used to merge data from more than one firehoses.
@ -173,11 +235,12 @@ This can be used to merge data from more than one firehoses.
|property|description|required?|
|--------|-----------|---------|
|type|combining|yes|
|type|This should be "combining"|yes|
|delegates|list of firehoses to combine data from|yes|
#### EventReceiverFirehose
EventReceiverFirehoseFactory can be used to ingest events using an http endpoint.
```json
@ -187,11 +250,12 @@ EventReceiverFirehoseFactory can be used to ingest events using an http endpoint
"bufferSize": 10000
}
```
When using this firehose, events can be sent by submitting a POST request to the http endpoint -
When using this firehose, events can be sent by submitting a POST request to the http endpoint:
`http://<peonHost>:<port>/druid/worker/v1/chat/<eventReceiverServiceName>/push-events/`
|property|description|required?|
|--------|-----------|---------|
|type|receiver|yes|
|type|This should be "receiver"|yes|
|serviceName|name used to announce the event receiver service endpoint|yes|
|bufferSize| size of buffer used by firehose to store events|no default(100000)|

View File

@ -16,7 +16,8 @@
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
@ -71,6 +72,12 @@
</dependency>
<!-- Tests -->
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -144,4 +144,26 @@ public class StaticS3FirehoseFactory implements FirehoseFactory<StringInputRowPa
firehoseParser
);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StaticS3FirehoseFactory factory = (StaticS3FirehoseFactory) o;
return !(uris != null ? !uris.equals(factory.uris) : factory.uris != null);
}
@Override
public int hashCode()
{
return uris != null ? uris.hashCode() : 0;
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.firehose.s3;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.junit.Test;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
/**
*/
public class StaticS3FirehoseFactoryTest
{
@Test
public void testSerde() throws Exception
{
ObjectMapper mapper = new DefaultObjectMapper();
final List<URI> uris = Arrays.asList(
new URI("s3://foo/bar/file.gz"),
new URI("s3://bar/foo/file2.gz")
);
TestStaticS3FirehoseFactory factory = new TestStaticS3FirehoseFactory(
uris
);
TestStaticS3FirehoseFactory outputFact = mapper.readValue(
mapper.writeValueAsString(factory),
TestStaticS3FirehoseFactory.class
);
Assert.assertEquals(factory, outputFact);
}
// This class is a workaround for the injectable value that StaticS3FirehoseFactory requires
private static class TestStaticS3FirehoseFactory extends StaticS3FirehoseFactory
{
@JsonCreator
public TestStaticS3FirehoseFactory(
@JsonProperty("uris") List<URI> uris
)
{
super(null, uris);
}
}
}