mirror of https://github.com/apache/druid.git
Making more things configurable through the realtime.spec file.
This commit is contained in:
parent
fc74333a28
commit
4d998e4c27
|
@ -18,7 +18,11 @@
|
||||||
"consumerProps" : { "username": "test-dude",
|
"consumerProps" : { "username": "test-dude",
|
||||||
"password": "test-word",
|
"password": "test-word",
|
||||||
"virtualHost": "test-vhost",
|
"virtualHost": "test-vhost",
|
||||||
"host": "localhost"
|
"host": "localhost",
|
||||||
|
"durable": "true",
|
||||||
|
"exclusive": "false",
|
||||||
|
"autoDelete": "false",
|
||||||
|
"audoAck": "false"
|
||||||
},
|
},
|
||||||
"queue" : "druidtest",
|
"queue" : "druidtest",
|
||||||
"exchange": "test-exchange",
|
"exchange": "test-exchange",
|
||||||
|
|
|
@ -61,12 +61,17 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory{
|
||||||
factory.setPassword(consumerProps.getProperty("password", factory.getPassword()));
|
factory.setPassword(consumerProps.getProperty("password", factory.getPassword()));
|
||||||
factory.setVirtualHost(consumerProps.getProperty("virtualHost", factory.getVirtualHost()));
|
factory.setVirtualHost(consumerProps.getProperty("virtualHost", factory.getVirtualHost()));
|
||||||
|
|
||||||
|
boolean durable = Boolean.valueOf(consumerProps.getProperty("durable", "false"));
|
||||||
|
boolean exclusive = Boolean.valueOf(consumerProps.getProperty("exclusive", "false"));
|
||||||
|
boolean autoDelete = Boolean.valueOf(consumerProps.getProperty("autoDelete", "false"));
|
||||||
|
boolean autoAck = Boolean.valueOf(consumerProps.getProperty("autoAck", "true"));
|
||||||
|
|
||||||
final Connection connection = factory.newConnection();
|
final Connection connection = factory.newConnection();
|
||||||
final Channel channel = connection.createChannel();
|
final Channel channel = connection.createChannel();
|
||||||
channel.queueDeclare(queue, true, false, false, null);
|
channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
|
||||||
channel.queueBind(queue, exchange, routingKey);
|
channel.queueBind(queue, exchange, routingKey);
|
||||||
final QueueingConsumer consumer = new QueueingConsumer(channel);
|
final QueueingConsumer consumer = new QueueingConsumer(channel);
|
||||||
channel.basicConsume(queue, false, consumer);
|
channel.basicConsume(queue, autoAck, consumer);
|
||||||
|
|
||||||
return new Firehose(){
|
return new Firehose(){
|
||||||
|
|
||||||
|
@ -103,6 +108,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory{
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shouldn't ever get here but in case we'll assume there is no more stuff.
|
// Shouldn't ever get here but in case we'll assume there is no more stuff.
|
||||||
|
log.wtf("We shouldn't be here!");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue