Implemented a better query.body for the test I'm using. Added a shutdown listener to the connection and channel to get a notification when they are closed. Question remains whether the connection should be re-established when that happens.

This commit is contained in:
Stefán Freyr Stefánsson 2013-07-15 09:50:58 +00:00
parent 4d998e4c27
commit 13328a6b36
2 changed files with 21 additions and 13 deletions

View File

@ -1,19 +1,12 @@
{ {
"queryType": "groupBy", "queryType": "groupBy",
"dataSource": "randSeq", "dataSource": "rabbitmqtest",
"granularity": "all", "granularity": "all",
"dimensions": [], "dimensions": [],
"aggregations":[ "aggregations": [
{ "type": "count", "name": "rows"}, { "type": "count", "name": "rows" },
{ "type": "doubleSum", "fieldName": "events", "name": "e"}, {"type": "longSum", "name": "imps", "fieldName": "impressions"},
{ "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"} {"type": "doubleSum", "name": "wp", "fieldName": "wp"}
], ],
"postAggregations":[ "intervals": ["2010-01-01T00:00/2020-01-01T00"]
{ "type":"arithmetic",
"name":"avg_random",
"fn":"/",
"fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
{"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
],
"intervals":["2012-10-01T00:00/2020-01-01T00"]
} }

View File

@ -72,6 +72,20 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory{
channel.queueBind(queue, exchange, routingKey); channel.queueBind(queue, exchange, routingKey);
final QueueingConsumer consumer = new QueueingConsumer(channel); final QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue, autoAck, consumer); channel.basicConsume(queue, autoAck, consumer);
channel.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
log.warn(cause, "Channel closed!");
//TODO: should we re-establish the connection here?
}
});
connection.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
log.warn(cause, "Connection closed!");
//TODO: should we re-establish the connection here?
}
});
return new Firehose(){ return new Firehose(){
@ -148,6 +162,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory{
@Override @Override
public void close() throws IOException { public void close() throws IOException {
log.info("Closing connection to RabbitMQ");
channel.close(); channel.close();
connection.close(); connection.close();
} }