mirror of https://github.com/apache/druid.git
Make RabbitMQ Firehose resilient to broker deconnections
This commit is contained in:
parent
5ce80068d2
commit
b7426cd0b2
|
@ -25,9 +25,13 @@ import com.metamx.common.logger.Logger;
|
||||||
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Channel;
|
||||||
import com.rabbitmq.client.Connection;
|
import com.rabbitmq.client.Connection;
|
||||||
import com.rabbitmq.client.ConnectionFactory;
|
import com.rabbitmq.client.ConnectionFactory;
|
||||||
import com.rabbitmq.client.QueueingConsumer;
|
import com.rabbitmq.client.DefaultConsumer;
|
||||||
|
import com.rabbitmq.client.Envelope;
|
||||||
|
import com.rabbitmq.client.AMQP;
|
||||||
|
import com.rabbitmq.client.QueueingConsumer.Delivery;
|
||||||
import com.rabbitmq.client.ShutdownListener;
|
import com.rabbitmq.client.ShutdownListener;
|
||||||
import com.rabbitmq.client.ShutdownSignalException;
|
import com.rabbitmq.client.ShutdownSignalException;
|
||||||
|
import com.rabbitmq.client.ConsumerCancelledException;
|
||||||
import io.druid.data.input.ByteBufferInputRowParser;
|
import io.druid.data.input.ByteBufferInputRowParser;
|
||||||
import io.druid.data.input.Firehose;
|
import io.druid.data.input.Firehose;
|
||||||
import io.druid.data.input.FirehoseFactory;
|
import io.druid.data.input.FirehoseFactory;
|
||||||
|
@ -40,6 +44,8 @@ import net.jodah.lyra.retry.RetryPolicy;
|
||||||
import net.jodah.lyra.util.Duration;
|
import net.jodah.lyra.util.Duration;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A FirehoseFactory for RabbitMQ.
|
* A FirehoseFactory for RabbitMQ.
|
||||||
|
@ -179,7 +185,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
|
||||||
* Storing the latest delivery as a member variable should be safe since this will only be run
|
* Storing the latest delivery as a member variable should be safe since this will only be run
|
||||||
* by a single thread.
|
* by a single thread.
|
||||||
*/
|
*/
|
||||||
private QueueingConsumer.Delivery delivery;
|
private Delivery delivery;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Store the latest delivery tag to be able to commit (acknowledge) the message delivery up to
|
* Store the latest delivery tag to be able to commit (acknowledge) the message delivery up to
|
||||||
|
@ -268,4 +274,41 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
|
||||||
{
|
{
|
||||||
return parser;
|
return parser;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class QueueingConsumer extends DefaultConsumer
|
||||||
|
{
|
||||||
|
private final BlockingQueue<Delivery> _queue;
|
||||||
|
|
||||||
|
public QueueingConsumer(Channel ch) {
|
||||||
|
this(ch, new LinkedBlockingQueue<Delivery>());
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueingConsumer(Channel ch, BlockingQueue<Delivery> q) {
|
||||||
|
super(ch);
|
||||||
|
this._queue = q;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
|
||||||
|
_queue.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void handleCancel(String consumerTag) throws IOException {
|
||||||
|
_queue.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void handleDelivery(String consumerTag,
|
||||||
|
Envelope envelope,
|
||||||
|
AMQP.BasicProperties properties,
|
||||||
|
byte[] body)
|
||||||
|
throws IOException
|
||||||
|
{
|
||||||
|
this._queue.add(new Delivery(envelope, properties, body));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Delivery nextDelivery()
|
||||||
|
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
|
||||||
|
{
|
||||||
|
return _queue.take();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue