diff --git a/rabbitmq/pom.xml b/rabbitmq/pom.xml index 745f10e88ac..a35f3b16c15 100644 --- a/rabbitmq/pom.xml +++ b/rabbitmq/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 io.druid.extensions druid-rabbitmq @@ -39,5 +40,11 @@ commons-cli test + + io.druid + druid-processing + ${project.parent.version} + test + \ No newline at end of file diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java index 132fe3b6179..28f1d0d14d1 100644 --- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java @@ -19,12 +19,12 @@ package io.druid.firehose.rabbitmq; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Maps; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.LongString; -import java.net.URISyntaxException; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; import java.util.Map; /** @@ -33,140 +33,229 @@ import java.util.Map; */ public class JacksonifiedConnectionFactory extends ConnectionFactory { + public static JacksonifiedConnectionFactory makeDefaultConnectionFactory() throws Exception + { + return new JacksonifiedConnectionFactory(null, 0, null, null, null, null, 0, 0, 0, 0, null); + } + + private static Map getSerializableClientProperties(final Map clientProperties) + { + return Maps.transformEntries( + clientProperties, + new Maps.EntryTransformer() + { + @Override + public Object transformEntry(String key, Object value) + { + if (value instanceof LongString) { + return value.toString(); + } + return value; + } + } + ); + } + + private final String host; + private final int port; + private final String username; + private final String password; + private final String virtualHost; + private final String uri; + private final int requestedChannelMax; + private final int requestedFrameMax; + private final int requestedHeartbeat; + private final int connectionTimeout; + private final Map clientProperties; + + @JsonCreator + public JacksonifiedConnectionFactory( + @JsonProperty("host") String host, + @JsonProperty("port") int port, + @JsonProperty("username") String username, + @JsonProperty("password") String password, + @JsonProperty("virtualHost") String virtualHost, + @JsonProperty("uri") String uri, + @JsonProperty("requestedChannelMax") int requestedChannelMax, + @JsonProperty("requestedFrameMax") int requestedFrameMax, + @JsonProperty("requestedHeartbeat") int requestedHeartbeat, + @JsonProperty("connectionTimeout") int connectionTimeout, + @JsonProperty("clientProperties") Map clientProperties + ) throws Exception + { + super(); + + this.host = host == null ? super.getHost() : host; + this.port = port == 0 ? super.getPort() : port; + this.username = username == null ? super.getUsername() : username; + this.password = password == null ? super.getPassword() : password; + this.virtualHost = virtualHost == null ? super.getVirtualHost() : virtualHost; + this.uri = uri; + this.requestedChannelMax = requestedChannelMax == 0 ? super.getRequestedChannelMax() : requestedChannelMax; + this.requestedFrameMax = requestedFrameMax == 0 ? super.getRequestedFrameMax() : requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat == 0 ? super.getRequestedHeartbeat() : requestedHeartbeat; + this.connectionTimeout = connectionTimeout == 0 ? super.getConnectionTimeout() : connectionTimeout; + this.clientProperties = clientProperties == null ? super.getClientProperties() : clientProperties; + + super.setHost(this.host); + super.setPort(this.port); + super.setUsername(this.username); + super.setPassword(this.password); + super.setVirtualHost(this.virtualHost); + if (this.uri != null) { + super.setUri(this.uri); + } + super.setRequestedChannelMax(this.requestedChannelMax); + super.setRequestedFrameMax(this.requestedFrameMax); + super.setRequestedHeartbeat(this.requestedHeartbeat); + super.setConnectionTimeout(this.connectionTimeout); + super.setClientProperties(this.clientProperties); + } + @Override @JsonProperty public String getHost() { - return super.getHost(); - } - - @Override - public void setHost(String host) - { - super.setHost(host); + return host; } @Override @JsonProperty public int getPort() { - return super.getPort(); + return port; } - @Override - public void setPort(int port) - { - super.setPort(port); - } @Override @JsonProperty public String getUsername() { - return super.getUsername(); - } - - @Override - public void setUsername(String username) - { - super.setUsername(username); + return username; } @Override @JsonProperty public String getPassword() { - return super.getPassword(); - } - - @Override - public void setPassword(String password) - { - super.setPassword(password); + return password; } @Override @JsonProperty public String getVirtualHost() { - return super.getVirtualHost(); + return virtualHost; } - @Override - public void setVirtualHost(String virtualHost) - { - super.setVirtualHost(virtualHost); - } - - @Override @JsonProperty - public void setUri(String uriString) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException + public String getUri() { - super.setUri(uriString); + return uri; } @Override @JsonProperty public int getRequestedChannelMax() { - return super.getRequestedChannelMax(); - } - - @Override - public void setRequestedChannelMax(int requestedChannelMax) - { - super.setRequestedChannelMax(requestedChannelMax); + return requestedChannelMax; } @Override @JsonProperty public int getRequestedFrameMax() { - return super.getRequestedFrameMax(); - } - - @Override - public void setRequestedFrameMax(int requestedFrameMax) - { - super.setRequestedFrameMax(requestedFrameMax); + return requestedFrameMax; } @Override @JsonProperty public int getRequestedHeartbeat() { - return super.getRequestedHeartbeat(); - } - - @Override - public void setConnectionTimeout(int connectionTimeout) - { - super.setConnectionTimeout(connectionTimeout); + return requestedHeartbeat; } @Override @JsonProperty public int getConnectionTimeout() { - return super.getConnectionTimeout(); + return connectionTimeout; + } + + @JsonProperty("clientProperties") + public Map getSerializableClientProperties() + { + return getSerializableClientProperties(clientProperties); } @Override - public void setRequestedHeartbeat(int requestedHeartbeat) + public boolean equals(Object o) { - super.setRequestedHeartbeat(requestedHeartbeat); + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + JacksonifiedConnectionFactory that = (JacksonifiedConnectionFactory) o; + + if (connectionTimeout != that.connectionTimeout) { + return false; + } + if (port != that.port) { + return false; + } + if (requestedChannelMax != that.requestedChannelMax) { + return false; + } + if (requestedFrameMax != that.requestedFrameMax) { + return false; + } + if (requestedHeartbeat != that.requestedHeartbeat) { + return false; + } + if (clientProperties != null + ? !Maps.difference( + getSerializableClientProperties(clientProperties), + getSerializableClientProperties(that.clientProperties) + ).areEqual() + : that.clientProperties != null) { + return false; + } + if (host != null ? !host.equals(that.host) : that.host != null) { + return false; + } + if (password != null ? !password.equals(that.password) : that.password != null) { + return false; + } + if (uri != null ? !uri.equals(that.uri) : that.uri != null) { + return false; + } + if (username != null ? !username.equals(that.username) : that.username != null) { + return false; + } + if (virtualHost != null ? !virtualHost.equals(that.virtualHost) : that.virtualHost != null) { + return false; + } + + return true; } @Override - @JsonProperty - public Map getClientProperties() + public int hashCode() { - return super.getClientProperties(); - } - - @Override - public void setClientProperties(Map clientProperties) - { - super.setClientProperties(clientProperties); + int result = host != null ? host.hashCode() : 0; + result = 31 * result + port; + result = 31 * result + (username != null ? username.hashCode() : 0); + result = 31 * result + (password != null ? password.hashCode() : 0); + result = 31 * result + (virtualHost != null ? virtualHost.hashCode() : 0); + result = 31 * result + (uri != null ? uri.hashCode() : 0); + result = 31 * result + requestedChannelMax; + result = 31 * result + requestedFrameMax; + result = 31 * result + requestedHeartbeat; + result = 31 * result + connectionTimeout; + result = 31 * result + (clientProperties != null ? clientProperties.hashCode() : 0); + return result; } } diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQDruidModule.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQDruidModule.java index 548cbcc1d1a..8ca79bbea99 100644 --- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQDruidModule.java +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQDruidModule.java @@ -29,7 +29,7 @@ import java.util.List; /** */ -public class RabbitMQDruidModule implements DruidModule +public class RabbitMQDruidModule implements DruidModule { @Override public List getJacksonModules() diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java index 7bae291c8a3..325bc41cd92 100644 --- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java @@ -19,6 +19,7 @@ package io.druid.firehose.rabbitmq; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; /** @@ -26,17 +27,50 @@ import com.fasterxml.jackson.annotation.JsonProperty; */ public class RabbitMQFirehoseConfig { - private String queue = null; - private String exchange = null; - private String routingKey = null; - private boolean durable = false; - private boolean exclusive = false; - private boolean autoDelete = false; - // Lyra (auto reconnect) properties - private int maxRetries = 100; - private int retryIntervalSeconds = 2; - private long maxDurationSeconds = 5 * 60; + private static final int defaultMaxRetries = 100; + private static final int defaultRetryIntervalSeconds = 2; + private static final long defaultMaxDurationSeconds = 5 * 60; + + public static RabbitMQFirehoseConfig makeDefaultConfig() + { + return new RabbitMQFirehoseConfig(null, null, null, false, false, false, 0, 0, 0); + } + + private final String queue; + private final String exchange; + private final String routingKey; + private final boolean durable; + private final boolean exclusive; + private final boolean autoDelete; + private final int maxRetries; + private final int retryIntervalSeconds; + private final long maxDurationSeconds; + + @JsonCreator + public RabbitMQFirehoseConfig( + @JsonProperty("queue") String queue, + @JsonProperty("exchange") String exchange, + @JsonProperty("routingKey") String routingKey, + @JsonProperty("durable") boolean durable, + @JsonProperty("exclusive") boolean exclusive, + @JsonProperty("autoDelete") boolean autoDelete, + @JsonProperty("maxRetries") int maxRetries, + @JsonProperty("retryIntervalSeconds") int retryIntervalSeconds, + @JsonProperty("maxDurationSeconds") long maxDurationSeconds + ) + { + this.queue = queue; + this.exchange = exchange; + this.routingKey = routingKey; + this.durable = durable; + this.exclusive = exclusive; + this.autoDelete = autoDelete; + + this.maxRetries = maxRetries == 0 ? defaultMaxRetries : maxRetries; + this.retryIntervalSeconds = retryIntervalSeconds == 0 ? defaultRetryIntervalSeconds : retryIntervalSeconds; + this.maxDurationSeconds = maxDurationSeconds == 0 ? defaultMaxDurationSeconds : maxDurationSeconds; + } @JsonProperty public String getQueue() @@ -44,90 +78,109 @@ public class RabbitMQFirehoseConfig return queue; } - public void setQueue(String queue) - { - this.queue = queue; - } - @JsonProperty public String getExchange() { return exchange; } - public void setExchange(String exchange) - { - this.exchange = exchange; - } - @JsonProperty public String getRoutingKey() { return routingKey; } - public void setRoutingKey(String routingKey) - { - this.routingKey = routingKey; - } - @JsonProperty public boolean isDurable() { return durable; } - public void setDurable(boolean durable) - { - this.durable = durable; - } - @JsonProperty public boolean isExclusive() { return exclusive; } - public void setExclusive(boolean exclusive) - { - this.exclusive = exclusive; - } - @JsonProperty public boolean isAutoDelete() { return autoDelete; } - public void setAutoDelete(boolean autoDelete) - { - this.autoDelete = autoDelete; - } - @JsonProperty - public int getMaxRetries() { + public int getMaxRetries() + { return maxRetries; } - public void setMaxRetries(int maxRetries) { - this.maxRetries = maxRetries; - } - @JsonProperty - public int getRetryIntervalSeconds() { + public int getRetryIntervalSeconds() + { return retryIntervalSeconds; } - public void setRetryIntervalSeconds(int retryIntervalSeconds) { - this.retryIntervalSeconds = retryIntervalSeconds; - } - @JsonProperty - public long getMaxDurationSeconds() { + public long getMaxDurationSeconds() + { return maxDurationSeconds; } - public void setMaxDurationSeconds(int maxDurationSeconds) { - this.maxDurationSeconds = maxDurationSeconds; + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + RabbitMQFirehoseConfig that = (RabbitMQFirehoseConfig) o; + + if (autoDelete != that.autoDelete) { + return false; + } + if (durable != that.durable) { + return false; + } + if (exclusive != that.exclusive) { + return false; + } + if (maxDurationSeconds != that.maxDurationSeconds) { + return false; + } + if (maxRetries != that.maxRetries) { + return false; + } + if (retryIntervalSeconds != that.retryIntervalSeconds) { + return false; + } + if (exchange != null ? !exchange.equals(that.exchange) : that.exchange != null) { + return false; + } + if (queue != null ? !queue.equals(that.queue) : that.queue != null) { + return false; + } + if (routingKey != null ? !routingKey.equals(that.routingKey) : that.routingKey != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = queue != null ? queue.hashCode() : 0; + result = 31 * result + (exchange != null ? exchange.hashCode() : 0); + result = 31 * result + (routingKey != null ? routingKey.hashCode() : 0); + result = 31 * result + (durable ? 1 : 0); + result = 31 * result + (exclusive ? 1 : 0); + result = 31 * result + (autoDelete ? 1 : 0); + result = 31 * result + maxRetries; + result = 31 * result + retryIntervalSeconds; + result = 31 * result + (int) (maxDurationSeconds ^ (maxDurationSeconds >>> 32)); + return result; } } diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java index 8504c70ed9d..f38398bcc84 100644 --- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java @@ -26,7 +26,6 @@ import com.metamx.common.logger.Logger; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; @@ -50,14 +49,14 @@ import java.util.concurrent.LinkedBlockingQueue; /** * A FirehoseFactory for RabbitMQ. - * + *

* It will receive it's configuration through the realtime.spec file and expects to find a * consumerProps element in the firehose definition with values for a number of configuration options. * Below is a complete example for a RabbitMQ firehose configuration with some explanation. Options * that have defaults can be skipped but options with no defaults must be specified with the exception * of the URI property. If the URI property is set, it will override any other property that was also * set. - * + *

* File: realtime.spec *

  *   "firehose" : {
@@ -89,7 +88,7 @@ import java.util.concurrent.LinkedBlockingQueue;
  *     }
  *   },
  * 
- * + *

* Limitations: This implementation will not attempt to reconnect to the MQ broker if the * connection to it is lost. Furthermore it does not support any automatic failover on high availability * RabbitMQ clusters. This is not supported by the underlying AMQP client library and while the behavior @@ -97,7 +96,7 @@ import java.util.concurrent.LinkedBlockingQueue; * the RabbitMQ cluster that sets the "ha-mode" and "ha-sync-mode" properly on the queue that this * Firehose connects to, messages should survive an MQ broker node failure and be delivered once a * connection to another node is set up. - * + *

* For more information on RabbitMQ high availability please see: * http://www.rabbitmq.com/ha.html. */ @@ -105,27 +104,36 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory _queue; - public QueueingConsumer(Channel ch) { + public QueueingConsumer(Channel ch) + { this(ch, new LinkedBlockingQueue()); } - public QueueingConsumer(Channel ch, BlockingQueue q) { + public QueueingConsumer(Channel ch, BlockingQueue q) + { super(ch); this._queue = q; } - @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { + @Override + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) + { _queue.clear(); } - @Override public void handleCancel(String consumerTag) throws IOException { + @Override + public void handleCancel(String consumerTag) throws IOException + { _queue.clear(); } - @Override public void handleDelivery(String consumerTag, - Envelope envelope, - AMQP.BasicProperties properties, - byte[] body) - throws IOException + @Override + public void handleDelivery( + String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body + ) + throws IOException { this._queue.add(new Delivery(envelope, properties, body)); } public Delivery nextDelivery() - throws InterruptedException, ShutdownSignalException, ConsumerCancelledException + throws InterruptedException, ShutdownSignalException, ConsumerCancelledException { return _queue.take(); } diff --git a/rabbitmq/src/test/java/io/druid/examples/rabbitmq/RabbitMQFirehoseFactoryTest.java b/rabbitmq/src/test/java/io/druid/examples/rabbitmq/RabbitMQFirehoseFactoryTest.java new file mode 100644 index 00000000000..eca04717fc2 --- /dev/null +++ b/rabbitmq/src/test/java/io/druid/examples/rabbitmq/RabbitMQFirehoseFactoryTest.java @@ -0,0 +1,139 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.examples.rabbitmq; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.rabbitmq.client.ConnectionFactory; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.JSONParseSpec; +import io.druid.data.input.impl.SpatialDimensionSchema; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.firehose.rabbitmq.JacksonifiedConnectionFactory; +import io.druid.firehose.rabbitmq.RabbitMQFirehoseConfig; +import io.druid.firehose.rabbitmq.RabbitMQFirehoseFactory; +import io.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +/** + */ +public class RabbitMQFirehoseFactoryTest +{ + private static final ObjectMapper mapper = new DefaultObjectMapper(); + + @Test + public void testSerde() throws Exception + { + RabbitMQFirehoseConfig config = new RabbitMQFirehoseConfig( + "test", + "test2", + "test3", + true, + true, + true, + 5, + 10, + 20 + ); + + JacksonifiedConnectionFactory connectionFactory = new JacksonifiedConnectionFactory( + "foo", + 9978, + "user", + "pw", + "host", + null, + 5, + 10, + 11, + 12, + ImmutableMap.of("hi", "bye") + ); + + RabbitMQFirehoseFactory factory = new RabbitMQFirehoseFactory( + connectionFactory, + config, + new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec("timestamp", "auto"), + new DimensionsSpec( + Arrays.asList("dim"), + Lists.newArrayList(), + Lists.newArrayList() + ) + ), + null, null, null, null + ) + ); + + byte[] bytes = mapper.writeValueAsBytes(factory); + RabbitMQFirehoseFactory factory2 = mapper.readValue(bytes, RabbitMQFirehoseFactory.class); + byte[] bytes2 = mapper.writeValueAsBytes(factory2); + + Assert.assertArrayEquals(bytes, bytes2); + + Assert.assertEquals(factory.getConfig(), factory2.getConfig()); + Assert.assertEquals(factory.getConnectionFactory(), factory2.getConnectionFactory()); + } + + @Test + public void testDefaultSerde() throws Exception + { + RabbitMQFirehoseConfig config = RabbitMQFirehoseConfig.makeDefaultConfig(); + + JacksonifiedConnectionFactory connectionFactory = JacksonifiedConnectionFactory.makeDefaultConnectionFactory(); + + RabbitMQFirehoseFactory factory = new RabbitMQFirehoseFactory( + connectionFactory, + config, + new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec("timestamp", "auto"), + new DimensionsSpec( + Arrays.asList("dim"), + Lists.newArrayList(), + Lists.newArrayList() + ) + ), + null, null, null, null + ) + ); + + byte[] bytes = mapper.writeValueAsBytes(factory); + RabbitMQFirehoseFactory factory2 = mapper.readValue(bytes, RabbitMQFirehoseFactory.class); + byte[] bytes2 = mapper.writeValueAsBytes(factory2); + + Assert.assertArrayEquals(bytes, bytes2); + + Assert.assertEquals(factory.getConfig(), factory2.getConfig()); + Assert.assertEquals(factory.getConnectionFactory(), factory2.getConnectionFactory()); + + Assert.assertEquals(300, factory2.getConfig().getMaxDurationSeconds()); + + Assert.assertEquals(ConnectionFactory.DEFAULT_HOST, factory2.getConnectionFactory().getHost()); + Assert.assertEquals(ConnectionFactory.DEFAULT_USER, factory2.getConnectionFactory().getUsername()); + Assert.assertEquals(ConnectionFactory.DEFAULT_AMQP_PORT, factory2.getConnectionFactory().getPort()); + } +}