Merge pull request #2756 from gianm/rabbit-bbirp

RabbitMQ: Swap SIRP to BBIRP so it can use more kinds of parsers.
This commit is contained in:
Fangjin Yang 2016-03-29 11:54:35 -07:00
commit 77fff5709e
1 changed files with 5 additions and 7 deletions

View File

@ -21,7 +21,6 @@ package io.druid.firehose.rabbitmq;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.StringUtils;
import com.metamx.common.logger.Logger;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
@ -32,10 +31,10 @@ import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.Connections;
import net.jodah.lyra.config.Config;
@ -43,6 +42,7 @@ import net.jodah.lyra.retry.RetryPolicy;
import net.jodah.lyra.util.Duration;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -99,7 +99,7 @@ import java.util.concurrent.LinkedBlockingQueue;
* For more information on RabbitMQ high availability please see:
* <a href="http://www.rabbitmq.com/ha.html">http://www.rabbitmq.com/ha.html</a>.
*/
public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowParser>
public class RabbitMQFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
{
private static final Logger log = new Logger(RabbitMQFirehoseFactory.class);
@ -134,10 +134,8 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
}
@Override
public Firehose connect(StringInputRowParser firehoseParser) throws IOException
public Firehose connect(final ByteBufferInputRowParser firehoseParser) throws IOException
{
final StringInputRowParser stringParser = firehoseParser;
ConnectionOptions lyraOptions = new ConnectionOptions(this.connectionFactory);
Config lyraConfig = new Config()
.withRecoveryPolicy(
@ -235,7 +233,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
return null;
}
return stringParser.parse(StringUtils.fromUtf8(delivery.getBody()));
return firehoseParser.parse(ByteBuffer.wrap(delivery.getBody()));
}
@Override