diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java index a84678cff3f..932b05fa46b 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java @@ -2,14 +2,17 @@ package com.metamx.druid.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Throwables; import com.metamx.common.logger.Logger; import com.metamx.druid.indexer.data.StringInputRowParser; import com.metamx.druid.input.InputRow; -import com.rabbitmq.client.*; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; import java.io.IOException; -import java.util.Properties; /** * A FirehoseFactory for RabbitMQ. @@ -25,20 +28,22 @@ import java.util.Properties; *
* "firehose" : { * "type" : "rabbitmq", - * "consumerProps" : { - * "host": "localhost", # The hostname of the RabbitMQ broker to connect to. Default: 'localhost' - * "port": "5672", # The port number to connect to on the RabbitMQ broker. Default: '5672' - * "username": "test-dude", # The username to use to connect to RabbitMQ. Default: 'guest' - * "password": "test-word", # The password to use to connect to RabbitMQ. Default: 'guest' - * "virtualHost": "test-vhost", # The virtual host to connect to. Default: '/' - * "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed - * "exchange": "test-exchange", # The exchange to connect to. No default - * "queue" : "druidtest", # The queue to connect to or create. No default - * "routingKey": "#", # The routing key to use to bind the queue to the exchange. No default - * "durable": "true", # Whether the queue should be durable. Default: 'false' - * "exclusive": "false", # Whether the queue should be exclusive. Default: 'false' - * "autoDelete": "false" # Whether the queue should auto-delete on disconnect. Default: 'false' - * }, + * "connection" : { + * "host": "localhost", # The hostname of the RabbitMQ broker to connect to. Default: 'localhost' + * "port": "5672", # The port number to connect to on the RabbitMQ broker. Default: '5672' + * "username": "test-dude", # The username to use to connect to RabbitMQ. Default: 'guest' + * "password": "test-word", # The password to use to connect to RabbitMQ. Default: 'guest' + * "virtualHost": "test-vhost", # The virtual host to connect to. Default: '/' + * "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed + * }, + * "config" : { + * "exchange": "test-exchange", # The exchange to connect to. No default + * "queue" : "druidtest", # The queue to connect to or create. No default + * "routingKey": "#", # The routing key to use to bind the queue to the exchange. No default + * "durable": "true", # Whether the queue should be durable. Default: 'false' + * "exclusive": "false", # Whether the queue should be exclusive. Default: 'false' + * "autoDelete": "false" # Whether the queue should auto-delete on disconnect. Default: 'false' + * }, * "parser" : { * "timestampSpec" : { "column" : "utcdt", "format" : "iso" }, * "data" : { "format" : "json" },