address cr

This commit is contained in:
fjy 2014-10-17 12:32:37 -04:00
parent 3d09edaae6
commit bf3d31e5cc
4 changed files with 272 additions and 133 deletions

View File

@ -19,14 +19,11 @@
package io.druid.firehose.rabbitmq;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
import com.rabbitmq.client.ConnectionFactory;
import javax.annotation.Nullable;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
/**
@ -35,133 +32,137 @@ import java.util.Map;
*/
public class JacksonifiedConnectionFactory extends ConnectionFactory
{
public static JacksonifiedConnectionFactory makeDefaultConnectionFactory() throws Exception
{
return new JacksonifiedConnectionFactory(null, 0, null, null, null, null, 0, 0, 0, 0, null);
}
private final String host;
private final int port;
private final String username;
private final String password;
private final String virtualHost;
private final String uri;
private final int requestedChannelMax;
private final int requestedFrameMax;
private final int requestedHeartbeat;
private final int connectionTimeout;
private final Map<String, Object> clientProperties;
@JsonCreator
public JacksonifiedConnectionFactory(
@JsonProperty("host") String host,
@JsonProperty("port") int port,
@JsonProperty("username") String username,
@JsonProperty("password") String password,
@JsonProperty("virtualHost") String virtualHost,
@JsonProperty("uri") String uri,
@JsonProperty("requestedChannelMax") int requestedChannelMax,
@JsonProperty("requestedFrameMax") int requestedFrameMax,
@JsonProperty("requestedHeartbeat") int requestedHeartbeat,
@JsonProperty("connectionTimeout") int connectionTimeout,
@JsonProperty("clientProperties") Map<String, Object> clientProperties
) throws Exception
{
this.host = host == null ? super.getHost() : host;
this.port = port == 0 ? super.getPort() : port;
this.username = username == null ? super.getUsername() : username;
this.password = password == null ? super.getPassword() : password;
this.virtualHost = virtualHost == null ? super.getVirtualHost() : virtualHost;
this.uri = uri;
this.requestedChannelMax = requestedChannelMax == 0 ? super.getRequestedChannelMax() : requestedChannelMax;
this.requestedFrameMax = requestedFrameMax == 0 ? super.getRequestedFrameMax() : requestedFrameMax;
this.requestedHeartbeat = requestedHeartbeat == 0 ? super.getRequestedHeartbeat() : requestedHeartbeat;
this.connectionTimeout = connectionTimeout == 0 ? super.getConnectionTimeout() : connectionTimeout;
this.clientProperties = clientProperties == null ? super.getClientProperties() : clientProperties;
super.setHost(this.host);
super.setPort(this.port);
super.setUsername(this.username);
super.setPassword(this.password);
super.setVirtualHost(this.virtualHost);
if (this.uri != null) {
super.setUri(this.uri);
}
super.setRequestedChannelMax(this.requestedChannelMax);
super.setRequestedFrameMax(this.requestedFrameMax);
super.setRequestedHeartbeat(this.requestedHeartbeat);
super.setConnectionTimeout(this.connectionTimeout);
super.setClientProperties(this.clientProperties);
}
@Override
@JsonProperty
public String getHost()
{
return super.getHost();
}
@Override
public void setHost(String host)
{
super.setHost(host);
return host;
}
@Override
@JsonProperty
public int getPort()
{
return super.getPort();
return port;
}
@Override
public void setPort(int port)
{
super.setPort(port);
}
@Override
@JsonProperty
public String getUsername()
{
return super.getUsername();
}
@Override
public void setUsername(String username)
{
super.setUsername(username);
return username;
}
@Override
@JsonProperty
public String getPassword()
{
return super.getPassword();
}
@Override
public void setPassword(String password)
{
super.setPassword(password);
return password;
}
@Override
@JsonProperty
public String getVirtualHost()
{
return super.getVirtualHost();
return virtualHost;
}
@Override
public void setVirtualHost(String virtualHost)
{
super.setVirtualHost(virtualHost);
}
@Override
@JsonProperty
public void setUri(String uriString) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException
public String getUri()
{
super.setUri(uriString);
return uri;
}
@Override
@JsonProperty
public int getRequestedChannelMax()
{
return super.getRequestedChannelMax();
}
@Override
public void setRequestedChannelMax(int requestedChannelMax)
{
super.setRequestedChannelMax(requestedChannelMax);
return requestedChannelMax;
}
@Override
@JsonProperty
public int getRequestedFrameMax()
{
return super.getRequestedFrameMax();
}
@Override
public void setRequestedFrameMax(int requestedFrameMax)
{
super.setRequestedFrameMax(requestedFrameMax);
return requestedFrameMax;
}
@Override
@JsonProperty
public int getRequestedHeartbeat()
{
return super.getRequestedHeartbeat();
}
@Override
public void setConnectionTimeout(int connectionTimeout)
{
super.setConnectionTimeout(connectionTimeout);
return requestedHeartbeat;
}
@Override
@JsonProperty
public int getConnectionTimeout()
{
return super.getConnectionTimeout();
return connectionTimeout;
}
@Override
public void setRequestedHeartbeat(int requestedHeartbeat)
{
super.setRequestedHeartbeat(requestedHeartbeat);
}
@Override
@JsonProperty
public Map<String, Object> getClientProperties()
@JsonProperty("clientProperties")
public Map<String, Object> getClientPropertiesForSerde()
{
return Maps.transformEntries(
super.getClientProperties(),
@ -177,8 +178,68 @@ public class JacksonifiedConnectionFactory extends ConnectionFactory
}
@Override
public void setClientProperties(Map<String, Object> clientProperties)
public boolean equals(Object o)
{
super.setClientProperties(clientProperties);
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
JacksonifiedConnectionFactory that = (JacksonifiedConnectionFactory) o;
if (connectionTimeout != that.connectionTimeout) {
return false;
}
if (port != that.port) {
return false;
}
if (requestedChannelMax != that.requestedChannelMax) {
return false;
}
if (requestedFrameMax != that.requestedFrameMax) {
return false;
}
if (requestedHeartbeat != that.requestedHeartbeat) {
return false;
}
if (clientProperties != null ? !clientProperties.equals(that.clientProperties) : that.clientProperties != null) {
return false;
}
if (host != null ? !host.equals(that.host) : that.host != null) {
return false;
}
if (password != null ? !password.equals(that.password) : that.password != null) {
return false;
}
if (uri != null ? !uri.equals(that.uri) : that.uri != null) {
return false;
}
if (username != null ? !username.equals(that.username) : that.username != null) {
return false;
}
if (virtualHost != null ? !virtualHost.equals(that.virtualHost) : that.virtualHost != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = host != null ? host.hashCode() : 0;
result = 31 * result + port;
result = 31 * result + (username != null ? username.hashCode() : 0);
result = 31 * result + (password != null ? password.hashCode() : 0);
result = 31 * result + (virtualHost != null ? virtualHost.hashCode() : 0);
result = 31 * result + (uri != null ? uri.hashCode() : 0);
result = 31 * result + requestedChannelMax;
result = 31 * result + requestedFrameMax;
result = 31 * result + requestedHeartbeat;
result = 31 * result + connectionTimeout;
result = 31 * result + (clientProperties != null ? clientProperties.hashCode() : 0);
return result;
}
}

View File

@ -19,6 +19,7 @@
package io.druid.firehose.rabbitmq;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
@ -26,17 +27,50 @@ import com.fasterxml.jackson.annotation.JsonProperty;
*/
public class RabbitMQFirehoseConfig
{
private String queue = null;
private String exchange = null;
private String routingKey = null;
private boolean durable = false;
private boolean exclusive = false;
private boolean autoDelete = false;
// Lyra (auto reconnect) properties
private int maxRetries = 100;
private int retryIntervalSeconds = 2;
private long maxDurationSeconds = 5 * 60;
private static final int defaultMaxRetries = 100;
private static final int defaultRetryIntervalSeconds = 2;
private static final long defaultMaxDurationSeconds = 5 * 60;
public static RabbitMQFirehoseConfig makeDefaultConfig()
{
return new RabbitMQFirehoseConfig(null, null, null, false, false, false, 0, 0, 0);
}
private final String queue;
private final String exchange;
private final String routingKey;
private final boolean durable;
private final boolean exclusive;
private final boolean autoDelete;
private final int maxRetries;
private final int retryIntervalSeconds;
private final long maxDurationSeconds;
@JsonCreator
public RabbitMQFirehoseConfig(
@JsonProperty("queue") String queue,
@JsonProperty("exchange") String exchange,
@JsonProperty("routingKey") String routingKey,
@JsonProperty("durable") boolean durable,
@JsonProperty("exclusive") boolean exclusive,
@JsonProperty("autoDelete") boolean autoDelete,
@JsonProperty("maxRetries") int maxRetries,
@JsonProperty("retryIntervalSeconds") int retryIntervalSeconds,
@JsonProperty("maxDurationSeconds") long maxDurationSeconds
)
{
this.queue = queue;
this.exchange = exchange;
this.routingKey = routingKey;
this.durable = durable;
this.exclusive = exclusive;
this.autoDelete = autoDelete;
this.maxRetries = maxRetries == 0 ? defaultMaxRetries : maxRetries;
this.retryIntervalSeconds = retryIntervalSeconds == 0 ? defaultRetryIntervalSeconds : retryIntervalSeconds;
this.maxDurationSeconds = maxDurationSeconds == 0 ? defaultMaxDurationSeconds : maxDurationSeconds;
}
@JsonProperty
public String getQueue()
@ -44,96 +78,109 @@ public class RabbitMQFirehoseConfig
return queue;
}
public void setQueue(String queue)
{
this.queue = queue;
}
@JsonProperty
public String getExchange()
{
return exchange;
}
public void setExchange(String exchange)
{
this.exchange = exchange;
}
@JsonProperty
public String getRoutingKey()
{
return routingKey;
}
public void setRoutingKey(String routingKey)
{
this.routingKey = routingKey;
}
@JsonProperty
public boolean isDurable()
{
return durable;
}
public void setDurable(boolean durable)
{
this.durable = durable;
}
@JsonProperty
public boolean isExclusive()
{
return exclusive;
}
public void setExclusive(boolean exclusive)
{
this.exclusive = exclusive;
}
@JsonProperty
public boolean isAutoDelete()
{
return autoDelete;
}
public void setAutoDelete(boolean autoDelete)
{
this.autoDelete = autoDelete;
}
@JsonProperty
public int getMaxRetries()
{
return maxRetries;
}
public void setMaxRetries(int maxRetries)
{
this.maxRetries = maxRetries;
}
@JsonProperty
public int getRetryIntervalSeconds()
{
return retryIntervalSeconds;
}
public void setRetryIntervalSeconds(int retryIntervalSeconds)
{
this.retryIntervalSeconds = retryIntervalSeconds;
}
@JsonProperty
public long getMaxDurationSeconds()
{
return maxDurationSeconds;
}
public void setMaxDurationSeconds(int maxDurationSeconds)
@Override
public boolean equals(Object o)
{
this.maxDurationSeconds = maxDurationSeconds;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RabbitMQFirehoseConfig that = (RabbitMQFirehoseConfig) o;
if (autoDelete != that.autoDelete) {
return false;
}
if (durable != that.durable) {
return false;
}
if (exclusive != that.exclusive) {
return false;
}
if (maxDurationSeconds != that.maxDurationSeconds) {
return false;
}
if (maxRetries != that.maxRetries) {
return false;
}
if (retryIntervalSeconds != that.retryIntervalSeconds) {
return false;
}
if (exchange != null ? !exchange.equals(that.exchange) : that.exchange != null) {
return false;
}
if (queue != null ? !queue.equals(that.queue) : that.queue != null) {
return false;
}
if (routingKey != null ? !routingKey.equals(that.routingKey) : that.routingKey != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = queue != null ? queue.hashCode() : 0;
result = 31 * result + (exchange != null ? exchange.hashCode() : 0);
result = 31 * result + (routingKey != null ? routingKey.hashCode() : 0);
result = 31 * result + (durable ? 1 : 0);
result = 31 * result + (exclusive ? 1 : 0);
result = 31 * result + (autoDelete ? 1 : 0);
result = 31 * result + maxRetries;
result = 31 * result + retryIntervalSeconds;
result = 31 * result + (int) (maxDurationSeconds ^ (maxDurationSeconds >>> 32));
return result;
}
}

View File

@ -26,7 +26,6 @@ import com.metamx.common.logger.Logger;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
@ -114,10 +113,12 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
@JsonProperty("connection") JacksonifiedConnectionFactory connectionFactory,
@JsonProperty("config") RabbitMQFirehoseConfig config,
@JsonProperty("parser") StringInputRowParser parser
)
) throws Exception
{
this.connectionFactory = connectionFactory;
this.config = config;
this.connectionFactory = connectionFactory == null
? JacksonifiedConnectionFactory.makeDefaultConnectionFactory()
: connectionFactory;
this.config = config == null ? RabbitMQFirehoseConfig.makeDefaultConfig() : config;
this.parser = parser;
}

