From f644c8ea66f923653b4e18409fb45681b93fbcf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20Freyr=20Stef=C3=A1nsson?= Date: Tue, 19 Nov 2013 10:18:00 +0000 Subject: [PATCH 1/5] Ugly quick'n'dirty way of getting lyra to work. --- .../examples/rabbitmq/rabbitmq_realtime.spec | 6 +- pom.xml | 7 ++- server/pom.xml | 4 ++ .../firehose/RabbitMQFirehoseConfig.java | 32 +++++++++++ .../firehose/RabbitMQFirehoseFactory.java | 56 +++++++++++-------- 5 files changed, 79 insertions(+), 26 deletions(-) diff --git a/examples/bin/examples/rabbitmq/rabbitmq_realtime.spec b/examples/bin/examples/rabbitmq/rabbitmq_realtime.spec index fc26f21ee3a..528e81f39cc 100644 --- a/examples/bin/examples/rabbitmq/rabbitmq_realtime.spec +++ b/examples/bin/examples/rabbitmq/rabbitmq_realtime.spec @@ -26,7 +26,11 @@ "routingKey": "#", "durable": "true", "exclusive": "false", - "autoDelete": "false" + "autoDelete": "false", + + "maxRetries": "10", + "retryIntervalSeconds": "1", + "maxDurationSeconds": "300" }, "parser" : { "timestampSpec" : { "column" : "utcdt", "format" : "iso" }, diff --git a/pom.xml b/pom.xml index 24042c676ab..4ccfa0d2ab4 100644 --- a/pom.xml +++ b/pom.xml @@ -371,7 +371,12 @@ com.rabbitmq amqp-client - 3.1.1 + 3.2.1 + + + net.jodah + lyra + 0.3.1 net.java.dev.jets3t diff --git a/server/pom.xml b/server/pom.xml index 51b8e988f15..70858ce1430 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -187,6 +187,10 @@ com.rabbitmq amqp-client + + net.jodah + lyra + com.ircclouds.irc irc-api diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseConfig.java b/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseConfig.java index 42e10dd601b..0605142a021 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseConfig.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseConfig.java @@ -33,6 +33,11 @@ public class RabbitMQFirehoseConfig private boolean exclusive = false; private boolean autoDelete = false; + // Lyra (auto reconnect) properties + private int maxRetries = 100; + private int retryIntervalSeconds = 2; + private long maxDurationSeconds = 5 * 60; + @JsonProperty public String getQueue() { @@ -98,4 +103,31 @@ public class RabbitMQFirehoseConfig { this.autoDelete = autoDelete; } + + @JsonProperty + public int getMaxRetries() { + return maxRetries; + } + + public void setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + } + + @JsonProperty + public int getRetryIntervalSeconds() { + return retryIntervalSeconds; + } + + public void setRetryIntervalSeconds(int retryIntervalSeconds) { + this.retryIntervalSeconds = retryIntervalSeconds; + } + + @JsonProperty + public long getMaxDurationSeconds() { + return maxDurationSeconds; + } + + public void setMaxDurationSeconds(int maxDurationSeconds) { + this.maxDurationSeconds = maxDurationSeconds; + } } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java index aa0270df15d..a76cde8af06 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java @@ -33,6 +33,11 @@ import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.data.input.impl.StringInputRowParser; +import net.jodah.lyra.ConnectionOptions; +import net.jodah.lyra.Connections; +import net.jodah.lyra.config.Config; +import net.jodah.lyra.retry.RetryPolicy; +import net.jodah.lyra.util.Duration; import java.io.IOException; @@ -51,21 +56,25 @@ import java.io.IOException; * "firehose" : { * "type" : "rabbitmq", * "connection" : { - * "host": "localhost", # The hostname of the RabbitMQ broker to connect to. Default: 'localhost' - * "port": "5672", # The port number to connect to on the RabbitMQ broker. Default: '5672' - * "username": "test-dude", # The username to use to connect to RabbitMQ. Default: 'guest' - * "password": "test-word", # The password to use to connect to RabbitMQ. Default: 'guest' - * "virtualHost": "test-vhost", # The virtual host to connect to. Default: '/' - * "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed - * }, - * "config" : { - * "exchange": "test-exchange", # The exchange to connect to. No default - * "queue" : "druidtest", # The queue to connect to or create. No default - * "routingKey": "#", # The routing key to use to bind the queue to the exchange. No default - * "durable": "true", # Whether the queue should be durable. Default: 'false' - * "exclusive": "false", # Whether the queue should be exclusive. Default: 'false' - * "autoDelete": "false" # Whether the queue should auto-delete on disconnect. Default: 'false' - * }, + * "host": "localhost", # The hostname of the RabbitMQ broker to connect to. Default: 'localhost' + * "port": "5672", # The port number to connect to on the RabbitMQ broker. Default: '5672' + * "username": "test-dude", # The username to use to connect to RabbitMQ. Default: 'guest' + * "password": "test-word", # The password to use to connect to RabbitMQ. Default: 'guest' + * "virtualHost": "test-vhost", # The virtual host to connect to. Default: '/' + * "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed + * }, + * "config" : { + * "exchange": "test-exchange", # The exchange to connect to. No default + * "queue" : "druidtest", # The queue to connect to or create. No default + * "routingKey": "#", # The routing key to use to bind the queue to the exchange. No default + * "durable": "true", # Whether the queue should be durable. Default: 'false' + * "exclusive": "false", # Whether the queue should be exclusive. Default: 'false' + * "autoDelete": "false", # Whether the queue should auto-delete on disconnect. Default: 'false' + * + * "maxRetries": "10", # The max number of reconnection retry attempts + * "retryIntervalSeconds": "1", # The reconnection interval + * "maxDurationSeconds": "300" # The max duration of trying to reconnect + * }, * "parser" : { * "timestampSpec" : { "column" : "utcdt", "format" : "iso" }, * "data" : { "format" : "json" }, @@ -113,6 +122,13 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory @Override public Firehose connect() throws IOException { + ConnectionOptions lyraOptions = new ConnectionOptions(this.connectionFactory); + Config lyraConfig = new Config() + .withRecoveryPolicy(new RetryPolicy() + .withMaxRetries(config.getMaxRetries()) + .withRetryInterval(Duration.seconds(config.getRetryIntervalSeconds())) + .withMaxDuration(Duration.seconds(config.getMaxDurationSeconds()))); + String queue = config.getQueue(); String exchange = config.getExchange(); String routingKey = config.getRoutingKey(); @@ -121,13 +137,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory boolean exclusive = config.isExclusive(); boolean autoDelete = config.isAutoDelete(); - final Connection connection; - try { - connection = connectionFactory.newConnection(); - } catch (Exception e) { - log.error("Unable to find a RabbitMQ broker. Are you sure you have one running?"); - throw Throwables.propagate(e); - } + final Connection connection = Connections.create(lyraOptions, lyraConfig); connection.addShutdownListener(new ShutdownListener() { @@ -135,7 +145,6 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory public void shutdownCompleted(ShutdownSignalException cause) { log.warn(cause, "Connection closed!"); - //FUTURE: we could try to re-establish the connection here. Not done in this version though. } }); @@ -148,7 +157,6 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory public void shutdownCompleted(ShutdownSignalException cause) { log.warn(cause, "Channel closed!"); - //FUTURE: we could try to re-establish the connection here. Not done in this version though. } }); From 71598ee60ecf9939f530104d46010da520fef8b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20Freyr=20Stef=C3=A1nsson?= Date: Thu, 21 Nov 2013 18:41:06 +0000 Subject: [PATCH 2/5] Moving RabbitMQ stuff to a module. --- pom.xml | 11 +--- rabbitmq/pom.xml | 40 +++++++++++++++ .../JacksonifiedConnectionFactory.java | 2 +- .../rabbitmq/RabbitMQDruidModule.java | 50 +++++++++++++++++++ .../rabbitmq}/RabbitMQFirehoseConfig.java | 2 +- .../rabbitmq}/RabbitMQFirehoseFactory.java | 3 +- .../io.druid.initialization.DruidModule | 1 + .../rabbitmq/RabbitMQProducerMain.java | 0 server/pom.xml | 8 --- .../java/io/druid/guice/FirehoseModule.java | 2 - 10 files changed, 95 insertions(+), 24 deletions(-) create mode 100644 rabbitmq/pom.xml rename {server/src/main/java/io/druid/segment/realtime/firehose => rabbitmq/src/main/java/io/druid/firehose/rabbitmq}/JacksonifiedConnectionFactory.java (98%) create mode 100644 rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQDruidModule.java rename {server/src/main/java/io/druid/segment/realtime/firehose => rabbitmq/src/main/java/io/druid/firehose/rabbitmq}/RabbitMQFirehoseConfig.java (98%) rename {server/src/main/java/io/druid/segment/realtime/firehose => rabbitmq/src/main/java/io/druid/firehose/rabbitmq}/RabbitMQFirehoseFactory.java (98%) create mode 100644 rabbitmq/src/main/resources/META-INF.services/io.druid.initialization.DruidModule rename {examples/src/main => rabbitmq/src/test}/java/io/druid/examples/rabbitmq/RabbitMQProducerMain.java (100%) diff --git a/pom.xml b/pom.xml index c89d54660fb..e87f87205f4 100644 --- a/pom.xml +++ b/pom.xml @@ -57,6 +57,7 @@ s3-extensions kafka-seven kafka-eight + rabbitmq @@ -368,16 +369,6 @@ aether-api 0.9.0.M2 - - com.rabbitmq - amqp-client - 3.2.1 - - - net.jodah - lyra - 0.3.1 - net.java.dev.jets3t jets3t diff --git a/rabbitmq/pom.xml b/rabbitmq/pom.xml new file mode 100644 index 00000000000..9e3a63cbf0a --- /dev/null +++ b/rabbitmq/pom.xml @@ -0,0 +1,40 @@ + + + 4.0.0 + io.druid.extensions + druid-rabbitmq + druid-rabbitmq + druid-rabbitmq + + + io.druid + druid + 0.6.21-SNAPSHOT + + + + + io.druid + druid-api + + + com.rabbitmq + amqp-client + 3.2.1 + + + net.jodah + lyra + 0.3.1 + + + + + junit + junit + test + + + \ No newline at end of file diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/JacksonifiedConnectionFactory.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java similarity index 98% rename from server/src/main/java/io/druid/segment/realtime/firehose/JacksonifiedConnectionFactory.java rename to rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java index 03f35cf48f7..132fe3b6179 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/JacksonifiedConnectionFactory.java +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.segment.realtime.firehose; +package io.druid.firehose.rabbitmq; import com.fasterxml.jackson.annotation.JsonProperty; import com.rabbitmq.client.ConnectionFactory; diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQDruidModule.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQDruidModule.java new file mode 100644 index 00000000000..548cbcc1d1a --- /dev/null +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQDruidModule.java @@ -0,0 +1,50 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package io.druid.firehose.rabbitmq; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; + +import java.util.List; + +/** + */ +public class RabbitMQDruidModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("RabbitMQFirehoseModule") + .registerSubtypes( + new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq") + ) + ); + } + + @Override + public void configure(Binder binder) + { + + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseConfig.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java similarity index 98% rename from server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseConfig.java rename to rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java index 0605142a021..7bae291c8a3 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseConfig.java +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.segment.realtime.firehose; +package io.druid.firehose.rabbitmq; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java similarity index 98% rename from server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java rename to rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java index a76cde8af06..df4d1baeda1 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java @@ -17,11 +17,10 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.segment.realtime.firehose; +package io.druid.firehose.rabbitmq; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.metamx.common.logger.Logger; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; diff --git a/rabbitmq/src/main/resources/META-INF.services/io.druid.initialization.DruidModule b/rabbitmq/src/main/resources/META-INF.services/io.druid.initialization.DruidModule new file mode 100644 index 00000000000..9ba3917f48a --- /dev/null +++ b/rabbitmq/src/main/resources/META-INF.services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.firehose.rabbitmq.RabbitMQDruidModule \ No newline at end of file diff --git a/examples/src/main/java/io/druid/examples/rabbitmq/RabbitMQProducerMain.java b/rabbitmq/src/test/java/io/druid/examples/rabbitmq/RabbitMQProducerMain.java similarity index 100% rename from examples/src/main/java/io/druid/examples/rabbitmq/RabbitMQProducerMain.java rename to rabbitmq/src/test/java/io/druid/examples/rabbitmq/RabbitMQProducerMain.java diff --git a/server/pom.xml b/server/pom.xml index 2ff2ba513c0..51bfa347755 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -183,14 +183,6 @@ org.eclipse.jetty jetty-servlets - - com.rabbitmq - amqp-client - - - net.jodah - lyra - com.ircclouds.irc irc-api diff --git a/server/src/main/java/io/druid/guice/FirehoseModule.java b/server/src/main/java/io/druid/guice/FirehoseModule.java index 8a7480fd22c..aacc217e208 100644 --- a/server/src/main/java/io/druid/guice/FirehoseModule.java +++ b/server/src/main/java/io/druid/guice/FirehoseModule.java @@ -28,7 +28,6 @@ import io.druid.initialization.DruidModule; import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; import io.druid.segment.realtime.firehose.IrcFirehoseFactory; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; -import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import java.util.Arrays; @@ -49,7 +48,6 @@ public class FirehoseModule implements DruidModule return Arrays.asList( new SimpleModule("FirehoseModule") .registerSubtypes( - new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), new NamedType(ClippedFirehoseFactory.class, "clipped"), new NamedType(TimedShutoffFirehoseFactory.class, "timed"), new NamedType(IrcFirehoseFactory.class, "irc"), From b6ed8fcf6f31012c7c3ae75244c81ffdae7ab375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20Freyr=20Stef=C3=A1nsson?= Date: Mon, 25 Nov 2013 18:32:25 +0000 Subject: [PATCH 3/5] Moving services file from META-INF.services to META-INF/services directory. Thanks a lot IDEA! --- .../services}/io.druid.initialization.DruidModule | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename rabbitmq/src/main/resources/{META-INF.services => META-INF/services}/io.druid.initialization.DruidModule (100%) diff --git a/rabbitmq/src/main/resources/META-INF.services/io.druid.initialization.DruidModule b/rabbitmq/src/main/resources/META-INF/services/io.druid.initialization.DruidModule similarity index 100% rename from rabbitmq/src/main/resources/META-INF.services/io.druid.initialization.DruidModule rename to rabbitmq/src/main/resources/META-INF/services/io.druid.initialization.DruidModule From 833f1c5fed92e1c761cde2e3b9be0b71274f5904 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20Freyr=20Stef=C3=A1nsson?= Date: Mon, 25 Nov 2013 18:37:28 +0000 Subject: [PATCH 4/5] adding the rabbitmq extension to the examples extension coordinates. --- examples/config/realtime/runtime.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/config/realtime/runtime.properties b/examples/config/realtime/runtime.properties index 6f3ce236bf5..0209246fc45 100644 --- a/examples/config/realtime/runtime.properties +++ b/examples/config/realtime/runtime.properties @@ -4,7 +4,7 @@ druid.port=8083 druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.23","io.druid.extensions:druid-kafka-seven:0.6.23"] +druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.23","io.druid.extensions:druid-kafka-seven:0.6.24-SNAPSHOT","io.druid.extensions:druid-rabbitmq:0.6.24-SNAPSHOT"] druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid druid.db.connector.user=druid From f04940f8b53b3a527967e6bc2298774786c0e7e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20Freyr=20Stef=C3=A1nsson?= Date: Mon, 25 Nov 2013 22:53:19 +0000 Subject: [PATCH 5/5] unifying the extension versions (even though they don't make sense for the rabbitmq extension). --- examples/config/realtime/runtime.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/config/realtime/runtime.properties b/examples/config/realtime/runtime.properties index 0209246fc45..1fabe4f4904 100644 --- a/examples/config/realtime/runtime.properties +++ b/examples/config/realtime/runtime.properties @@ -4,7 +4,7 @@ druid.port=8083 druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.23","io.druid.extensions:druid-kafka-seven:0.6.24-SNAPSHOT","io.druid.extensions:druid-rabbitmq:0.6.24-SNAPSHOT"] +druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.23","io.druid.extensions:druid-kafka-seven:0.6.23","io.druid.extensions:druid-rabbitmq:0.6.23"] druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid druid.db.connector.user=druid