diff --git a/rabbitmq/pom.xml b/rabbitmq/pom.xml index e0d2e693414..a35f3b16c15 100644 --- a/rabbitmq/pom.xml +++ b/rabbitmq/pom.xml @@ -18,11 +18,6 @@ io.druid druid-api - - io.druid - druid-processing - ${project.parent.version} - com.rabbitmq amqp-client @@ -45,5 +40,11 @@ commons-cli test + + io.druid + druid-processing + ${project.parent.version} + test + \ No newline at end of file diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java index a4a39bcc9cd..28f1d0d14d1 100644 --- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Maps; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.LongString; import java.util.Map; @@ -37,6 +38,24 @@ public class JacksonifiedConnectionFactory extends ConnectionFactory return new JacksonifiedConnectionFactory(null, 0, null, null, null, null, 0, 0, 0, 0, null); } + private static Map getSerializableClientProperties(final Map clientProperties) + { + return Maps.transformEntries( + clientProperties, + new Maps.EntryTransformer() + { + @Override + public Object transformEntry(String key, Object value) + { + if (value instanceof LongString) { + return value.toString(); + } + return value; + } + } + ); + } + private final String host; private final int port; private final String username; @@ -164,19 +183,9 @@ public class JacksonifiedConnectionFactory extends ConnectionFactory } @JsonProperty("clientProperties") - public Map getClientPropertiesForSerde() + public Map getSerializableClientProperties() { - return Maps.transformEntries( - super.getClientProperties(), - new Maps.EntryTransformer() - { - @Override - public Object transformEntry(String key, Object value) - { - return value.toString(); - } - } - ); + return getSerializableClientProperties(clientProperties); } @Override @@ -206,7 +215,12 @@ public class JacksonifiedConnectionFactory extends ConnectionFactory if (requestedHeartbeat != that.requestedHeartbeat) { return false; } - if (clientProperties != null ? Maps.difference(clientProperties, that.clientProperties).areEqual() : that.clientProperties != null) { + if (clientProperties != null + ? !Maps.difference( + getSerializableClientProperties(clientProperties), + getSerializableClientProperties(that.clientProperties) + ).areEqual() + : that.clientProperties != null) { return false; } if (host != null ? !host.equals(that.host) : that.host != null) {