mirror of https://github.com/apache/druid.git
Merge pull request #795 from metamx/fix-rabbit
Fix rabbit serde and add tests; fixes #794
This commit is contained in:
commit
ca978d62f6
|
@ -1,5 +1,6 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<groupId>io.druid.extensions</groupId>
|
<groupId>io.druid.extensions</groupId>
|
||||||
<artifactId>druid-rabbitmq</artifactId>
|
<artifactId>druid-rabbitmq</artifactId>
|
||||||
|
@ -39,5 +40,11 @@
|
||||||
<artifactId>commons-cli</artifactId>
|
<artifactId>commons-cli</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.druid</groupId>
|
||||||
|
<artifactId>druid-processing</artifactId>
|
||||||
|
<version>${project.parent.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
|
@ -19,12 +19,12 @@
|
||||||
|
|
||||||
package io.druid.firehose.rabbitmq;
|
package io.druid.firehose.rabbitmq;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
import com.rabbitmq.client.ConnectionFactory;
|
import com.rabbitmq.client.ConnectionFactory;
|
||||||
|
import com.rabbitmq.client.LongString;
|
||||||
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.security.KeyManagementException;
|
|
||||||
import java.security.NoSuchAlgorithmException;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -33,140 +33,229 @@ import java.util.Map;
|
||||||
*/
|
*/
|
||||||
public class JacksonifiedConnectionFactory extends ConnectionFactory
|
public class JacksonifiedConnectionFactory extends ConnectionFactory
|
||||||
{
|
{
|
||||||
|
public static JacksonifiedConnectionFactory makeDefaultConnectionFactory() throws Exception
|
||||||
|
{
|
||||||
|
return new JacksonifiedConnectionFactory(null, 0, null, null, null, null, 0, 0, 0, 0, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Map<String, Object> getSerializableClientProperties(final Map<String, Object> clientProperties)
|
||||||
|
{
|
||||||
|
return Maps.transformEntries(
|
||||||
|
clientProperties,
|
||||||
|
new Maps.EntryTransformer<String, Object, Object>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Object transformEntry(String key, Object value)
|
||||||
|
{
|
||||||
|
if (value instanceof LongString) {
|
||||||
|
return value.toString();
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final String host;
|
||||||
|
private final int port;
|
||||||
|
private final String username;
|
||||||
|
private final String password;
|
||||||
|
private final String virtualHost;
|
||||||
|
private final String uri;
|
||||||
|
private final int requestedChannelMax;
|
||||||
|
private final int requestedFrameMax;
|
||||||
|
private final int requestedHeartbeat;
|
||||||
|
private final int connectionTimeout;
|
||||||
|
private final Map<String, Object> clientProperties;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public JacksonifiedConnectionFactory(
|
||||||
|
@JsonProperty("host") String host,
|
||||||
|
@JsonProperty("port") int port,
|
||||||
|
@JsonProperty("username") String username,
|
||||||
|
@JsonProperty("password") String password,
|
||||||
|
@JsonProperty("virtualHost") String virtualHost,
|
||||||
|
@JsonProperty("uri") String uri,
|
||||||
|
@JsonProperty("requestedChannelMax") int requestedChannelMax,
|
||||||
|
@JsonProperty("requestedFrameMax") int requestedFrameMax,
|
||||||
|
@JsonProperty("requestedHeartbeat") int requestedHeartbeat,
|
||||||
|
@JsonProperty("connectionTimeout") int connectionTimeout,
|
||||||
|
@JsonProperty("clientProperties") Map<String, Object> clientProperties
|
||||||
|
) throws Exception
|
||||||
|
{
|
||||||
|
super();
|
||||||
|
|
||||||
|
this.host = host == null ? super.getHost() : host;
|
||||||
|
this.port = port == 0 ? super.getPort() : port;
|
||||||
|
this.username = username == null ? super.getUsername() : username;
|
||||||
|
this.password = password == null ? super.getPassword() : password;
|
||||||
|
this.virtualHost = virtualHost == null ? super.getVirtualHost() : virtualHost;
|
||||||
|
this.uri = uri;
|
||||||
|
this.requestedChannelMax = requestedChannelMax == 0 ? super.getRequestedChannelMax() : requestedChannelMax;
|
||||||
|
this.requestedFrameMax = requestedFrameMax == 0 ? super.getRequestedFrameMax() : requestedFrameMax;
|
||||||
|
this.requestedHeartbeat = requestedHeartbeat == 0 ? super.getRequestedHeartbeat() : requestedHeartbeat;
|
||||||
|
this.connectionTimeout = connectionTimeout == 0 ? super.getConnectionTimeout() : connectionTimeout;
|
||||||
|
this.clientProperties = clientProperties == null ? super.getClientProperties() : clientProperties;
|
||||||
|
|
||||||
|
super.setHost(this.host);
|
||||||
|
super.setPort(this.port);
|
||||||
|
super.setUsername(this.username);
|
||||||
|
super.setPassword(this.password);
|
||||||
|
super.setVirtualHost(this.virtualHost);
|
||||||
|
if (this.uri != null) {
|
||||||
|
super.setUri(this.uri);
|
||||||
|
}
|
||||||
|
super.setRequestedChannelMax(this.requestedChannelMax);
|
||||||
|
super.setRequestedFrameMax(this.requestedFrameMax);
|
||||||
|
super.setRequestedHeartbeat(this.requestedHeartbeat);
|
||||||
|
super.setConnectionTimeout(this.connectionTimeout);
|
||||||
|
super.setClientProperties(this.clientProperties);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public String getHost()
|
public String getHost()
|
||||||
{
|
{
|
||||||
return super.getHost();
|
return host;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setHost(String host)
|
|
||||||
{
|
|
||||||
super.setHost(host);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public int getPort()
|
public int getPort()
|
||||||
{
|
{
|
||||||
return super.getPort();
|
return port;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setPort(int port)
|
|
||||||
{
|
|
||||||
super.setPort(port);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public String getUsername()
|
public String getUsername()
|
||||||
{
|
{
|
||||||
return super.getUsername();
|
return username;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setUsername(String username)
|
|
||||||
{
|
|
||||||
super.setUsername(username);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public String getPassword()
|
public String getPassword()
|
||||||
{
|
{
|
||||||
return super.getPassword();
|
return password;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setPassword(String password)
|
|
||||||
{
|
|
||||||
super.setPassword(password);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public String getVirtualHost()
|
public String getVirtualHost()
|
||||||
{
|
{
|
||||||
return super.getVirtualHost();
|
return virtualHost;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setVirtualHost(String virtualHost)
|
|
||||||
{
|
|
||||||
super.setVirtualHost(virtualHost);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public void setUri(String uriString) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException
|
public String getUri()
|
||||||
{
|
{
|
||||||
super.setUri(uriString);
|
return uri;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public int getRequestedChannelMax()
|
public int getRequestedChannelMax()
|
||||||
{
|
{
|
||||||
return super.getRequestedChannelMax();
|
return requestedChannelMax;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setRequestedChannelMax(int requestedChannelMax)
|
|
||||||
{
|
|
||||||
super.setRequestedChannelMax(requestedChannelMax);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public int getRequestedFrameMax()
|
public int getRequestedFrameMax()
|
||||||
{
|
{
|
||||||
return super.getRequestedFrameMax();
|
return requestedFrameMax;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setRequestedFrameMax(int requestedFrameMax)
|
|
||||||
{
|
|
||||||
super.setRequestedFrameMax(requestedFrameMax);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public int getRequestedHeartbeat()
|
public int getRequestedHeartbeat()
|
||||||
{
|
{
|
||||||
return super.getRequestedHeartbeat();
|
return requestedHeartbeat;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setConnectionTimeout(int connectionTimeout)
|
|
||||||
{
|
|
||||||
super.setConnectionTimeout(connectionTimeout);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public int getConnectionTimeout()
|
public int getConnectionTimeout()
|
||||||
{
|
{
|
||||||
return super.getConnectionTimeout();
|
return connectionTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty("clientProperties")
|
||||||
|
public Map<String, Object> getSerializableClientProperties()
|
||||||
|
{
|
||||||
|
return getSerializableClientProperties(clientProperties);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setRequestedHeartbeat(int requestedHeartbeat)
|
public boolean equals(Object o)
|
||||||
{
|
{
|
||||||
super.setRequestedHeartbeat(requestedHeartbeat);
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
JacksonifiedConnectionFactory that = (JacksonifiedConnectionFactory) o;
|
||||||
|
|
||||||
|
if (connectionTimeout != that.connectionTimeout) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (port != that.port) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (requestedChannelMax != that.requestedChannelMax) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (requestedFrameMax != that.requestedFrameMax) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (requestedHeartbeat != that.requestedHeartbeat) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (clientProperties != null
|
||||||
|
? !Maps.difference(
|
||||||
|
getSerializableClientProperties(clientProperties),
|
||||||
|
getSerializableClientProperties(that.clientProperties)
|
||||||
|
).areEqual()
|
||||||
|
: that.clientProperties != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (host != null ? !host.equals(that.host) : that.host != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (password != null ? !password.equals(that.password) : that.password != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (uri != null ? !uri.equals(that.uri) : that.uri != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (username != null ? !username.equals(that.username) : that.username != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (virtualHost != null ? !virtualHost.equals(that.virtualHost) : that.virtualHost != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@JsonProperty
|
public int hashCode()
|
||||||
public Map<String, Object> getClientProperties()
|
|
||||||
{
|
{
|
||||||
return super.getClientProperties();
|
int result = host != null ? host.hashCode() : 0;
|
||||||
}
|
result = 31 * result + port;
|
||||||
|
result = 31 * result + (username != null ? username.hashCode() : 0);
|
||||||
@Override
|
result = 31 * result + (password != null ? password.hashCode() : 0);
|
||||||
public void setClientProperties(Map<String, Object> clientProperties)
|
result = 31 * result + (virtualHost != null ? virtualHost.hashCode() : 0);
|
||||||
{
|
result = 31 * result + (uri != null ? uri.hashCode() : 0);
|
||||||
super.setClientProperties(clientProperties);
|
result = 31 * result + requestedChannelMax;
|
||||||
|
result = 31 * result + requestedFrameMax;
|
||||||
|
result = 31 * result + requestedHeartbeat;
|
||||||
|
result = 31 * result + connectionTimeout;
|
||||||
|
result = 31 * result + (clientProperties != null ? clientProperties.hashCode() : 0);
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class RabbitMQDruidModule implements DruidModule
|
public class RabbitMQDruidModule implements DruidModule
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public List<? extends Module> getJacksonModules()
|
public List<? extends Module> getJacksonModules()
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package io.druid.firehose.rabbitmq;
|
package io.druid.firehose.rabbitmq;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -26,17 +27,50 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
*/
|
*/
|
||||||
public class RabbitMQFirehoseConfig
|
public class RabbitMQFirehoseConfig
|
||||||
{
|
{
|
||||||
private String queue = null;
|
|
||||||
private String exchange = null;
|
|
||||||
private String routingKey = null;
|
|
||||||
private boolean durable = false;
|
|
||||||
private boolean exclusive = false;
|
|
||||||
private boolean autoDelete = false;
|
|
||||||
|
|
||||||
// Lyra (auto reconnect) properties
|
// Lyra (auto reconnect) properties
|
||||||
private int maxRetries = 100;
|
private static final int defaultMaxRetries = 100;
|
||||||
private int retryIntervalSeconds = 2;
|
private static final int defaultRetryIntervalSeconds = 2;
|
||||||
private long maxDurationSeconds = 5 * 60;
|
private static final long defaultMaxDurationSeconds = 5 * 60;
|
||||||
|
|
||||||
|
public static RabbitMQFirehoseConfig makeDefaultConfig()
|
||||||
|
{
|
||||||
|
return new RabbitMQFirehoseConfig(null, null, null, false, false, false, 0, 0, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final String queue;
|
||||||
|
private final String exchange;
|
||||||
|
private final String routingKey;
|
||||||
|
private final boolean durable;
|
||||||
|
private final boolean exclusive;
|
||||||
|
private final boolean autoDelete;
|
||||||
|
private final int maxRetries;
|
||||||
|
private final int retryIntervalSeconds;
|
||||||
|
private final long maxDurationSeconds;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public RabbitMQFirehoseConfig(
|
||||||
|
@JsonProperty("queue") String queue,
|
||||||
|
@JsonProperty("exchange") String exchange,
|
||||||
|
@JsonProperty("routingKey") String routingKey,
|
||||||
|
@JsonProperty("durable") boolean durable,
|
||||||
|
@JsonProperty("exclusive") boolean exclusive,
|
||||||
|
@JsonProperty("autoDelete") boolean autoDelete,
|
||||||
|
@JsonProperty("maxRetries") int maxRetries,
|
||||||
|
@JsonProperty("retryIntervalSeconds") int retryIntervalSeconds,
|
||||||
|
@JsonProperty("maxDurationSeconds") long maxDurationSeconds
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.queue = queue;
|
||||||
|
this.exchange = exchange;
|
||||||
|
this.routingKey = routingKey;
|
||||||
|
this.durable = durable;
|
||||||
|
this.exclusive = exclusive;
|
||||||
|
this.autoDelete = autoDelete;
|
||||||
|
|
||||||
|
this.maxRetries = maxRetries == 0 ? defaultMaxRetries : maxRetries;
|
||||||
|
this.retryIntervalSeconds = retryIntervalSeconds == 0 ? defaultRetryIntervalSeconds : retryIntervalSeconds;
|
||||||
|
this.maxDurationSeconds = maxDurationSeconds == 0 ? defaultMaxDurationSeconds : maxDurationSeconds;
|
||||||
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public String getQueue()
|
public String getQueue()
|
||||||
|
@ -44,90 +78,109 @@ public class RabbitMQFirehoseConfig
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setQueue(String queue)
|
|
||||||
{
|
|
||||||
this.queue = queue;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public String getExchange()
|
public String getExchange()
|
||||||
{
|
{
|
||||||
return exchange;
|
return exchange;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setExchange(String exchange)
|
|
||||||
{
|
|
||||||
this.exchange = exchange;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public String getRoutingKey()
|
public String getRoutingKey()
|
||||||
{
|
{
|
||||||
return routingKey;
|
return routingKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setRoutingKey(String routingKey)
|
|
||||||
{
|
|
||||||
this.routingKey = routingKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public boolean isDurable()
|
public boolean isDurable()
|
||||||
{
|
{
|
||||||
return durable;
|
return durable;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setDurable(boolean durable)
|
|
||||||
{
|
|
||||||
this.durable = durable;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public boolean isExclusive()
|
public boolean isExclusive()
|
||||||
{
|
{
|
||||||
return exclusive;
|
return exclusive;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setExclusive(boolean exclusive)
|
|
||||||
{
|
|
||||||
this.exclusive = exclusive;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public boolean isAutoDelete()
|
public boolean isAutoDelete()
|
||||||
{
|
{
|
||||||
return autoDelete;
|
return autoDelete;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setAutoDelete(boolean autoDelete)
|
|
||||||
{
|
|
||||||
this.autoDelete = autoDelete;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public int getMaxRetries() {
|
public int getMaxRetries()
|
||||||
|
{
|
||||||
return maxRetries;
|
return maxRetries;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMaxRetries(int maxRetries) {
|
|
||||||
this.maxRetries = maxRetries;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public int getRetryIntervalSeconds() {
|
public int getRetryIntervalSeconds()
|
||||||
|
{
|
||||||
return retryIntervalSeconds;
|
return retryIntervalSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setRetryIntervalSeconds(int retryIntervalSeconds) {
|
|
||||||
this.retryIntervalSeconds = retryIntervalSeconds;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public long getMaxDurationSeconds() {
|
public long getMaxDurationSeconds()
|
||||||
|
{
|
||||||
return maxDurationSeconds;
|
return maxDurationSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMaxDurationSeconds(int maxDurationSeconds) {
|
@Override
|
||||||
this.maxDurationSeconds = maxDurationSeconds;
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
RabbitMQFirehoseConfig that = (RabbitMQFirehoseConfig) o;
|
||||||
|
|
||||||
|
if (autoDelete != that.autoDelete) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (durable != that.durable) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (exclusive != that.exclusive) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (maxDurationSeconds != that.maxDurationSeconds) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (maxRetries != that.maxRetries) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (retryIntervalSeconds != that.retryIntervalSeconds) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (exchange != null ? !exchange.equals(that.exchange) : that.exchange != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (queue != null ? !queue.equals(that.queue) : that.queue != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (routingKey != null ? !routingKey.equals(that.routingKey) : that.routingKey != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
int result = queue != null ? queue.hashCode() : 0;
|
||||||
|
result = 31 * result + (exchange != null ? exchange.hashCode() : 0);
|
||||||
|
result = 31 * result + (routingKey != null ? routingKey.hashCode() : 0);
|
||||||
|
result = 31 * result + (durable ? 1 : 0);
|
||||||
|
result = 31 * result + (exclusive ? 1 : 0);
|
||||||
|
result = 31 * result + (autoDelete ? 1 : 0);
|
||||||
|
result = 31 * result + maxRetries;
|
||||||
|
result = 31 * result + retryIntervalSeconds;
|
||||||
|
result = 31 * result + (int) (maxDurationSeconds ^ (maxDurationSeconds >>> 32));
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,6 @@ import com.metamx.common.logger.Logger;
|
||||||
import com.rabbitmq.client.AMQP;
|
import com.rabbitmq.client.AMQP;
|
||||||
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Channel;
|
||||||
import com.rabbitmq.client.Connection;
|
import com.rabbitmq.client.Connection;
|
||||||
import com.rabbitmq.client.ConnectionFactory;
|
|
||||||
import com.rabbitmq.client.ConsumerCancelledException;
|
import com.rabbitmq.client.ConsumerCancelledException;
|
||||||
import com.rabbitmq.client.DefaultConsumer;
|
import com.rabbitmq.client.DefaultConsumer;
|
||||||
import com.rabbitmq.client.Envelope;
|
import com.rabbitmq.client.Envelope;
|
||||||
|
@ -50,14 +49,14 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A FirehoseFactory for RabbitMQ.
|
* A FirehoseFactory for RabbitMQ.
|
||||||
*
|
* <p/>
|
||||||
* It will receive it's configuration through the realtime.spec file and expects to find a
|
* It will receive it's configuration through the realtime.spec file and expects to find a
|
||||||
* consumerProps element in the firehose definition with values for a number of configuration options.
|
* consumerProps element in the firehose definition with values for a number of configuration options.
|
||||||
* Below is a complete example for a RabbitMQ firehose configuration with some explanation. Options
|
* Below is a complete example for a RabbitMQ firehose configuration with some explanation. Options
|
||||||
* that have defaults can be skipped but options with no defaults must be specified with the exception
|
* that have defaults can be skipped but options with no defaults must be specified with the exception
|
||||||
* of the URI property. If the URI property is set, it will override any other property that was also
|
* of the URI property. If the URI property is set, it will override any other property that was also
|
||||||
* set.
|
* set.
|
||||||
*
|
* <p/>
|
||||||
* File: <em>realtime.spec</em>
|
* File: <em>realtime.spec</em>
|
||||||
* <pre>
|
* <pre>
|
||||||
* "firehose" : {
|
* "firehose" : {
|
||||||
|
@ -89,7 +88,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
* }
|
* }
|
||||||
* },
|
* },
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
* <p/>
|
||||||
* <b>Limitations:</b> This implementation will not attempt to reconnect to the MQ broker if the
|
* <b>Limitations:</b> This implementation will not attempt to reconnect to the MQ broker if the
|
||||||
* connection to it is lost. Furthermore it does not support any automatic failover on high availability
|
* connection to it is lost. Furthermore it does not support any automatic failover on high availability
|
||||||
* RabbitMQ clusters. This is not supported by the underlying AMQP client library and while the behavior
|
* RabbitMQ clusters. This is not supported by the underlying AMQP client library and while the behavior
|
||||||
|
@ -97,7 +96,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
* the RabbitMQ cluster that sets the "ha-mode" and "ha-sync-mode" properly on the queue that this
|
* the RabbitMQ cluster that sets the "ha-mode" and "ha-sync-mode" properly on the queue that this
|
||||||
* Firehose connects to, messages should survive an MQ broker node failure and be delivered once a
|
* Firehose connects to, messages should survive an MQ broker node failure and be delivered once a
|
||||||
* connection to another node is set up.
|
* connection to another node is set up.
|
||||||
*
|
* <p/>
|
||||||
* For more information on RabbitMQ high availability please see:
|
* For more information on RabbitMQ high availability please see:
|
||||||
* <a href="http://www.rabbitmq.com/ha.html">http://www.rabbitmq.com/ha.html</a>.
|
* <a href="http://www.rabbitmq.com/ha.html">http://www.rabbitmq.com/ha.html</a>.
|
||||||
*/
|
*/
|
||||||
|
@ -105,27 +104,36 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(RabbitMQFirehoseFactory.class);
|
private static final Logger log = new Logger(RabbitMQFirehoseFactory.class);
|
||||||
|
|
||||||
@JsonProperty
|
|
||||||
private final RabbitMQFirehoseConfig config;
|
private final RabbitMQFirehoseConfig config;
|
||||||
|
|
||||||
@JsonProperty
|
|
||||||
private final StringInputRowParser parser;
|
private final StringInputRowParser parser;
|
||||||
|
private final JacksonifiedConnectionFactory connectionFactory;
|
||||||
@JsonProperty
|
|
||||||
private final ConnectionFactory connectionFactory;
|
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
public RabbitMQFirehoseFactory(
|
public RabbitMQFirehoseFactory(
|
||||||
@JsonProperty("connection") JacksonifiedConnectionFactory connectionFactory,
|
@JsonProperty("connection") JacksonifiedConnectionFactory connectionFactory,
|
||||||
@JsonProperty("config") RabbitMQFirehoseConfig config,
|
@JsonProperty("config") RabbitMQFirehoseConfig config,
|
||||||
@JsonProperty("parser") StringInputRowParser parser
|
@JsonProperty("parser") StringInputRowParser parser
|
||||||
)
|
) throws Exception
|
||||||
{
|
{
|
||||||
this.connectionFactory = connectionFactory;
|
this.connectionFactory = connectionFactory == null
|
||||||
this.config = config;
|
? JacksonifiedConnectionFactory.makeDefaultConnectionFactory()
|
||||||
|
: connectionFactory;
|
||||||
|
this.config = config == null ? RabbitMQFirehoseConfig.makeDefaultConfig() : config;
|
||||||
this.parser = parser;
|
this.parser = parser;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public RabbitMQFirehoseConfig getConfig()
|
||||||
|
{
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public JacksonifiedConnectionFactory getConnectionFactory()
|
||||||
|
{
|
||||||
|
return connectionFactory;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Firehose connect(StringInputRowParser firehoseParser) throws IOException
|
public Firehose connect(StringInputRowParser firehoseParser) throws IOException
|
||||||
{
|
{
|
||||||
|
@ -270,6 +278,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
@Override
|
@Override
|
||||||
public ByteBufferInputRowParser getParser()
|
public ByteBufferInputRowParser getParser()
|
||||||
{
|
{
|
||||||
|
@ -280,34 +289,43 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
|
||||||
{
|
{
|
||||||
private final BlockingQueue<Delivery> _queue;
|
private final BlockingQueue<Delivery> _queue;
|
||||||
|
|
||||||
public QueueingConsumer(Channel ch) {
|
public QueueingConsumer(Channel ch)
|
||||||
|
{
|
||||||
this(ch, new LinkedBlockingQueue<Delivery>());
|
this(ch, new LinkedBlockingQueue<Delivery>());
|
||||||
}
|
}
|
||||||
|
|
||||||
public QueueingConsumer(Channel ch, BlockingQueue<Delivery> q) {
|
public QueueingConsumer(Channel ch, BlockingQueue<Delivery> q)
|
||||||
|
{
|
||||||
super(ch);
|
super(ch);
|
||||||
this._queue = q;
|
this._queue = q;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
|
@Override
|
||||||
|
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig)
|
||||||
|
{
|
||||||
_queue.clear();
|
_queue.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void handleCancel(String consumerTag) throws IOException {
|
@Override
|
||||||
|
public void handleCancel(String consumerTag) throws IOException
|
||||||
|
{
|
||||||
_queue.clear();
|
_queue.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void handleDelivery(String consumerTag,
|
@Override
|
||||||
Envelope envelope,
|
public void handleDelivery(
|
||||||
AMQP.BasicProperties properties,
|
String consumerTag,
|
||||||
byte[] body)
|
Envelope envelope,
|
||||||
throws IOException
|
AMQP.BasicProperties properties,
|
||||||
|
byte[] body
|
||||||
|
)
|
||||||
|
throws IOException
|
||||||
{
|
{
|
||||||
this._queue.add(new Delivery(envelope, properties, body));
|
this._queue.add(new Delivery(envelope, properties, body));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Delivery nextDelivery()
|
public Delivery nextDelivery()
|
||||||
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
|
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
|
||||||
{
|
{
|
||||||
return _queue.take();
|
return _queue.take();
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,139 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.examples.rabbitmq;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.rabbitmq.client.ConnectionFactory;
|
||||||
|
import io.druid.data.input.impl.DimensionsSpec;
|
||||||
|
import io.druid.data.input.impl.JSONParseSpec;
|
||||||
|
import io.druid.data.input.impl.SpatialDimensionSchema;
|
||||||
|
import io.druid.data.input.impl.StringInputRowParser;
|
||||||
|
import io.druid.data.input.impl.TimestampSpec;
|
||||||
|
import io.druid.firehose.rabbitmq.JacksonifiedConnectionFactory;
|
||||||
|
import io.druid.firehose.rabbitmq.RabbitMQFirehoseConfig;
|
||||||
|
import io.druid.firehose.rabbitmq.RabbitMQFirehoseFactory;
|
||||||
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class RabbitMQFirehoseFactoryTest
|
||||||
|
{
|
||||||
|
private static final ObjectMapper mapper = new DefaultObjectMapper();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSerde() throws Exception
|
||||||
|
{
|
||||||
|
RabbitMQFirehoseConfig config = new RabbitMQFirehoseConfig(
|
||||||
|
"test",
|
||||||
|
"test2",
|
||||||
|
"test3",
|
||||||
|
true,
|
||||||
|
true,
|
||||||
|
true,
|
||||||
|
5,
|
||||||
|
10,
|
||||||
|
20
|
||||||
|
);
|
||||||
|
|
||||||
|
JacksonifiedConnectionFactory connectionFactory = new JacksonifiedConnectionFactory(
|
||||||
|
"foo",
|
||||||
|
9978,
|
||||||
|
"user",
|
||||||
|
"pw",
|
||||||
|
"host",
|
||||||
|
null,
|
||||||
|
5,
|
||||||
|
10,
|
||||||
|
11,
|
||||||
|
12,
|
||||||
|
ImmutableMap.<String, Object>of("hi", "bye")
|
||||||
|
);
|
||||||
|
|
||||||
|
RabbitMQFirehoseFactory factory = new RabbitMQFirehoseFactory(
|
||||||
|
connectionFactory,
|
||||||
|
config,
|
||||||
|
new StringInputRowParser(
|
||||||
|
new JSONParseSpec(
|
||||||
|
new TimestampSpec("timestamp", "auto"),
|
||||||
|
new DimensionsSpec(
|
||||||
|
Arrays.asList("dim"),
|
||||||
|
Lists.<String>newArrayList(),
|
||||||
|
Lists.<SpatialDimensionSchema>newArrayList()
|
||||||
|
)
|
||||||
|
),
|
||||||
|
null, null, null, null
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
byte[] bytes = mapper.writeValueAsBytes(factory);
|
||||||
|
RabbitMQFirehoseFactory factory2 = mapper.readValue(bytes, RabbitMQFirehoseFactory.class);
|
||||||
|
byte[] bytes2 = mapper.writeValueAsBytes(factory2);
|
||||||
|
|
||||||
|
Assert.assertArrayEquals(bytes, bytes2);
|
||||||
|
|
||||||
|
Assert.assertEquals(factory.getConfig(), factory2.getConfig());
|
||||||
|
Assert.assertEquals(factory.getConnectionFactory(), factory2.getConnectionFactory());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDefaultSerde() throws Exception
|
||||||
|
{
|
||||||
|
RabbitMQFirehoseConfig config = RabbitMQFirehoseConfig.makeDefaultConfig();
|
||||||
|
|
||||||
|
JacksonifiedConnectionFactory connectionFactory = JacksonifiedConnectionFactory.makeDefaultConnectionFactory();
|
||||||
|
|
||||||
|
RabbitMQFirehoseFactory factory = new RabbitMQFirehoseFactory(
|
||||||
|
connectionFactory,
|
||||||
|
config,
|
||||||
|
new StringInputRowParser(
|
||||||
|
new JSONParseSpec(
|
||||||
|
new TimestampSpec("timestamp", "auto"),
|
||||||
|
new DimensionsSpec(
|
||||||
|
Arrays.asList("dim"),
|
||||||
|
Lists.<String>newArrayList(),
|
||||||
|
Lists.<SpatialDimensionSchema>newArrayList()
|
||||||
|
)
|
||||||
|
),
|
||||||
|
null, null, null, null
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
byte[] bytes = mapper.writeValueAsBytes(factory);
|
||||||
|
RabbitMQFirehoseFactory factory2 = mapper.readValue(bytes, RabbitMQFirehoseFactory.class);
|
||||||
|
byte[] bytes2 = mapper.writeValueAsBytes(factory2);
|
||||||
|
|
||||||
|
Assert.assertArrayEquals(bytes, bytes2);
|
||||||
|
|
||||||
|
Assert.assertEquals(factory.getConfig(), factory2.getConfig());
|
||||||
|
Assert.assertEquals(factory.getConnectionFactory(), factory2.getConnectionFactory());
|
||||||
|
|
||||||
|
Assert.assertEquals(300, factory2.getConfig().getMaxDurationSeconds());
|
||||||
|
|
||||||
|
Assert.assertEquals(ConnectionFactory.DEFAULT_HOST, factory2.getConnectionFactory().getHost());
|
||||||
|
Assert.assertEquals(ConnectionFactory.DEFAULT_USER, factory2.getConnectionFactory().getUsername());
|
||||||
|
Assert.assertEquals(ConnectionFactory.DEFAULT_AMQP_PORT, factory2.getConnectionFactory().getPort());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue