diff --git a/realtime/pom.xml b/realtime/pom.xml
index abe217c1af2..2d59174dd01 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -120,6 +120,11 @@
+
+ com.rabbitmq
+ amqp-client
+ 3.1.1
+
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java
index 71bf6c108ad..c9d9dfec3de 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java
@@ -27,6 +27,7 @@ import java.io.IOException;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class),
+ @JsonSubTypes.Type(name = "rabbitmq", value = RabbitMQFirehoseFactory.class),
@JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class),
@JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class)
})
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java
new file mode 100644
index 00000000000..e1815f5409a
--- /dev/null
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java
@@ -0,0 +1,150 @@
+package com.metamx.druid.realtime.firehose;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.indexer.data.StringInputRowParser;
+import com.metamx.druid.input.InputRow;
+import com.rabbitmq.client.*;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A FirehoseFactory
+ */
+public class RabbitMQFirehoseFactory implements FirehoseFactory{
+
+ private static final Logger log = new Logger(RabbitMQFirehoseFactory.class);
+
+ @JsonProperty
+ private final Properties consumerProps;
+
+ @JsonProperty
+ private final String queue;
+
+ @JsonProperty
+ private final String exchange;
+
+ @JsonProperty
+ private final String routingKey;
+
+ @JsonProperty
+ private final StringInputRowParser parser;
+
+ @JsonCreator
+ public RabbitMQFirehoseFactory(
+ @JsonProperty("consumerProps") Properties consumerProps,
+ @JsonProperty("queue") String queue,
+ @JsonProperty("exchange") String exchange,
+ @JsonProperty("routingKey") String routingKey,
+ @JsonProperty("parser") StringInputRowParser parser
+ )
+ {
+ this.consumerProps = consumerProps;
+ this.queue = queue;
+ this.exchange = exchange;
+ this.routingKey = routingKey;
+ this.parser = parser;
+
+ parser.addDimensionExclusion("queue");
+ parser.addDimensionExclusion("exchange");
+ parser.addDimensionExclusion("routingKey");
+ }
+
+ @Override
+ public Firehose connect() throws IOException {
+
+ final ConnectionFactory factory = new ConnectionFactory();
+ factory.setHost(consumerProps.getProperty("host", factory.getHost()));
+ factory.setUsername(consumerProps.getProperty("username", factory.getUsername()));
+ factory.setPassword(consumerProps.getProperty("password", factory.getPassword()));
+ factory.setVirtualHost(consumerProps.getProperty("virtualHost", factory.getVirtualHost()));
+
+ final Connection connection = factory.newConnection();
+ final Channel channel = connection.createChannel();
+ channel.queueDeclare(queue, true, false, false, null);
+ channel.queueBind(queue, exchange, routingKey);
+ final QueueingConsumer consumer = new QueueingConsumer(channel);
+ channel.basicConsume(queue, false, consumer);
+
+ return new Firehose(){
+
+ //private final Connection connection = conn;
+ //private final Channel channel = ch;
+ //private final QueueingConsumer consumer = qc;
+
+ /**
+ * Storing the latest delivery as a member variable should be safe since this will only be run
+ * by a single thread.
+ */
+ private QueueingConsumer.Delivery delivery;
+
+ /**
+ * Store the latest delivery tag to be able to commit (acknowledge) the message delivery up to
+ * and including this tag. See commit() for more detail.
+ */
+ private long lastDeliveryTag;
+
+ @Override
+ public boolean hasMore() {
+ try {
+ delivery = consumer.nextDelivery();
+ lastDeliveryTag = delivery.getEnvelope().getDeliveryTag();
+
+ log.debug("got new message");
+ } catch (InterruptedException e) {
+ log.wtf(e, "Don't know if this is supposed to ever happen.");
+ return false;
+ }
+
+ if(delivery != null){
+ return true;
+ }
+
+ // Shouldn't ever get here but in case we'll assume there is no more stuff.
+ return false;
+ }
+
+ @Override
+ public InputRow nextRow() {
+ log.debug("consuming new message");
+
+ return parser.parse(new String(delivery.getBody()));
+ }
+
+ @Override
+ public Runnable commit() {
+
+ // This method will be called from the same thread that calls the other methods of
+ // this Firehose. However, the returned Runnable will be called by a different thread.
+ //
+ // It should be (thread) safe to copy the lastDeliveryTag like we do below.
+ return new Runnable()
+ {
+ // Store (copy) the last delivery tag to "become" thread safe.
+ final long deliveryTag = lastDeliveryTag;
+
+ @Override
+ public void run()
+ {
+ try {
+ log.info("Acknowledging delivery of messages up to tag: " + deliveryTag);
+
+ // Acknowledge all messages up to and including the stored delivery tag.
+ channel.basicAck(deliveryTag, true);
+ } catch (IOException e) {
+ log.error(e, "Unable to acknowledge message reception to message queue.");
+ }
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ connection.close();
+ }
+ };
+ }
+}