From a4978bc67e6bace384bd9cc8620aa83b82e0d6b4 Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 22 Feb 2011 20:06:43 +0200 Subject: [PATCH] Support for other exchange types and options in AMQP river, closes #709. --- .../org/elasticsearch/river/rabbitmq/RabbitmqRiver.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/river/rabbitmq/src/main/java/org/elasticsearch/river/rabbitmq/RabbitmqRiver.java b/plugins/river/rabbitmq/src/main/java/org/elasticsearch/river/rabbitmq/RabbitmqRiver.java index 99c403ceddd..c43133c1be3 100644 --- a/plugins/river/rabbitmq/src/main/java/org/elasticsearch/river/rabbitmq/RabbitmqRiver.java +++ b/plugins/river/rabbitmq/src/main/java/org/elasticsearch/river/rabbitmq/RabbitmqRiver.java @@ -90,9 +90,9 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River { rabbitExchange = XContentMapValues.nodeStringValue(rabbitSettings.get("exchange"), "elasticsearch"); rabbitExchangeType = XContentMapValues.nodeStringValue(rabbitSettings.get("exchange_type"), "direct"); rabbitRoutingKey = XContentMapValues.nodeStringValue(rabbitSettings.get("routing_key"), "elasticsearch"); - rabbitExchangeDurable = Boolean.valueOf(XContentMapValues.nodeStringValue(rabbitSettings.get("exchange_durable"), "true")).booleanValue(); - rabbitQueueDurable = Boolean.valueOf(XContentMapValues.nodeStringValue(rabbitSettings.get("queue_durable"), "true")).booleanValue(); - rabbitQueueAutoDelete = Boolean.valueOf(XContentMapValues.nodeStringValue(rabbitSettings.get("queue_auto_delete"), "false")).booleanValue(); + rabbitExchangeDurable = XContentMapValues.nodeBooleanValue(rabbitSettings.get("exchange_durable"), true); + rabbitQueueDurable = XContentMapValues.nodeBooleanValue(rabbitSettings.get("queue_durable"), true); + rabbitQueueAutoDelete = XContentMapValues.nodeBooleanValue(rabbitSettings.get("queue_auto_delete"), false); } else { rabbitHost = ConnectionFactory.DEFAULT_HOST; rabbitPort = ConnectionFactory.DEFAULT_AMQP_PORT; @@ -179,7 +179,7 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River { QueueingConsumer consumer = new QueueingConsumer(channel); // define the queue try { - channel.exchangeDeclare(rabbitExchange/*exchange*/, rabbitExchangeType/*type*/, true /*durable*/); + channel.exchangeDeclare(rabbitExchange/*exchange*/, rabbitExchangeType/*type*/, rabbitExchangeDurable); channel.queueDeclare(rabbitQueue/*queue*/, rabbitQueueDurable/*durable*/, false/*exclusive*/, rabbitQueueAutoDelete/*autoDelete*/, null); channel.queueBind(rabbitQueue/*queue*/, rabbitExchange/*exchange*/, rabbitRoutingKey/*routingKey*/); channel.basicConsume(rabbitQueue/*queue*/, false/*noAck*/, consumer);