From 01b7507a5ac6d5759c6eb8632dfb4f0e3e36a5cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20Freyr=20Stef=C3=A1nsson?= Date: Sat, 13 Jul 2013 16:03:46 +0000 Subject: [PATCH] First version of a RabbitMQ Firehose implementation. --- realtime/pom.xml | 5 + .../realtime/firehose/FirehoseFactory.java | 1 + .../firehose/RabbitMQFirehoseFactory.java | 150 ++++++++++++++++++ 3 files changed, 156 insertions(+) create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java diff --git a/realtime/pom.xml b/realtime/pom.xml index abe217c1af2..2d59174dd01 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -120,6 +120,11 @@ + + com.rabbitmq + amqp-client + 3.1.1 + diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java index 71bf6c108ad..c9d9dfec3de 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java @@ -27,6 +27,7 @@ import java.io.IOException; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({ @JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class), + @JsonSubTypes.Type(name = "rabbitmq", value = RabbitMQFirehoseFactory.class), @JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class), @JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class) }) diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java new file mode 100644 index 00000000000..e1815f5409a --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java @@ -0,0 +1,150 @@ +package com.metamx.druid.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.common.logger.Logger; +import com.metamx.druid.indexer.data.StringInputRowParser; +import com.metamx.druid.input.InputRow; +import com.rabbitmq.client.*; + +import java.io.IOException; +import java.util.*; + +/** + * A FirehoseFactory + */ +public class RabbitMQFirehoseFactory implements FirehoseFactory{ + + private static final Logger log = new Logger(RabbitMQFirehoseFactory.class); + + @JsonProperty + private final Properties consumerProps; + + @JsonProperty + private final String queue; + + @JsonProperty + private final String exchange; + + @JsonProperty + private final String routingKey; + + @JsonProperty + private final StringInputRowParser parser; + + @JsonCreator + public RabbitMQFirehoseFactory( + @JsonProperty("consumerProps") Properties consumerProps, + @JsonProperty("queue") String queue, + @JsonProperty("exchange") String exchange, + @JsonProperty("routingKey") String routingKey, + @JsonProperty("parser") StringInputRowParser parser + ) + { + this.consumerProps = consumerProps; + this.queue = queue; + this.exchange = exchange; + this.routingKey = routingKey; + this.parser = parser; + + parser.addDimensionExclusion("queue"); + parser.addDimensionExclusion("exchange"); + parser.addDimensionExclusion("routingKey"); + } + + @Override + public Firehose connect() throws IOException { + + final ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(consumerProps.getProperty("host", factory.getHost())); + factory.setUsername(consumerProps.getProperty("username", factory.getUsername())); + factory.setPassword(consumerProps.getProperty("password", factory.getPassword())); + factory.setVirtualHost(consumerProps.getProperty("virtualHost", factory.getVirtualHost())); + + final Connection connection = factory.newConnection(); + final Channel channel = connection.createChannel(); + channel.queueDeclare(queue, true, false, false, null); + channel.queueBind(queue, exchange, routingKey); + final QueueingConsumer consumer = new QueueingConsumer(channel); + channel.basicConsume(queue, false, consumer); + + return new Firehose(){ + + //private final Connection connection = conn; + //private final Channel channel = ch; + //private final QueueingConsumer consumer = qc; + + /** + * Storing the latest delivery as a member variable should be safe since this will only be run + * by a single thread. + */ + private QueueingConsumer.Delivery delivery; + + /** + * Store the latest delivery tag to be able to commit (acknowledge) the message delivery up to + * and including this tag. See commit() for more detail. + */ + private long lastDeliveryTag; + + @Override + public boolean hasMore() { + try { + delivery = consumer.nextDelivery(); + lastDeliveryTag = delivery.getEnvelope().getDeliveryTag(); + + log.debug("got new message"); + } catch (InterruptedException e) { + log.wtf(e, "Don't know if this is supposed to ever happen."); + return false; + } + + if(delivery != null){ + return true; + } + + // Shouldn't ever get here but in case we'll assume there is no more stuff. + return false; + } + + @Override + public InputRow nextRow() { + log.debug("consuming new message"); + + return parser.parse(new String(delivery.getBody())); + } + + @Override + public Runnable commit() { + + // This method will be called from the same thread that calls the other methods of + // this Firehose. However, the returned Runnable will be called by a different thread. + // + // It should be (thread) safe to copy the lastDeliveryTag like we do below. + return new Runnable() + { + // Store (copy) the last delivery tag to "become" thread safe. + final long deliveryTag = lastDeliveryTag; + + @Override + public void run() + { + try { + log.info("Acknowledging delivery of messages up to tag: " + deliveryTag); + + // Acknowledge all messages up to and including the stored delivery tag. + channel.basicAck(deliveryTag, true); + } catch (IOException e) { + log.error(e, "Unable to acknowledge message reception to message queue."); + } + } + }; + } + + @Override + public void close() throws IOException { + channel.close(); + connection.close(); + } + }; + } +}