View File

@ -20,6 +20,7 @@
package io.druid.examples.rabbitmq;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
@ -44,14 +45,40 @@ public class RabbitMQFirehoseFactoryTest
@Test
public void testSerde() throws Exception
{
RabbitMQFirehoseConfig config = new RabbitMQFirehoseConfig(
"test",
"test2",
"test3",
true,
true,
true,
5,
10,
20
);
JacksonifiedConnectionFactory connectionFactory = new JacksonifiedConnectionFactory(
"foo",
9978,
"user",
"pw",
"host",
null,
5,
10,
11,
12,
ImmutableMap.<String, Object>of("hi", "bye")
);
RabbitMQFirehoseFactory factory = new RabbitMQFirehoseFactory(
new JacksonifiedConnectionFactory(),
new RabbitMQFirehoseConfig(),
connectionFactory,
config,
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("timestamp", "auto"),
new DimensionsSpec(
Arrays.<String>asList("dim"),
Arrays.asList("dim"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
)
@ -65,5 +92,8 @@ public class RabbitMQFirehoseFactoryTest
byte[] bytes2 = mapper.writeValueAsBytes(factory2);
Assert.assertArrayEquals(bytes, bytes2);
Assert.assertEquals(factory.getConfig(), factory2.getConfig());
Assert.assertEquals(factory.getConnectionFactory(), factory2.getConnectionFactory());
}
}