mirror of https://github.com/apache/druid.git
RabbitMQ: Swap SIRP to BBIRP so it can use more kinds of parsers.
This commit is contained in:
parent
e023df2b92
commit
9877c570d3
|
@ -21,7 +21,6 @@ package io.druid.firehose.rabbitmq;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.metamx.common.StringUtils;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.Channel;
|
||||
|
@ -32,10 +31,10 @@ import com.rabbitmq.client.Envelope;
|
|||
import com.rabbitmq.client.QueueingConsumer.Delivery;
|
||||
import com.rabbitmq.client.ShutdownListener;
|
||||
import com.rabbitmq.client.ShutdownSignalException;
|
||||
import io.druid.data.input.ByteBufferInputRowParser;
|
||||
import io.druid.data.input.Firehose;
|
||||
import io.druid.data.input.FirehoseFactory;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.StringInputRowParser;
|
||||
import net.jodah.lyra.ConnectionOptions;
|
||||
import net.jodah.lyra.Connections;
|
||||
import net.jodah.lyra.config.Config;
|
||||
|
@ -43,6 +42,7 @@ import net.jodah.lyra.retry.RetryPolicy;
|
|||
import net.jodah.lyra.util.Duration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
|
@ -99,7 +99,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|||
* For more information on RabbitMQ high availability please see:
|
||||
* <a href="http://www.rabbitmq.com/ha.html">http://www.rabbitmq.com/ha.html</a>.
|
||||
*/
|
||||
public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowParser>
|
||||
public class RabbitMQFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
|
||||
{
|
||||
private static final Logger log = new Logger(RabbitMQFirehoseFactory.class);
|
||||
|
||||
|
@ -134,10 +134,8 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
|
|||
}
|
||||
|
||||
@Override
|
||||
public Firehose connect(StringInputRowParser firehoseParser) throws IOException
|
||||
public Firehose connect(final ByteBufferInputRowParser firehoseParser) throws IOException
|
||||
{
|
||||
final StringInputRowParser stringParser = firehoseParser;
|
||||
|
||||
ConnectionOptions lyraOptions = new ConnectionOptions(this.connectionFactory);
|
||||
Config lyraConfig = new Config()
|
||||
.withRecoveryPolicy(
|
||||
|
@ -235,7 +233,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
|
|||
return null;
|
||||
}
|
||||
|
||||
return stringParser.parse(StringUtils.fromUtf8(delivery.getBody()));
|
||||
return firehoseParser.parse(ByteBuffer.wrap(delivery.getBody()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue