diff --git a/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java b/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java
index 89f20bc0c96..2867a0270e9 100644
--- a/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java
+++ b/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java
@@ -21,7 +21,6 @@ package io.druid.firehose.rabbitmq;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.metamx.common.StringUtils;
import com.metamx.common.logger.Logger;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
@@ -32,10 +31,10 @@ import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
+import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
-import io.druid.data.input.impl.StringInputRowParser;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.Connections;
import net.jodah.lyra.config.Config;
@@ -43,6 +42,7 @@ import net.jodah.lyra.retry.RetryPolicy;
import net.jodah.lyra.util.Duration;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -99,7 +99,7 @@ import java.util.concurrent.LinkedBlockingQueue;
* For more information on RabbitMQ high availability please see:
* http://www.rabbitmq.com/ha.html.
*/
-public class RabbitMQFirehoseFactory implements FirehoseFactory
+public class RabbitMQFirehoseFactory implements FirehoseFactory
{
private static final Logger log = new Logger(RabbitMQFirehoseFactory.class);
@@ -134,10 +134,8 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory