Merge pull request #312 from activitystream/rabbitmq-module

Moving the RabbitMQ firehose and friends to an extension.
This commit is contained in:
fjy 2013-11-27 13:01:56 -08:00
commit f403d3da07
12 changed files with 170 additions and 41 deletions

View File

@ -26,7 +26,11 @@
"routingKey": "#", "routingKey": "#",
"durable": "true", "durable": "true",
"exclusive": "false", "exclusive": "false",
"autoDelete": "false" "autoDelete": "false",
"maxRetries": "10",
"retryIntervalSeconds": "1",
"maxDurationSeconds": "300"
}, },
"parser" : { "parser" : {
"timestampSpec" : { "column" : "utcdt", "format" : "iso" }, "timestampSpec" : { "column" : "utcdt", "format" : "iso" },

View File

@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.24","io.druid.extensions:druid-kafka-seven:0.6.24"] druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.24","io.druid.extensions:druid-kafka-seven:0.6.24","io.druid.extensions:druid-rabbitmq:0.6.24"]
# Change this config to db to hand off to the rest of the Druid cluster # Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop druid.publish.type=noop

View File

@ -57,6 +57,7 @@
<module>s3-extensions</module> <module>s3-extensions</module>
<module>kafka-seven</module> <module>kafka-seven</module>
<module>kafka-eight</module> <module>kafka-eight</module>
<module>rabbitmq</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>
@ -368,11 +369,6 @@
<artifactId>aether-api</artifactId> <artifactId>aether-api</artifactId>
<version>0.9.0.M2</version> <version>0.9.0.M2</version>
</dependency> </dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.1.1</version>
</dependency>
<dependency> <dependency>
<groupId>net.java.dev.jets3t</groupId> <groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId> <artifactId>jets3t</artifactId>

45
rabbitmq/pom.xml Normal file
View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-rabbitmq</artifactId>
<name>druid-rabbitmq</name>
<description>druid-rabbitmq</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.24-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>net.jodah</groupId>
<artifactId>lyra</artifactId>
<version>0.3.1</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.segment.realtime.firehose; package io.druid.firehose.rabbitmq;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionFactory;

View File

@ -0,0 +1,50 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.firehose.rabbitmq;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.List;
/**
*/
public class RabbitMQDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule("RabbitMQFirehoseModule")
.registerSubtypes(
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq")
)
);
}
@Override
public void configure(Binder binder)
{
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.segment.realtime.firehose; package io.druid.firehose.rabbitmq;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
@ -33,6 +33,11 @@ public class RabbitMQFirehoseConfig
private boolean exclusive = false; private boolean exclusive = false;
private boolean autoDelete = false; private boolean autoDelete = false;
// Lyra (auto reconnect) properties
private int maxRetries = 100;
private int retryIntervalSeconds = 2;
private long maxDurationSeconds = 5 * 60;
@JsonProperty @JsonProperty
public String getQueue() public String getQueue()
{ {
@ -98,4 +103,31 @@ public class RabbitMQFirehoseConfig
{ {
this.autoDelete = autoDelete; this.autoDelete = autoDelete;
} }
@JsonProperty
public int getMaxRetries() {
return maxRetries;
}
public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}
@JsonProperty
public int getRetryIntervalSeconds() {
return retryIntervalSeconds;
}
public void setRetryIntervalSeconds(int retryIntervalSeconds) {
this.retryIntervalSeconds = retryIntervalSeconds;
}
@JsonProperty
public long getMaxDurationSeconds() {
return maxDurationSeconds;
}
public void setMaxDurationSeconds(int maxDurationSeconds) {
this.maxDurationSeconds = maxDurationSeconds;
}
} }

View File

@ -17,11 +17,10 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.segment.realtime.firehose; package io.druid.firehose.rabbitmq;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
@ -33,6 +32,11 @@ import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.StringInputRowParser;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.Connections;
import net.jodah.lyra.config.Config;
import net.jodah.lyra.retry.RetryPolicy;
import net.jodah.lyra.util.Duration;
import java.io.IOException; import java.io.IOException;
@ -51,21 +55,25 @@ import java.io.IOException;
* "firehose" : { * "firehose" : {
* "type" : "rabbitmq", * "type" : "rabbitmq",
* "connection" : { * "connection" : {
* "host": "localhost", # The hostname of the RabbitMQ broker to connect to. Default: 'localhost' * "host": "localhost", # The hostname of the RabbitMQ broker to connect to. Default: 'localhost'
* "port": "5672", # The port number to connect to on the RabbitMQ broker. Default: '5672' * "port": "5672", # The port number to connect to on the RabbitMQ broker. Default: '5672'
* "username": "test-dude", # The username to use to connect to RabbitMQ. Default: 'guest' * "username": "test-dude", # The username to use to connect to RabbitMQ. Default: 'guest'
* "password": "test-word", # The password to use to connect to RabbitMQ. Default: 'guest' * "password": "test-word", # The password to use to connect to RabbitMQ. Default: 'guest'
* "virtualHost": "test-vhost", # The virtual host to connect to. Default: '/' * "virtualHost": "test-vhost", # The virtual host to connect to. Default: '/'
* "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed * "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed
* }, * },
* "config" : { * "config" : {
* "exchange": "test-exchange", # The exchange to connect to. No default * "exchange": "test-exchange", # The exchange to connect to. No default
* "queue" : "druidtest", # The queue to connect to or create. No default * "queue" : "druidtest", # The queue to connect to or create. No default
* "routingKey": "#", # The routing key to use to bind the queue to the exchange. No default * "routingKey": "#", # The routing key to use to bind the queue to the exchange. No default
* "durable": "true", # Whether the queue should be durable. Default: 'false' * "durable": "true", # Whether the queue should be durable. Default: 'false'
* "exclusive": "false", # Whether the queue should be exclusive. Default: 'false' * "exclusive": "false", # Whether the queue should be exclusive. Default: 'false'
* "autoDelete": "false" # Whether the queue should auto-delete on disconnect. Default: 'false' * "autoDelete": "false", # Whether the queue should auto-delete on disconnect. Default: 'false'
* }, *
* "maxRetries": "10", # The max number of reconnection retry attempts
* "retryIntervalSeconds": "1", # The reconnection interval
* "maxDurationSeconds": "300" # The max duration of trying to reconnect
* },
* "parser" : { * "parser" : {
* "timestampSpec" : { "column" : "utcdt", "format" : "iso" }, * "timestampSpec" : { "column" : "utcdt", "format" : "iso" },
* "data" : { "format" : "json" }, * "data" : { "format" : "json" },
@ -113,6 +121,13 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
@Override @Override
public Firehose connect() throws IOException public Firehose connect() throws IOException
{ {
ConnectionOptions lyraOptions = new ConnectionOptions(this.connectionFactory);
Config lyraConfig = new Config()
.withRecoveryPolicy(new RetryPolicy()
.withMaxRetries(config.getMaxRetries())
.withRetryInterval(Duration.seconds(config.getRetryIntervalSeconds()))
.withMaxDuration(Duration.seconds(config.getMaxDurationSeconds())));
String queue = config.getQueue(); String queue = config.getQueue();
String exchange = config.getExchange(); String exchange = config.getExchange();
String routingKey = config.getRoutingKey(); String routingKey = config.getRoutingKey();
@ -121,13 +136,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
boolean exclusive = config.isExclusive(); boolean exclusive = config.isExclusive();
boolean autoDelete = config.isAutoDelete(); boolean autoDelete = config.isAutoDelete();
final Connection connection; final Connection connection = Connections.create(lyraOptions, lyraConfig);
try {
connection = connectionFactory.newConnection();
} catch (Exception e) {
log.error("Unable to find a RabbitMQ broker. Are you sure you have one running?");
throw Throwables.propagate(e);
}
connection.addShutdownListener(new ShutdownListener() connection.addShutdownListener(new ShutdownListener()
{ {
@ -135,7 +144,6 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
public void shutdownCompleted(ShutdownSignalException cause) public void shutdownCompleted(ShutdownSignalException cause)
{ {
log.warn(cause, "Connection closed!"); log.warn(cause, "Connection closed!");
//FUTURE: we could try to re-establish the connection here. Not done in this version though.
} }
}); });
@ -148,7 +156,6 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
public void shutdownCompleted(ShutdownSignalException cause) public void shutdownCompleted(ShutdownSignalException cause)
{ {
log.warn(cause, "Channel closed!"); log.warn(cause, "Channel closed!");
//FUTURE: we could try to re-establish the connection here. Not done in this version though.
} }
}); });

View File

@ -0,0 +1 @@
io.druid.firehose.rabbitmq.RabbitMQDruidModule

View File

@ -183,10 +183,6 @@
<groupId>org.eclipse.jetty</groupId> <groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId> <artifactId>jetty-servlets</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.ircclouds.irc</groupId> <groupId>com.ircclouds.irc</groupId>
<artifactId>irc-api</artifactId> <artifactId>irc-api</artifactId>

View File

@ -28,7 +28,6 @@ import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory; import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import java.util.Arrays; import java.util.Arrays;
@ -49,7 +48,6 @@ public class FirehoseModule implements DruidModule
return Arrays.<Module>asList( return Arrays.<Module>asList(
new SimpleModule("FirehoseModule") new SimpleModule("FirehoseModule")
.registerSubtypes( .registerSubtypes(
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"), new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"), new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"), new NamedType(IrcFirehoseFactory.class, "irc"),