diff --git a/examples/bin/examples/rabbitmq/rabbitmq_realtime.spec b/examples/bin/examples/rabbitmq/rabbitmq_realtime.spec
index fc26f21ee3a..528e81f39cc 100644
--- a/examples/bin/examples/rabbitmq/rabbitmq_realtime.spec
+++ b/examples/bin/examples/rabbitmq/rabbitmq_realtime.spec
@@ -26,7 +26,11 @@
"routingKey": "#",
"durable": "true",
"exclusive": "false",
- "autoDelete": "false"
+ "autoDelete": "false",
+
+ "maxRetries": "10",
+ "retryIntervalSeconds": "1",
+ "maxDurationSeconds": "300"
},
"parser" : {
"timestampSpec" : { "column" : "utcdt", "format" : "iso" },
diff --git a/pom.xml b/pom.xml
index 24042c676ab..4ccfa0d2ab4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -371,7 +371,12 @@
com.rabbitmq
amqp-client
- 3.1.1
+ 3.2.1
+
+
+ net.jodah
+ lyra
+ 0.3.1
net.java.dev.jets3t
diff --git a/server/pom.xml b/server/pom.xml
index 51b8e988f15..70858ce1430 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -187,6 +187,10 @@
com.rabbitmq
amqp-client
+
+ net.jodah
+ lyra
+
com.ircclouds.irc
irc-api
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseConfig.java b/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseConfig.java
index 42e10dd601b..0605142a021 100644
--- a/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseConfig.java
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseConfig.java
@@ -33,6 +33,11 @@ public class RabbitMQFirehoseConfig
private boolean exclusive = false;
private boolean autoDelete = false;
+ // Lyra (auto reconnect) properties
+ private int maxRetries = 100;
+ private int retryIntervalSeconds = 2;
+ private long maxDurationSeconds = 5 * 60;
+
@JsonProperty
public String getQueue()
{
@@ -98,4 +103,31 @@ public class RabbitMQFirehoseConfig
{
this.autoDelete = autoDelete;
}
+
+ @JsonProperty
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ public void setMaxRetries(int maxRetries) {
+ this.maxRetries = maxRetries;
+ }
+
+ @JsonProperty
+ public int getRetryIntervalSeconds() {
+ return retryIntervalSeconds;
+ }
+
+ public void setRetryIntervalSeconds(int retryIntervalSeconds) {
+ this.retryIntervalSeconds = retryIntervalSeconds;
+ }
+
+ @JsonProperty
+ public long getMaxDurationSeconds() {
+ return maxDurationSeconds;
+ }
+
+ public void setMaxDurationSeconds(int maxDurationSeconds) {
+ this.maxDurationSeconds = maxDurationSeconds;
+ }
}
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java
index aa0270df15d..a76cde8af06 100644
--- a/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/RabbitMQFirehoseFactory.java
@@ -33,6 +33,11 @@ import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
+import net.jodah.lyra.ConnectionOptions;
+import net.jodah.lyra.Connections;
+import net.jodah.lyra.config.Config;
+import net.jodah.lyra.retry.RetryPolicy;
+import net.jodah.lyra.util.Duration;
import java.io.IOException;
@@ -51,21 +56,25 @@ import java.io.IOException;
* "firehose" : {
* "type" : "rabbitmq",
* "connection" : {
- * "host": "localhost", # The hostname of the RabbitMQ broker to connect to. Default: 'localhost'
- * "port": "5672", # The port number to connect to on the RabbitMQ broker. Default: '5672'
- * "username": "test-dude", # The username to use to connect to RabbitMQ. Default: 'guest'
- * "password": "test-word", # The password to use to connect to RabbitMQ. Default: 'guest'
- * "virtualHost": "test-vhost", # The virtual host to connect to. Default: '/'
- * "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed
- * },
- * "config" : {
- * "exchange": "test-exchange", # The exchange to connect to. No default
- * "queue" : "druidtest", # The queue to connect to or create. No default
- * "routingKey": "#", # The routing key to use to bind the queue to the exchange. No default
- * "durable": "true", # Whether the queue should be durable. Default: 'false'
- * "exclusive": "false", # Whether the queue should be exclusive. Default: 'false'
- * "autoDelete": "false" # Whether the queue should auto-delete on disconnect. Default: 'false'
- * },
+ * "host": "localhost", # The hostname of the RabbitMQ broker to connect to. Default: 'localhost'
+ * "port": "5672", # The port number to connect to on the RabbitMQ broker. Default: '5672'
+ * "username": "test-dude", # The username to use to connect to RabbitMQ. Default: 'guest'
+ * "password": "test-word", # The password to use to connect to RabbitMQ. Default: 'guest'
+ * "virtualHost": "test-vhost", # The virtual host to connect to. Default: '/'
+ * "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed
+ * },
+ * "config" : {
+ * "exchange": "test-exchange", # The exchange to connect to. No default
+ * "queue" : "druidtest", # The queue to connect to or create. No default
+ * "routingKey": "#", # The routing key to use to bind the queue to the exchange. No default
+ * "durable": "true", # Whether the queue should be durable. Default: 'false'
+ * "exclusive": "false", # Whether the queue should be exclusive. Default: 'false'
+ * "autoDelete": "false", # Whether the queue should auto-delete on disconnect. Default: 'false'
+ *
+ * "maxRetries": "10", # The max number of reconnection retry attempts
+ * "retryIntervalSeconds": "1", # The reconnection interval
+ * "maxDurationSeconds": "300" # The max duration of trying to reconnect
+ * },
* "parser" : {
* "timestampSpec" : { "column" : "utcdt", "format" : "iso" },
* "data" : { "format" : "json" },
@@ -113,6 +122,13 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
@Override
public Firehose connect() throws IOException
{
+ ConnectionOptions lyraOptions = new ConnectionOptions(this.connectionFactory);
+ Config lyraConfig = new Config()
+ .withRecoveryPolicy(new RetryPolicy()
+ .withMaxRetries(config.getMaxRetries())
+ .withRetryInterval(Duration.seconds(config.getRetryIntervalSeconds()))
+ .withMaxDuration(Duration.seconds(config.getMaxDurationSeconds())));
+
String queue = config.getQueue();
String exchange = config.getExchange();
String routingKey = config.getRoutingKey();
@@ -121,13 +137,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
boolean exclusive = config.isExclusive();
boolean autoDelete = config.isAutoDelete();
- final Connection connection;
- try {
- connection = connectionFactory.newConnection();
- } catch (Exception e) {
- log.error("Unable to find a RabbitMQ broker. Are you sure you have one running?");
- throw Throwables.propagate(e);
- }
+ final Connection connection = Connections.create(lyraOptions, lyraConfig);
connection.addShutdownListener(new ShutdownListener()
{
@@ -135,7 +145,6 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
public void shutdownCompleted(ShutdownSignalException cause)
{
log.warn(cause, "Connection closed!");
- //FUTURE: we could try to re-establish the connection here. Not done in this version though.
}
});
@@ -148,7 +157,6 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
public void shutdownCompleted(ShutdownSignalException cause)
{
log.warn(cause, "Channel closed!");
- //FUTURE: we could try to re-establish the connection here. Not done in this version though.
}
});