diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java index 9f6e6f67ec8..4bd8ccb24a4 100644 --- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java @@ -19,14 +19,11 @@ package io.druid.firehose.rabbitmq; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Maps; import com.rabbitmq.client.ConnectionFactory; -import javax.annotation.Nullable; -import java.net.URISyntaxException; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; import java.util.Map; /** @@ -35,133 +32,137 @@ import java.util.Map; */ public class JacksonifiedConnectionFactory extends ConnectionFactory { + public static JacksonifiedConnectionFactory makeDefaultConnectionFactory() throws Exception + { + return new JacksonifiedConnectionFactory(null, 0, null, null, null, null, 0, 0, 0, 0, null); + } + + private final String host; + private final int port; + private final String username; + private final String password; + private final String virtualHost; + private final String uri; + private final int requestedChannelMax; + private final int requestedFrameMax; + private final int requestedHeartbeat; + private final int connectionTimeout; + private final Map clientProperties; + + @JsonCreator + public JacksonifiedConnectionFactory( + @JsonProperty("host") String host, + @JsonProperty("port") int port, + @JsonProperty("username") String username, + @JsonProperty("password") String password, + @JsonProperty("virtualHost") String virtualHost, + @JsonProperty("uri") String uri, + @JsonProperty("requestedChannelMax") int requestedChannelMax, + @JsonProperty("requestedFrameMax") int requestedFrameMax, + @JsonProperty("requestedHeartbeat") int requestedHeartbeat, + @JsonProperty("connectionTimeout") int connectionTimeout, + @JsonProperty("clientProperties") Map clientProperties + ) throws Exception + { + this.host = host == null ? super.getHost() : host; + this.port = port == 0 ? super.getPort() : port; + this.username = username == null ? super.getUsername() : username; + this.password = password == null ? super.getPassword() : password; + this.virtualHost = virtualHost == null ? super.getVirtualHost() : virtualHost; + this.uri = uri; + this.requestedChannelMax = requestedChannelMax == 0 ? super.getRequestedChannelMax() : requestedChannelMax; + this.requestedFrameMax = requestedFrameMax == 0 ? super.getRequestedFrameMax() : requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat == 0 ? super.getRequestedHeartbeat() : requestedHeartbeat; + this.connectionTimeout = connectionTimeout == 0 ? super.getConnectionTimeout() : connectionTimeout; + this.clientProperties = clientProperties == null ? super.getClientProperties() : clientProperties; + + super.setHost(this.host); + super.setPort(this.port); + super.setUsername(this.username); + super.setPassword(this.password); + super.setVirtualHost(this.virtualHost); + if (this.uri != null) { + super.setUri(this.uri); + } + super.setRequestedChannelMax(this.requestedChannelMax); + super.setRequestedFrameMax(this.requestedFrameMax); + super.setRequestedHeartbeat(this.requestedHeartbeat); + super.setConnectionTimeout(this.connectionTimeout); + super.setClientProperties(this.clientProperties); + } + @Override @JsonProperty public String getHost() { - return super.getHost(); - } - - @Override - public void setHost(String host) - { - super.setHost(host); + return host; } @Override @JsonProperty public int getPort() { - return super.getPort(); + return port; } - @Override - public void setPort(int port) - { - super.setPort(port); - } @Override @JsonProperty public String getUsername() { - return super.getUsername(); - } - - @Override - public void setUsername(String username) - { - super.setUsername(username); + return username; } @Override @JsonProperty public String getPassword() { - return super.getPassword(); - } - - @Override - public void setPassword(String password) - { - super.setPassword(password); + return password; } @Override @JsonProperty public String getVirtualHost() { - return super.getVirtualHost(); + return virtualHost; } - @Override - public void setVirtualHost(String virtualHost) - { - super.setVirtualHost(virtualHost); - } - - @Override @JsonProperty - public void setUri(String uriString) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException + public String getUri() { - super.setUri(uriString); + return uri; } @Override @JsonProperty public int getRequestedChannelMax() { - return super.getRequestedChannelMax(); - } - - @Override - public void setRequestedChannelMax(int requestedChannelMax) - { - super.setRequestedChannelMax(requestedChannelMax); + return requestedChannelMax; } @Override @JsonProperty public int getRequestedFrameMax() { - return super.getRequestedFrameMax(); - } - - @Override - public void setRequestedFrameMax(int requestedFrameMax) - { - super.setRequestedFrameMax(requestedFrameMax); + return requestedFrameMax; } @Override @JsonProperty public int getRequestedHeartbeat() { - return super.getRequestedHeartbeat(); - } - - @Override - public void setConnectionTimeout(int connectionTimeout) - { - super.setConnectionTimeout(connectionTimeout); + return requestedHeartbeat; } @Override @JsonProperty public int getConnectionTimeout() { - return super.getConnectionTimeout(); + return connectionTimeout; } - @Override - public void setRequestedHeartbeat(int requestedHeartbeat) - { - super.setRequestedHeartbeat(requestedHeartbeat); - } - - @Override - @JsonProperty - public Map getClientProperties() + @JsonProperty("clientProperties") + public Map getClientPropertiesForSerde() { return Maps.transformEntries( super.getClientProperties(), @@ -177,8 +178,68 @@ public class JacksonifiedConnectionFactory extends ConnectionFactory } @Override - public void setClientProperties(Map clientProperties) + public boolean equals(Object o) { - super.setClientProperties(clientProperties); + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + JacksonifiedConnectionFactory that = (JacksonifiedConnectionFactory) o; + + if (connectionTimeout != that.connectionTimeout) { + return false; + } + if (port != that.port) { + return false; + } + if (requestedChannelMax != that.requestedChannelMax) { + return false; + } + if (requestedFrameMax != that.requestedFrameMax) { + return false; + } + if (requestedHeartbeat != that.requestedHeartbeat) { + return false; + } + if (clientProperties != null ? !clientProperties.equals(that.clientProperties) : that.clientProperties != null) { + return false; + } + if (host != null ? !host.equals(that.host) : that.host != null) { + return false; + } + if (password != null ? !password.equals(that.password) : that.password != null) { + return false; + } + if (uri != null ? !uri.equals(that.uri) : that.uri != null) { + return false; + } + if (username != null ? !username.equals(that.username) : that.username != null) { + return false; + } + if (virtualHost != null ? !virtualHost.equals(that.virtualHost) : that.virtualHost != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = host != null ? host.hashCode() : 0; + result = 31 * result + port; + result = 31 * result + (username != null ? username.hashCode() : 0); + result = 31 * result + (password != null ? password.hashCode() : 0); + result = 31 * result + (virtualHost != null ? virtualHost.hashCode() : 0); + result = 31 * result + (uri != null ? uri.hashCode() : 0); + result = 31 * result + requestedChannelMax; + result = 31 * result + requestedFrameMax; + result = 31 * result + requestedHeartbeat; + result = 31 * result + connectionTimeout; + result = 31 * result + (clientProperties != null ? clientProperties.hashCode() : 0); + return result; } } diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java index 0b17e533144..325bc41cd92 100644 --- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java @@ -19,6 +19,7 @@ package io.druid.firehose.rabbitmq; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; /** @@ -26,17 +27,50 @@ import com.fasterxml.jackson.annotation.JsonProperty; */ public class RabbitMQFirehoseConfig { - private String queue = null; - private String exchange = null; - private String routingKey = null; - private boolean durable = false; - private boolean exclusive = false; - private boolean autoDelete = false; - // Lyra (auto reconnect) properties - private int maxRetries = 100; - private int retryIntervalSeconds = 2; - private long maxDurationSeconds = 5 * 60; + private static final int defaultMaxRetries = 100; + private static final int defaultRetryIntervalSeconds = 2; + private static final long defaultMaxDurationSeconds = 5 * 60; + + public static RabbitMQFirehoseConfig makeDefaultConfig() + { + return new RabbitMQFirehoseConfig(null, null, null, false, false, false, 0, 0, 0); + } + + private final String queue; + private final String exchange; + private final String routingKey; + private final boolean durable; + private final boolean exclusive; + private final boolean autoDelete; + private final int maxRetries; + private final int retryIntervalSeconds; + private final long maxDurationSeconds; + + @JsonCreator + public RabbitMQFirehoseConfig( + @JsonProperty("queue") String queue, + @JsonProperty("exchange") String exchange, + @JsonProperty("routingKey") String routingKey, + @JsonProperty("durable") boolean durable, + @JsonProperty("exclusive") boolean exclusive, + @JsonProperty("autoDelete") boolean autoDelete, + @JsonProperty("maxRetries") int maxRetries, + @JsonProperty("retryIntervalSeconds") int retryIntervalSeconds, + @JsonProperty("maxDurationSeconds") long maxDurationSeconds + ) + { + this.queue = queue; + this.exchange = exchange; + this.routingKey = routingKey; + this.durable = durable; + this.exclusive = exclusive; + this.autoDelete = autoDelete; + + this.maxRetries = maxRetries == 0 ? defaultMaxRetries : maxRetries; + this.retryIntervalSeconds = retryIntervalSeconds == 0 ? defaultRetryIntervalSeconds : retryIntervalSeconds; + this.maxDurationSeconds = maxDurationSeconds == 0 ? defaultMaxDurationSeconds : maxDurationSeconds; + } @JsonProperty public String getQueue() @@ -44,96 +78,109 @@ public class RabbitMQFirehoseConfig return queue; } - public void setQueue(String queue) - { - this.queue = queue; - } - @JsonProperty public String getExchange() { return exchange; } - public void setExchange(String exchange) - { - this.exchange = exchange; - } - @JsonProperty public String getRoutingKey() { return routingKey; } - public void setRoutingKey(String routingKey) - { - this.routingKey = routingKey; - } - @JsonProperty public boolean isDurable() { return durable; } - public void setDurable(boolean durable) - { - this.durable = durable; - } - @JsonProperty public boolean isExclusive() { return exclusive; } - public void setExclusive(boolean exclusive) - { - this.exclusive = exclusive; - } - @JsonProperty public boolean isAutoDelete() { return autoDelete; } - public void setAutoDelete(boolean autoDelete) - { - this.autoDelete = autoDelete; - } - @JsonProperty public int getMaxRetries() { return maxRetries; } - public void setMaxRetries(int maxRetries) - { - this.maxRetries = maxRetries; - } - @JsonProperty public int getRetryIntervalSeconds() { return retryIntervalSeconds; } - public void setRetryIntervalSeconds(int retryIntervalSeconds) - { - this.retryIntervalSeconds = retryIntervalSeconds; - } - @JsonProperty public long getMaxDurationSeconds() { return maxDurationSeconds; } - public void setMaxDurationSeconds(int maxDurationSeconds) + @Override + public boolean equals(Object o) { - this.maxDurationSeconds = maxDurationSeconds; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + RabbitMQFirehoseConfig that = (RabbitMQFirehoseConfig) o; + + if (autoDelete != that.autoDelete) { + return false; + } + if (durable != that.durable) { + return false; + } + if (exclusive != that.exclusive) { + return false; + } + if (maxDurationSeconds != that.maxDurationSeconds) { + return false; + } + if (maxRetries != that.maxRetries) { + return false; + } + if (retryIntervalSeconds != that.retryIntervalSeconds) { + return false; + } + if (exchange != null ? !exchange.equals(that.exchange) : that.exchange != null) { + return false; + } + if (queue != null ? !queue.equals(that.queue) : that.queue != null) { + return false; + } + if (routingKey != null ? !routingKey.equals(that.routingKey) : that.routingKey != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = queue != null ? queue.hashCode() : 0; + result = 31 * result + (exchange != null ? exchange.hashCode() : 0); + result = 31 * result + (routingKey != null ? routingKey.hashCode() : 0); + result = 31 * result + (durable ? 1 : 0); + result = 31 * result + (exclusive ? 1 : 0); + result = 31 * result + (autoDelete ? 1 : 0); + result = 31 * result + maxRetries; + result = 31 * result + retryIntervalSeconds; + result = 31 * result + (int) (maxDurationSeconds ^ (maxDurationSeconds >>> 32)); + return result; } } diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java index ec650829e5a..f38398bcc84 100644 --- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java @@ -26,7 +26,6 @@ import com.metamx.common.logger.Logger; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; @@ -114,10 +113,12 @@ public class RabbitMQFirehoseFactory implements FirehoseFactoryof("hi", "bye") + ); + RabbitMQFirehoseFactory factory = new RabbitMQFirehoseFactory( - new JacksonifiedConnectionFactory(), - new RabbitMQFirehoseConfig(), + connectionFactory, + config, new StringInputRowParser( new JSONParseSpec( new TimestampSpec("timestamp", "auto"), new DimensionsSpec( - Arrays.asList("dim"), + Arrays.asList("dim"), Lists.newArrayList(), Lists.newArrayList() ) @@ -65,5 +92,8 @@ public class RabbitMQFirehoseFactoryTest byte[] bytes2 = mapper.writeValueAsBytes(factory2); Assert.assertArrayEquals(bytes, bytes2); + + Assert.assertEquals(factory.getConfig(), factory2.getConfig()); + Assert.assertEquals(factory.getConnectionFactory(), factory2.getConnectionFactory()); } }