Support for other exchange types and options in AMQP river, closes #709.

This commit is contained in:
kimchy 2011-02-22 20:06:43 +02:00
parent d0780f0f62
commit a4978bc67e
1 changed files with 4 additions and 4 deletions

View File

@ -90,9 +90,9 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River {
rabbitExchange = XContentMapValues.nodeStringValue(rabbitSettings.get("exchange"), "elasticsearch");
rabbitExchangeType = XContentMapValues.nodeStringValue(rabbitSettings.get("exchange_type"), "direct");
rabbitRoutingKey = XContentMapValues.nodeStringValue(rabbitSettings.get("routing_key"), "elasticsearch");
rabbitExchangeDurable = Boolean.valueOf(XContentMapValues.nodeStringValue(rabbitSettings.get("exchange_durable"), "true")).booleanValue();
rabbitQueueDurable = Boolean.valueOf(XContentMapValues.nodeStringValue(rabbitSettings.get("queue_durable"), "true")).booleanValue();
rabbitQueueAutoDelete = Boolean.valueOf(XContentMapValues.nodeStringValue(rabbitSettings.get("queue_auto_delete"), "false")).booleanValue();
rabbitExchangeDurable = XContentMapValues.nodeBooleanValue(rabbitSettings.get("exchange_durable"), true);
rabbitQueueDurable = XContentMapValues.nodeBooleanValue(rabbitSettings.get("queue_durable"), true);
rabbitQueueAutoDelete = XContentMapValues.nodeBooleanValue(rabbitSettings.get("queue_auto_delete"), false);
} else {
rabbitHost = ConnectionFactory.DEFAULT_HOST;
rabbitPort = ConnectionFactory.DEFAULT_AMQP_PORT;
@ -179,7 +179,7 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River {
QueueingConsumer consumer = new QueueingConsumer(channel);
// define the queue
try {
channel.exchangeDeclare(rabbitExchange/*exchange*/, rabbitExchangeType/*type*/, true /*durable*/);
channel.exchangeDeclare(rabbitExchange/*exchange*/, rabbitExchangeType/*type*/, rabbitExchangeDurable);
channel.queueDeclare(rabbitQueue/*queue*/, rabbitQueueDurable/*durable*/, false/*exclusive*/, rabbitQueueAutoDelete/*autoDelete*/, null);
channel.queueBind(rabbitQueue/*queue*/, rabbitExchange/*exchange*/, rabbitRoutingKey/*routingKey*/);
channel.basicConsume(rabbitQueue/*queue*/, false/*noAck*/, consumer);