diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java index 2f337d357ef..71e53b5089c 100644 --- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java @@ -25,9 +25,13 @@ import com.metamx.common.logger.Logger; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; +import com.rabbitmq.client.ConsumerCancelledException; import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -40,6 +44,8 @@ import net.jodah.lyra.retry.RetryPolicy; import net.jodah.lyra.util.Duration; import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; /** * A FirehoseFactory for RabbitMQ. @@ -179,7 +185,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory _queue; + + public QueueingConsumer(Channel ch) { + this(ch, new LinkedBlockingQueue()); + } + + public QueueingConsumer(Channel ch, BlockingQueue q) { + super(ch); + this._queue = q; + } + + @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { + _queue.clear(); + } + + @Override public void handleCancel(String consumerTag) throws IOException { + _queue.clear(); + } + + @Override public void handleDelivery(String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) + throws IOException + { + this._queue.add(new Delivery(envelope, properties, body)); + } + + public Delivery nextDelivery() + throws InterruptedException, ShutdownSignalException, ConsumerCancelledException + { + return _queue.take(); + } + } }