allow to set exhange and routing_key

This commit is contained in:
kimchy 2010-09-21 22:19:36 +02:00
parent b9b91db5c8
commit d0b29fe3ef
1 changed files with 8 additions and 6 deletions

View File

@ -110,18 +110,20 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River {
int bulkSize = XContentMapValues.nodeIntegerValue(settings.settings().get("bulk_size"), 100);
long bulkTimeout = XContentMapValues.nodeIntegerValue(settings.settings().get("bulk_timeout"), 10);
String queueName = XContentMapValues.nodeStringValue(settings.settings().get("queue"), "elasticsearch");
String queue = XContentMapValues.nodeStringValue(settings.settings().get("queue"), "elasticsearch");
String exchange = XContentMapValues.nodeStringValue(settings.settings().get("exchange"), "elasticsearch");
String routingKey = XContentMapValues.nodeStringValue(settings.settings().get("routing_key"), "elasticsearch");
QueueingConsumer consumer = new QueueingConsumer(channel);
// define the queue
try {
channel.exchangeDeclare(queueName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, queueName, queueName);
channel.basicConsume(queueName, false, consumer);
channel.exchangeDeclare(exchange/*exchange*/, "direct"/*type*/, true /*durable*/);
channel.queueDeclare(queue/*queue*/, true /*durable*/, false/*exclusive*/, false/*autoDelete*/, null);
channel.queueBind(queue/*queue*/, exchange/*exchange*/, routingKey/*routingKey*/);
channel.basicConsume(queue/*queue*/, false/*noAck*/, consumer);
} catch (Exception e) {
if (!closed) {
logger.warn("failed to create queue [{}]", e, queueName);
logger.warn("failed to create queue [{}]", e, queue);
}
cleanup(0, "failed to create queue");
continue;