Ugly quick'n'dirty way of getting lyra to work.

This commit is contained in:
Stefán Freyr Stefánsson 2013-11-19 10:18:00 +00:00
parent 60dbdcebac
commit f644c8ea66
5 changed files with 79 additions and 26 deletions

View File

@ -26,7 +26,11 @@
"routingKey": "#",
"durable": "true",
"exclusive": "false",
"autoDelete": "false"
"autoDelete": "false",
"maxRetries": "10",
"retryIntervalSeconds": "1",
"maxDurationSeconds": "300"
},
"parser" : {
"timestampSpec" : { "column" : "utcdt", "format" : "iso" },

View File

@ -371,7 +371,12 @@
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.1.1</version>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>net.jodah</groupId>
<artifactId>lyra</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>

View File

@ -187,6 +187,10 @@
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>net.jodah</groupId>
<artifactId>lyra</artifactId>
</dependency>
<dependency>
<groupId>com.ircclouds.irc</groupId>
<artifactId>irc-api</artifactId>

View File

@ -33,6 +33,11 @@ public class RabbitMQFirehoseConfig
private boolean exclusive = false;
private boolean autoDelete = false;
// Lyra (auto reconnect) properties
private int maxRetries = 100;
private int retryIntervalSeconds = 2;
private long maxDurationSeconds = 5 * 60;
@JsonProperty
public String getQueue()
{
@ -98,4 +103,31 @@ public class RabbitMQFirehoseConfig
{
this.autoDelete = autoDelete;
}
@JsonProperty
public int getMaxRetries() {
return maxRetries;
}
public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}
@JsonProperty
public int getRetryIntervalSeconds() {
return retryIntervalSeconds;
}
public void setRetryIntervalSeconds(int retryIntervalSeconds) {
this.retryIntervalSeconds = retryIntervalSeconds;
}
@JsonProperty
public long getMaxDurationSeconds() {
return maxDurationSeconds;
}
public void setMaxDurationSeconds(int maxDurationSeconds) {
this.maxDurationSeconds = maxDurationSeconds;
}
}

View File

@ -33,6 +33,11 @@ import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.Connections;
import net.jodah.lyra.config.Config;
import net.jodah.lyra.retry.RetryPolicy;
import net.jodah.lyra.util.Duration;
import java.io.IOException;
@ -51,21 +56,25 @@ import java.io.IOException;
* "firehose" : {
* "type" : "rabbitmq",
* "connection" : {
* "host": "localhost", # The hostname of the RabbitMQ broker to connect to. Default: 'localhost'
* "port": "5672", # The port number to connect to on the RabbitMQ broker. Default: '5672'
* "username": "test-dude", # The username to use to connect to RabbitMQ. Default: 'guest'
* "password": "test-word", # The password to use to connect to RabbitMQ. Default: 'guest'
* "virtualHost": "test-vhost", # The virtual host to connect to. Default: '/'
* "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed
* },
* "config" : {
* "exchange": "test-exchange", # The exchange to connect to. No default
* "queue" : "druidtest", # The queue to connect to or create. No default
* "routingKey": "#", # The routing key to use to bind the queue to the exchange. No default
* "durable": "true", # Whether the queue should be durable. Default: 'false'
* "exclusive": "false", # Whether the queue should be exclusive. Default: 'false'
* "autoDelete": "false" # Whether the queue should auto-delete on disconnect. Default: 'false'
* },
* "host": "localhost", # The hostname of the RabbitMQ broker to connect to. Default: 'localhost'
* "port": "5672", # The port number to connect to on the RabbitMQ broker. Default: '5672'
* "username": "test-dude", # The username to use to connect to RabbitMQ. Default: 'guest'
* "password": "test-word", # The password to use to connect to RabbitMQ. Default: 'guest'
* "virtualHost": "test-vhost", # The virtual host to connect to. Default: '/'
* "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed
* },
* "config" : {
* "exchange": "test-exchange", # The exchange to connect to. No default
* "queue" : "druidtest", # The queue to connect to or create. No default
* "routingKey": "#", # The routing key to use to bind the queue to the exchange. No default
* "durable": "true", # Whether the queue should be durable. Default: 'false'
* "exclusive": "false", # Whether the queue should be exclusive. Default: 'false'
* "autoDelete": "false", # Whether the queue should auto-delete on disconnect. Default: 'false'
*
* "maxRetries": "10", # The max number of reconnection retry attempts
* "retryIntervalSeconds": "1", # The reconnection interval
* "maxDurationSeconds": "300" # The max duration of trying to reconnect
* },
* "parser" : {
* "timestampSpec" : { "column" : "utcdt", "format" : "iso" },
* "data" : { "format" : "json" },
@ -113,6 +122,13 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
@Override
public Firehose connect() throws IOException
{
ConnectionOptions lyraOptions = new ConnectionOptions(this.connectionFactory);
Config lyraConfig = new Config()
.withRecoveryPolicy(new RetryPolicy()
.withMaxRetries(config.getMaxRetries())
.withRetryInterval(Duration.seconds(config.getRetryIntervalSeconds()))
.withMaxDuration(Duration.seconds(config.getMaxDurationSeconds())));
String queue = config.getQueue();
String exchange = config.getExchange();
String routingKey = config.getRoutingKey();
@ -121,13 +137,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
boolean exclusive = config.isExclusive();
boolean autoDelete = config.isAutoDelete();
final Connection connection;
try {
connection = connectionFactory.newConnection();
} catch (Exception e) {
log.error("Unable to find a RabbitMQ broker. Are you sure you have one running?");
throw Throwables.propagate(e);
}
final Connection connection = Connections.create(lyraOptions, lyraConfig);
connection.addShutdownListener(new ShutdownListener()
{
@ -135,7 +145,6 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
public void shutdownCompleted(ShutdownSignalException cause)
{
log.warn(cause, "Connection closed!");
//FUTURE: we could try to re-establish the connection here. Not done in this version though.
}
});
@ -148,7 +157,6 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
public void shutdownCompleted(ShutdownSignalException cause)
{
log.warn(cause, "Channel closed!");
//FUTURE: we could try to re-establish the connection here. Not done in this version though.
}
});