Another take on the configuration

This commit is contained in:
cheddar 2013-07-18 13:16:42 -07:00
parent 217d539e46
commit 807a52f963
3 changed files with 250 additions and 28 deletions

View File

@ -0,0 +1,152 @@
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.rabbitmq.client.ConnectionFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
/**
*/
public class JacksonifiedConnectionFactory extends ConnectionFactory
{
@Override
@JsonProperty
public String getHost()
{
return super.getHost();
}
@Override
public void setHost(String host)
{
super.setHost(host);
}
@Override
@JsonProperty
public int getPort()
{
return super.getPort();
}
@Override
public void setPort(int port)
{
super.setPort(port);
}
@Override
@JsonProperty
public String getUsername()
{
return super.getUsername();
}
@Override
public void setUsername(String username)
{
super.setUsername(username);
}
@Override
@JsonProperty
public String getPassword()
{
return super.getPassword();
}
@Override
public void setPassword(String password)
{
super.setPassword(password);
}
@Override
@JsonProperty
public String getVirtualHost()
{
return super.getVirtualHost();
}
@Override
public void setVirtualHost(String virtualHost)
{
super.setVirtualHost(virtualHost);
}
@Override
@JsonProperty
public void setUri(String uriString) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException
{
super.setUri(uriString);
}
@Override
@JsonProperty
public int getRequestedChannelMax()
{
return super.getRequestedChannelMax();
}
@Override
public void setRequestedChannelMax(int requestedChannelMax)
{
super.setRequestedChannelMax(requestedChannelMax);
}
@Override
@JsonProperty
public int getRequestedFrameMax()
{
return super.getRequestedFrameMax();
}
@Override
public void setRequestedFrameMax(int requestedFrameMax)
{
super.setRequestedFrameMax(requestedFrameMax);
}
@Override
@JsonProperty
public int getRequestedHeartbeat()
{
return super.getRequestedHeartbeat();
}
@Override
public void setConnectionTimeout(int connectionTimeout)
{
super.setConnectionTimeout(connectionTimeout);
}
@Override
@JsonProperty
public int getConnectionTimeout()
{
return super.getConnectionTimeout();
}
@Override
public void setRequestedHeartbeat(int requestedHeartbeat)
{
super.setRequestedHeartbeat(requestedHeartbeat);
}
@Override
@JsonProperty
public Map<String, Object> getClientProperties()
{
return super.getClientProperties();
}
@Override
public void setClientProperties(Map<String, Object> clientProperties)
{
super.setClientProperties(clientProperties);
}
}

View File

@ -0,0 +1,81 @@
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class RabbitMQFirehoseConfig
{
private String queue = null;
private String exchange = null;
private String routingKey = null;
private boolean durable = false;
private boolean exclusive = false;
private boolean autoDelete = false;
@JsonProperty
public String getQueue()
{
return queue;
}
public void setQueue(String queue)
{
this.queue = queue;
}
@JsonProperty
public String getExchange()
{
return exchange;
}
public void setExchange(String exchange)
{
this.exchange = exchange;
}
@JsonProperty
public String getRoutingKey()
{
return routingKey;
}
public void setRoutingKey(String routingKey)
{
this.routingKey = routingKey;
}
@JsonProperty
public boolean isDurable()
{
return durable;
}
public void setDurable(boolean durable)
{
this.durable = durable;
}
@JsonProperty
public boolean isExclusive()
{
return exclusive;
}
public void setExclusive(boolean exclusive)
{
this.exclusive = exclusive;
}
@JsonProperty
public boolean isAutoDelete()
{
return autoDelete;
}
public void setAutoDelete(boolean autoDelete)
{
this.autoDelete = autoDelete;
}
}

View File

@ -60,51 +60,40 @@ import java.util.Properties;
public class RabbitMQFirehoseFactory implements FirehoseFactory public class RabbitMQFirehoseFactory implements FirehoseFactory
{ {
private static final Logger log = new Logger(RabbitMQFirehoseFactory.class); private static final Logger log = new Logger(RabbitMQFirehoseFactory.class);
@JsonProperty @JsonProperty
private final Properties consumerProps; private final RabbitMQFirehoseConfig config;
@JsonProperty @JsonProperty
private final StringInputRowParser parser; private final StringInputRowParser parser;
@JsonProperty
private final ConnectionFactory connectionFactory;
@JsonCreator @JsonCreator
public RabbitMQFirehoseFactory( public RabbitMQFirehoseFactory(
@JsonProperty("consumerProps") Properties consumerProps, @JsonProperty("connection") ConnectionFactory connectionFactory,
@JsonProperty("config") RabbitMQFirehoseConfig config,
@JsonProperty("parser") StringInputRowParser parser @JsonProperty("parser") StringInputRowParser parser
) )
{ {
this.consumerProps = consumerProps; this.connectionFactory = connectionFactory;
this.config = config;
this.parser = parser; this.parser = parser;
} }
@Override @Override
public Firehose connect() throws IOException public Firehose connect() throws IOException
{ {
final ConnectionFactory factory = new ConnectionFactory(); String queue = config.getQueue();
factory.setHost(consumerProps.getProperty("host", factory.getHost())); String exchange = config.getExchange();
factory.setPort(Integer.parseInt(consumerProps.getProperty("port", Integer.toString(factory.getPort())))); String routingKey = config.getRoutingKey();
factory.setUsername(consumerProps.getProperty("username", factory.getUsername()));
factory.setPassword(consumerProps.getProperty("password", factory.getPassword()));
factory.setVirtualHost(consumerProps.getProperty("virtualHost", factory.getVirtualHost()));
// If the URI property has a value it overrides the values set above. boolean durable = config.isDurable();
if(consumerProps.containsKey("uri")){ boolean exclusive = config.isExclusive();
try { boolean autoDelete = config.isAutoDelete();
factory.setUri(consumerProps.getProperty("uri"));
}
catch(Exception e){
// A little silly to throw an IOException but we'll make do for now with it.
throw new IOException("Bad URI format.", e);
}
}
String queue = consumerProps.getProperty("queue"); final Connection connection = connectionFactory.newConnection();
String exchange = consumerProps.getProperty("exchange");
String routingKey = consumerProps.getProperty("routingKey");
boolean durable = Boolean.valueOf(consumerProps.getProperty("durable", "false"));
boolean exclusive = Boolean.valueOf(consumerProps.getProperty("exclusive", "false"));
boolean autoDelete = Boolean.valueOf(consumerProps.getProperty("autoDelete", "false"));
final Connection connection = factory.newConnection();
connection.addShutdownListener(new ShutdownListener() connection.addShutdownListener(new ShutdownListener()
{ {
@Override @Override