diff --git a/examples/bin/examples/rabbitmq/query.body b/examples/bin/examples/rabbitmq/query.body index 05007c955b9..59ca8fe1e2c 100644 --- a/examples/bin/examples/rabbitmq/query.body +++ b/examples/bin/examples/rabbitmq/query.body @@ -1,19 +1,12 @@ { "queryType": "groupBy", - "dataSource": "randSeq", + "dataSource": "rabbitmqtest", "granularity": "all", "dimensions": [], - "aggregations":[ - { "type": "count", "name": "rows"}, - { "type": "doubleSum", "fieldName": "events", "name": "e"}, - { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"} + "aggregations": [ + { "type": "count", "name": "rows" }, + {"type": "longSum", "name": "imps", "fieldName": "impressions"}, + {"type": "doubleSum", "name": "wp", "fieldName": "wp"} ], - "postAggregations":[ - { "type":"arithmetic", - "name":"avg_random", - "fn":"/", - "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"}, - {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]} - ], - "intervals":["2012-10-01T00:00/2020-01-01T00"] + "intervals": ["2010-01-01T00:00/2020-01-01T00"] } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java index 6140f82a93f..b03c7c73f32 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java @@ -72,6 +72,20 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory{ channel.queueBind(queue, exchange, routingKey); final QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queue, autoAck, consumer); + channel.addShutdownListener(new ShutdownListener() { + @Override + public void shutdownCompleted(ShutdownSignalException cause) { + log.warn(cause, "Channel closed!"); + //TODO: should we re-establish the connection here? + } + }); + connection.addShutdownListener(new ShutdownListener() { + @Override + public void shutdownCompleted(ShutdownSignalException cause) { + log.warn(cause, "Connection closed!"); + //TODO: should we re-establish the connection here? + } + }); return new Firehose(){ @@ -148,6 +162,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory{ @Override public void close() throws IOException { + log.info("Closing connection to RabbitMQ"); channel.close(); connection.close(); }