mirror of
https://github.com/apache/druid.git
synced 2025-02-20 00:47:40 +00:00
Merge pull request #192 from activitystream/rabbitmq
Adding a RabbitMQFirehoseFactory implementation
This commit is contained in:
commit
5f01c633bd
12
examples/bin/examples/rabbitmq/query.body
Normal file
12
examples/bin/examples/rabbitmq/query.body
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
{
|
||||||
|
"queryType": "groupBy",
|
||||||
|
"dataSource": "rabbitmqtest",
|
||||||
|
"granularity": "all",
|
||||||
|
"dimensions": [],
|
||||||
|
"aggregations": [
|
||||||
|
{ "type": "count", "name": "rows" },
|
||||||
|
{"type": "longSum", "name": "imps", "fieldName": "impressions"},
|
||||||
|
{"type": "doubleSum", "name": "wp", "fieldName": "wp"}
|
||||||
|
],
|
||||||
|
"intervals": ["2010-01-01T00:00/2020-01-01T00:00"]
|
||||||
|
}
|
44
examples/bin/examples/rabbitmq/rabbitmq_realtime.spec
Normal file
44
examples/bin/examples/rabbitmq/rabbitmq_realtime.spec
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
[{
|
||||||
|
"schema" : {
|
||||||
|
"dataSource":"rabbitmqtest",
|
||||||
|
"aggregators":[
|
||||||
|
{"type":"count", "name":"impressions"},
|
||||||
|
{"type":"doubleSum","name":"wp","fieldName":"wp"}
|
||||||
|
],
|
||||||
|
"indexGranularity":"minute",
|
||||||
|
"shardSpec" : { "type": "none" }
|
||||||
|
},
|
||||||
|
"config" : {
|
||||||
|
"maxRowsInMemory" : 500000,
|
||||||
|
"intermediatePersistPeriod" : "PT1m"
|
||||||
|
},
|
||||||
|
"firehose" : {
|
||||||
|
"type" : "rabbitmq",
|
||||||
|
"connection" : {
|
||||||
|
"host": "localhost",
|
||||||
|
"username": "test-dude",
|
||||||
|
"password": "word-dude",
|
||||||
|
"virtualHost": "test-vhost"
|
||||||
|
},
|
||||||
|
"config" : {
|
||||||
|
"exchange": "test-exchange",
|
||||||
|
"queue" : "druidtest",
|
||||||
|
"routingKey": "#",
|
||||||
|
"durable": "true",
|
||||||
|
"exclusive": "false",
|
||||||
|
"autoDelete": "false"
|
||||||
|
},
|
||||||
|
"parser" : {
|
||||||
|
"timestampSpec" : { "column" : "utcdt", "format" : "iso" },
|
||||||
|
"data" : { "format" : "json" },
|
||||||
|
"dimensionExclusions" : ["wp"]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"plumber" : {
|
||||||
|
"type" : "realtime",
|
||||||
|
"windowPeriod" : "PT5m",
|
||||||
|
"segmentGranularity":"hour",
|
||||||
|
"basePersistDirectory" : "/tmp/realtime/basePersist",
|
||||||
|
"rejectionPolicy": { "type": "messageTime" }
|
||||||
|
}
|
||||||
|
}]
|
@ -0,0 +1,182 @@
|
|||||||
|
package druid.examples.rabbitmq;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import com.rabbitmq.client.Connection;
|
||||||
|
import com.rabbitmq.client.ConnectionFactory;
|
||||||
|
import org.apache.commons.cli.*;
|
||||||
|
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class RabbitMQProducerMain
|
||||||
|
{
|
||||||
|
public static void main(String[] args)
|
||||||
|
throws Exception
|
||||||
|
{
|
||||||
|
// We use a List to keep track of option insertion order. See below.
|
||||||
|
final List<Option> optionList = new ArrayList<Option>();
|
||||||
|
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("help")
|
||||||
|
.withDescription("display this help message")
|
||||||
|
.create("h"));
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("hostname")
|
||||||
|
.hasArg()
|
||||||
|
.withDescription("the hostname of the AMQP broker [defaults to AMQP library default]")
|
||||||
|
.create("b"));
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("port")
|
||||||
|
.hasArg()
|
||||||
|
.withDescription("the port of the AMQP broker [defaults to AMQP library default]")
|
||||||
|
.create("n"));
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("username")
|
||||||
|
.hasArg()
|
||||||
|
.withDescription("username to connect to the AMQP broker [defaults to AMQP library default]")
|
||||||
|
.create("u"));
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("password")
|
||||||
|
.hasArg()
|
||||||
|
.withDescription("password to connect to the AMQP broker [defaults to AMQP library default]")
|
||||||
|
.create("p"));
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("vhost")
|
||||||
|
.hasArg()
|
||||||
|
.withDescription("name of virtual host on the AMQP broker [defaults to AMQP library default]")
|
||||||
|
.create("v"));
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("exchange")
|
||||||
|
.isRequired()
|
||||||
|
.hasArg()
|
||||||
|
.withDescription("name of the AMQP exchange [required - no default]")
|
||||||
|
.create("e"));
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("key")
|
||||||
|
.hasArg()
|
||||||
|
.withDescription("the routing key to use when sending messages [default: 'default.routing.key']")
|
||||||
|
.create("k"));
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("type")
|
||||||
|
.hasArg()
|
||||||
|
.withDescription("the type of exchange to create [default: 'topic']")
|
||||||
|
.create("t"));
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("durable")
|
||||||
|
.withDescription("if set, a durable exchange will be declared [default: not set]")
|
||||||
|
.create("d"));
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("autodelete")
|
||||||
|
.withDescription("if set, an auto-delete exchange will be declared [default: not set]")
|
||||||
|
.create("a"));
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("single")
|
||||||
|
.withDescription("if set, only a single message will be sent [default: not set]")
|
||||||
|
.create("s"));
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("start")
|
||||||
|
.hasArg()
|
||||||
|
.withDescription("time to use to start sending messages from [default: 2010-01-01T00:00:00]")
|
||||||
|
.create());
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("stop")
|
||||||
|
.hasArg()
|
||||||
|
.withDescription("time to use to send messages until (format: '2013-07-18T23:45:59') [default: current time]")
|
||||||
|
.create());
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("interval")
|
||||||
|
.hasArg()
|
||||||
|
.withDescription("the interval to add to the timestamp between messages in seconds [default: 10]")
|
||||||
|
.create());
|
||||||
|
optionList.add(OptionBuilder.withLongOpt("delay")
|
||||||
|
.hasArg()
|
||||||
|
.withDescription("the delay between sending messages in milliseconds [default: 100]")
|
||||||
|
.create());
|
||||||
|
|
||||||
|
// An extremely silly hack to maintain the above order in the help formatting.
|
||||||
|
HelpFormatter formatter = new HelpFormatter();
|
||||||
|
// Add a comparator to the HelpFormatter using the ArrayList above to sort by insertion order.
|
||||||
|
formatter.setOptionComparator(new Comparator(){
|
||||||
|
@Override
|
||||||
|
public int compare(Object o1, Object o2)
|
||||||
|
{
|
||||||
|
// I know this isn't fast, but who cares! The list is short.
|
||||||
|
return optionList.indexOf(o1) - optionList.indexOf(o2);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Now we can add all the options to an Options instance. This is dumb!
|
||||||
|
Options options = new Options();
|
||||||
|
for (Option option : optionList) {
|
||||||
|
options.addOption(option);
|
||||||
|
}
|
||||||
|
|
||||||
|
CommandLine cmd = null;
|
||||||
|
|
||||||
|
try{
|
||||||
|
cmd = new BasicParser().parse(options, args);
|
||||||
|
|
||||||
|
}
|
||||||
|
catch(ParseException e){
|
||||||
|
formatter.printHelp("RabbitMQProducerMain", e.getMessage(), options, null);
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(cmd.hasOption("h")) {
|
||||||
|
formatter.printHelp("RabbitMQProducerMain", options);
|
||||||
|
System.exit(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
ConnectionFactory factory = new ConnectionFactory();
|
||||||
|
|
||||||
|
if(cmd.hasOption("b")){
|
||||||
|
factory.setHost(cmd.getOptionValue("b"));
|
||||||
|
}
|
||||||
|
if(cmd.hasOption("u")){
|
||||||
|
factory.setUsername(cmd.getOptionValue("u"));
|
||||||
|
}
|
||||||
|
if(cmd.hasOption("p")){
|
||||||
|
factory.setPassword(cmd.getOptionValue("p"));
|
||||||
|
}
|
||||||
|
if(cmd.hasOption("v")){
|
||||||
|
factory.setVirtualHost(cmd.getOptionValue("v"));
|
||||||
|
}
|
||||||
|
if(cmd.hasOption("n")){
|
||||||
|
factory.setPort(Integer.parseInt(cmd.getOptionValue("n")));
|
||||||
|
}
|
||||||
|
|
||||||
|
String exchange = cmd.getOptionValue("e");
|
||||||
|
String routingKey = "default.routing.key";
|
||||||
|
if(cmd.hasOption("k")){
|
||||||
|
routingKey = cmd.getOptionValue("k");
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean durable = cmd.hasOption("d");
|
||||||
|
boolean autoDelete = cmd.hasOption("a");
|
||||||
|
String type = cmd.getOptionValue("t", "topic");
|
||||||
|
boolean single = cmd.hasOption("single");
|
||||||
|
int interval = Integer.parseInt(cmd.getOptionValue("interval", "10"));
|
||||||
|
int delay = Integer.parseInt(cmd.getOptionValue("delay", "100"));
|
||||||
|
|
||||||
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
|
||||||
|
Date stop = sdf.parse(cmd.getOptionValue("stop", sdf.format(new Date())));
|
||||||
|
|
||||||
|
Random r = new Random();
|
||||||
|
Calendar timer = Calendar.getInstance();
|
||||||
|
timer.setTime(sdf.parse(cmd.getOptionValue("start", "2010-01-01T00:00:00")));
|
||||||
|
|
||||||
|
String msg_template = "{\"utcdt\": \"%s\", \"wp\": %d, \"gender\": \"%s\", \"age\": %d}";
|
||||||
|
|
||||||
|
Connection connection = factory.newConnection();
|
||||||
|
Channel channel = connection.createChannel();
|
||||||
|
|
||||||
|
channel.exchangeDeclare(exchange, type, durable, autoDelete, null);
|
||||||
|
|
||||||
|
do{
|
||||||
|
int wp = (10 + r.nextInt(90)) * 100;
|
||||||
|
String gender = r.nextBoolean() ? "male" : "female";
|
||||||
|
int age = 20 + r.nextInt(70);
|
||||||
|
|
||||||
|
String line = String.format(msg_template, sdf.format(timer.getTime()), wp, gender, age);
|
||||||
|
|
||||||
|
channel.basicPublish(exchange, routingKey, null, line.getBytes());
|
||||||
|
|
||||||
|
System.out.println("Sent message: " + line);
|
||||||
|
|
||||||
|
timer.add(Calendar.SECOND, interval);
|
||||||
|
|
||||||
|
Thread.sleep(delay);
|
||||||
|
}while((!single && stop.after(timer.getTime())));
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -120,6 +120,11 @@
|
|||||||
</exclusion>
|
</exclusion>
|
||||||
</exclusions>
|
</exclusions>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.rabbitmq</groupId>
|
||||||
|
<artifactId>amqp-client</artifactId>
|
||||||
|
<version>3.1.1</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- Dependencies required for jets3t b/c emr pom doesn't include them -->
|
<!-- Dependencies required for jets3t b/c emr pom doesn't include them -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -27,6 +27,7 @@ import java.io.IOException;
|
|||||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||||
@JsonSubTypes({
|
@JsonSubTypes({
|
||||||
@JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class),
|
@JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class),
|
||||||
|
@JsonSubTypes.Type(name = "rabbitmq", value = RabbitMQFirehoseFactory.class),
|
||||||
@JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class),
|
@JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class),
|
||||||
@JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class)
|
@JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class)
|
||||||
})
|
})
|
||||||
|
@ -0,0 +1,154 @@
|
|||||||
|
package com.metamx.druid.realtime.firehose;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.rabbitmq.client.ConnectionFactory;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.security.KeyManagementException;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Jacksonified version of the RabbitMQ ConnectionFactory for better integration
|
||||||
|
* into the realtime.spec configuration file format.
|
||||||
|
*/
|
||||||
|
public class JacksonifiedConnectionFactory extends ConnectionFactory
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public String getHost()
|
||||||
|
{
|
||||||
|
return super.getHost();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setHost(String host)
|
||||||
|
{
|
||||||
|
super.setHost(host);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public int getPort()
|
||||||
|
{
|
||||||
|
return super.getPort();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setPort(int port)
|
||||||
|
{
|
||||||
|
super.setPort(port);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public String getUsername()
|
||||||
|
{
|
||||||
|
return super.getUsername();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUsername(String username)
|
||||||
|
{
|
||||||
|
super.setUsername(username);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public String getPassword()
|
||||||
|
{
|
||||||
|
return super.getPassword();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setPassword(String password)
|
||||||
|
{
|
||||||
|
super.setPassword(password);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public String getVirtualHost()
|
||||||
|
{
|
||||||
|
return super.getVirtualHost();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setVirtualHost(String virtualHost)
|
||||||
|
{
|
||||||
|
super.setVirtualHost(virtualHost);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public void setUri(String uriString) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException
|
||||||
|
{
|
||||||
|
super.setUri(uriString);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public int getRequestedChannelMax()
|
||||||
|
{
|
||||||
|
return super.getRequestedChannelMax();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRequestedChannelMax(int requestedChannelMax)
|
||||||
|
{
|
||||||
|
super.setRequestedChannelMax(requestedChannelMax);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public int getRequestedFrameMax()
|
||||||
|
{
|
||||||
|
return super.getRequestedFrameMax();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRequestedFrameMax(int requestedFrameMax)
|
||||||
|
{
|
||||||
|
super.setRequestedFrameMax(requestedFrameMax);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public int getRequestedHeartbeat()
|
||||||
|
{
|
||||||
|
return super.getRequestedHeartbeat();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setConnectionTimeout(int connectionTimeout)
|
||||||
|
{
|
||||||
|
super.setConnectionTimeout(connectionTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public int getConnectionTimeout()
|
||||||
|
{
|
||||||
|
return super.getConnectionTimeout();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRequestedHeartbeat(int requestedHeartbeat)
|
||||||
|
{
|
||||||
|
super.setRequestedHeartbeat(requestedHeartbeat);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public Map<String, Object> getClientProperties()
|
||||||
|
{
|
||||||
|
return super.getClientProperties();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setClientProperties(Map<String, Object> clientProperties)
|
||||||
|
{
|
||||||
|
super.setClientProperties(clientProperties);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,82 @@
|
|||||||
|
package com.metamx.druid.realtime.firehose;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A configuration object for a RabbitMQ connection.
|
||||||
|
*/
|
||||||
|
public class RabbitMQFirehoseConfig
|
||||||
|
{
|
||||||
|
private String queue = null;
|
||||||
|
private String exchange = null;
|
||||||
|
private String routingKey = null;
|
||||||
|
private boolean durable = false;
|
||||||
|
private boolean exclusive = false;
|
||||||
|
private boolean autoDelete = false;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getQueue()
|
||||||
|
{
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setQueue(String queue)
|
||||||
|
{
|
||||||
|
this.queue = queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getExchange()
|
||||||
|
{
|
||||||
|
return exchange;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setExchange(String exchange)
|
||||||
|
{
|
||||||
|
this.exchange = exchange;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getRoutingKey()
|
||||||
|
{
|
||||||
|
return routingKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRoutingKey(String routingKey)
|
||||||
|
{
|
||||||
|
this.routingKey = routingKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public boolean isDurable()
|
||||||
|
{
|
||||||
|
return durable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDurable(boolean durable)
|
||||||
|
{
|
||||||
|
this.durable = durable;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public boolean isExclusive()
|
||||||
|
{
|
||||||
|
return exclusive;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setExclusive(boolean exclusive)
|
||||||
|
{
|
||||||
|
this.exclusive = exclusive;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public boolean isAutoDelete()
|
||||||
|
{
|
||||||
|
return autoDelete;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAutoDelete(boolean autoDelete)
|
||||||
|
{
|
||||||
|
this.autoDelete = autoDelete;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,215 @@
|
|||||||
|
package com.metamx.druid.realtime.firehose;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import com.metamx.druid.indexer.data.StringInputRowParser;
|
||||||
|
import com.metamx.druid.input.InputRow;
|
||||||
|
import com.rabbitmq.client.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A FirehoseFactory for RabbitMQ.
|
||||||
|
* <p/>
|
||||||
|
* It will receive it's configuration through the realtime.spec file and expects to find a
|
||||||
|
* consumerProps element in the firehose definition with values for a number of configuration options.
|
||||||
|
* Below is a complete example for a RabbitMQ firehose configuration with some explanation. Options
|
||||||
|
* that have defaults can be skipped but options with no defaults must be specified with the exception
|
||||||
|
* of the URI property. If the URI property is set, it will override any other property that was also
|
||||||
|
* set.
|
||||||
|
* <p/>
|
||||||
|
* File: <em>realtime.spec</em>
|
||||||
|
* <pre>
|
||||||
|
* "firehose" : {
|
||||||
|
* "type" : "rabbitmq",
|
||||||
|
* "consumerProps" : {
|
||||||
|
* "host": "localhost", # The hostname of the RabbitMQ broker to connect to. Default: 'localhost'
|
||||||
|
* "port": "5672", # The port number to connect to on the RabbitMQ broker. Default: '5672'
|
||||||
|
* "username": "test-dude", # The username to use to connect to RabbitMQ. Default: 'guest'
|
||||||
|
* "password": "test-word", # The password to use to connect to RabbitMQ. Default: 'guest'
|
||||||
|
* "virtualHost": "test-vhost", # The virtual host to connect to. Default: '/'
|
||||||
|
* "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed
|
||||||
|
* "exchange": "test-exchange", # The exchange to connect to. No default
|
||||||
|
* "queue" : "druidtest", # The queue to connect to or create. No default
|
||||||
|
* "routingKey": "#", # The routing key to use to bind the queue to the exchange. No default
|
||||||
|
* "durable": "true", # Whether the queue should be durable. Default: 'false'
|
||||||
|
* "exclusive": "false", # Whether the queue should be exclusive. Default: 'false'
|
||||||
|
* "autoDelete": "false" # Whether the queue should auto-delete on disconnect. Default: 'false'
|
||||||
|
* },
|
||||||
|
* "parser" : {
|
||||||
|
* "timestampSpec" : { "column" : "utcdt", "format" : "iso" },
|
||||||
|
* "data" : { "format" : "json" },
|
||||||
|
* "dimensionExclusions" : ["wp"]
|
||||||
|
* }
|
||||||
|
* },
|
||||||
|
* </pre>
|
||||||
|
* <p/>
|
||||||
|
* <b>Limitations:</b> This implementation will not attempt to reconnect to the MQ broker if the
|
||||||
|
* connection to it is lost. Furthermore it does not support any automatic failover on high availability
|
||||||
|
* RabbitMQ clusters. This is not supported by the underlying AMQP client library and while the behavior
|
||||||
|
* could be "faked" to some extent we haven't implemented that yet. However, if a policy is defined in
|
||||||
|
* the RabbitMQ cluster that sets the "ha-mode" and "ha-sync-mode" properly on the queue that this
|
||||||
|
* Firehose connects to, messages should survive an MQ broker node failure and be delivered once a
|
||||||
|
* connection to another node is set up.
|
||||||
|
* <p/>
|
||||||
|
* For more information on RabbitMQ high availability please see:
|
||||||
|
* <a href="http://www.rabbitmq.com/ha.html">http://www.rabbitmq.com/ha.html</a>.
|
||||||
|
*/
|
||||||
|
public class RabbitMQFirehoseFactory implements FirehoseFactory
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(RabbitMQFirehoseFactory.class);
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final RabbitMQFirehoseConfig config;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final StringInputRowParser parser;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final ConnectionFactory connectionFactory;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public RabbitMQFirehoseFactory(
|
||||||
|
@JsonProperty("connection") JacksonifiedConnectionFactory connectionFactory,
|
||||||
|
@JsonProperty("config") RabbitMQFirehoseConfig config,
|
||||||
|
@JsonProperty("parser") StringInputRowParser parser
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.connectionFactory = connectionFactory;
|
||||||
|
this.config = config;
|
||||||
|
this.parser = parser;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Firehose connect() throws IOException
|
||||||
|
{
|
||||||
|
String queue = config.getQueue();
|
||||||
|
String exchange = config.getExchange();
|
||||||
|
String routingKey = config.getRoutingKey();
|
||||||
|
|
||||||
|
boolean durable = config.isDurable();
|
||||||
|
boolean exclusive = config.isExclusive();
|
||||||
|
boolean autoDelete = config.isAutoDelete();
|
||||||
|
|
||||||
|
final Connection connection = connectionFactory.newConnection();
|
||||||
|
connection.addShutdownListener(new ShutdownListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void shutdownCompleted(ShutdownSignalException cause)
|
||||||
|
{
|
||||||
|
log.warn(cause, "Connection closed!");
|
||||||
|
//FUTURE: we could try to re-establish the connection here. Not done in this version though.
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final Channel channel = connection.createChannel();
|
||||||
|
channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
|
||||||
|
channel.queueBind(queue, exchange, routingKey);
|
||||||
|
channel.addShutdownListener(new ShutdownListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void shutdownCompleted(ShutdownSignalException cause)
|
||||||
|
{
|
||||||
|
log.warn(cause, "Channel closed!");
|
||||||
|
//FUTURE: we could try to re-establish the connection here. Not done in this version though.
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// We create a QueueingConsumer that will not auto-acknowledge messages since that
|
||||||
|
// happens on commit().
|
||||||
|
final QueueingConsumer consumer = new QueueingConsumer(channel);
|
||||||
|
channel.basicConsume(queue, false, consumer);
|
||||||
|
|
||||||
|
return new Firehose()
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Storing the latest delivery as a member variable should be safe since this will only be run
|
||||||
|
* by a single thread.
|
||||||
|
*/
|
||||||
|
private QueueingConsumer.Delivery delivery;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store the latest delivery tag to be able to commit (acknowledge) the message delivery up to
|
||||||
|
* and including this tag. See commit() for more detail.
|
||||||
|
*/
|
||||||
|
private long lastDeliveryTag;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasMore()
|
||||||
|
{
|
||||||
|
delivery = null;
|
||||||
|
try {
|
||||||
|
// Wait for the next delivery. This will block until something is available.
|
||||||
|
delivery = consumer.nextDelivery();
|
||||||
|
if (delivery != null) {
|
||||||
|
lastDeliveryTag = delivery.getEnvelope().getDeliveryTag();
|
||||||
|
// If delivery is non-null, we report that there is something more to process.
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
// A little unclear on how we should handle this.
|
||||||
|
|
||||||
|
// At any rate, we're in an unknown state now so let's log something and return false.
|
||||||
|
log.wtf(e, "Got interrupted while waiting for next delivery. Doubt this should ever happen.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// This means that delivery is null or we caught the exception above so we report that we have
|
||||||
|
// nothing more to process.
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InputRow nextRow()
|
||||||
|
{
|
||||||
|
if (delivery == null) {
|
||||||
|
//Just making sure.
|
||||||
|
log.wtf("I have nothing in delivery. Method hasMore() should have returned false.");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return parser.parse(new String(delivery.getBody()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Runnable commit()
|
||||||
|
{
|
||||||
|
// This method will be called from the same thread that calls the other methods of
|
||||||
|
// this Firehose. However, the returned Runnable will be called by a different thread.
|
||||||
|
//
|
||||||
|
// It should be (thread) safe to copy the lastDeliveryTag like we do below and then
|
||||||
|
// acknowledge values up to and including that value.
|
||||||
|
return new Runnable()
|
||||||
|
{
|
||||||
|
// Store (copy) the last delivery tag to "become" thread safe.
|
||||||
|
final long deliveryTag = lastDeliveryTag;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
log.info("Acknowledging delivery of messages up to tag: " + deliveryTag);
|
||||||
|
|
||||||
|
// Acknowledge all messages up to and including the stored delivery tag.
|
||||||
|
channel.basicAck(deliveryTag, true);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
log.error(e, "Unable to acknowledge message reception to message queue.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
log.info("Closing connection to RabbitMQ");
|
||||||
|
channel.close();
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user