From 807a52f9636a76c7012877b93efccb2d54bf2d5a Mon Sep 17 00:00:00 2001 From: cheddar Date: Thu, 18 Jul 2013 13:16:42 -0700 Subject: [PATCH] Another take on the configuration --- .../JacksonifiedConnectionFactory.java | 152 ++++++++++++++++++ .../firehose/RabbitMQFirehoseConfig.java | 81 ++++++++++ .../firehose/RabbitMQFirehoseFactory.java | 45 ++---- 3 files changed, 250 insertions(+), 28 deletions(-) create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/firehose/JacksonifiedConnectionFactory.java create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseConfig.java diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/JacksonifiedConnectionFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/JacksonifiedConnectionFactory.java new file mode 100644 index 00000000000..fa159a73367 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/JacksonifiedConnectionFactory.java @@ -0,0 +1,152 @@ +package com.metamx.druid.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.rabbitmq.client.ConnectionFactory; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.Map; + +/** + */ +public class JacksonifiedConnectionFactory extends ConnectionFactory +{ + @Override + @JsonProperty + public String getHost() + { + return super.getHost(); + } + + @Override + public void setHost(String host) + { + super.setHost(host); + } + + @Override + @JsonProperty + public int getPort() + { + return super.getPort(); + } + + @Override + public void setPort(int port) + { + super.setPort(port); + } + + @Override + @JsonProperty + public String getUsername() + { + return super.getUsername(); + } + + @Override + public void setUsername(String username) + { + super.setUsername(username); + } + + @Override + @JsonProperty + public String getPassword() + { + return super.getPassword(); + } + + @Override + public void setPassword(String password) + { + super.setPassword(password); + } + + @Override + @JsonProperty + public String getVirtualHost() + { + return super.getVirtualHost(); + } + + @Override + public void setVirtualHost(String virtualHost) + { + super.setVirtualHost(virtualHost); + } + + @Override + @JsonProperty + public void setUri(String uriString) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException + { + super.setUri(uriString); + } + + @Override + @JsonProperty + public int getRequestedChannelMax() + { + return super.getRequestedChannelMax(); + } + + @Override + public void setRequestedChannelMax(int requestedChannelMax) + { + super.setRequestedChannelMax(requestedChannelMax); + } + + @Override + @JsonProperty + public int getRequestedFrameMax() + { + return super.getRequestedFrameMax(); + } + + @Override + public void setRequestedFrameMax(int requestedFrameMax) + { + super.setRequestedFrameMax(requestedFrameMax); + } + + @Override + @JsonProperty + public int getRequestedHeartbeat() + { + return super.getRequestedHeartbeat(); + } + + @Override + public void setConnectionTimeout(int connectionTimeout) + { + super.setConnectionTimeout(connectionTimeout); + } + + @Override + @JsonProperty + public int getConnectionTimeout() + { + return super.getConnectionTimeout(); + } + + @Override + public void setRequestedHeartbeat(int requestedHeartbeat) + { + super.setRequestedHeartbeat(requestedHeartbeat); + } + + @Override + @JsonProperty + public Map getClientProperties() + { + return super.getClientProperties(); + } + + @Override + public void setClientProperties(Map clientProperties) + { + super.setClientProperties(clientProperties); + } +} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseConfig.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseConfig.java new file mode 100644 index 00000000000..e927a7f5298 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseConfig.java @@ -0,0 +1,81 @@ +package com.metamx.druid.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + */ +public class RabbitMQFirehoseConfig +{ + private String queue = null; + private String exchange = null; + private String routingKey = null; + private boolean durable = false; + private boolean exclusive = false; + private boolean autoDelete = false; + + @JsonProperty + public String getQueue() + { + return queue; + } + + public void setQueue(String queue) + { + this.queue = queue; + } + + @JsonProperty + public String getExchange() + { + return exchange; + } + + public void setExchange(String exchange) + { + this.exchange = exchange; + } + + @JsonProperty + public String getRoutingKey() + { + return routingKey; + } + + public void setRoutingKey(String routingKey) + { + this.routingKey = routingKey; + } + + @JsonProperty + public boolean isDurable() + { + return durable; + } + + public void setDurable(boolean durable) + { + this.durable = durable; + } + + @JsonProperty + public boolean isExclusive() + { + return exclusive; + } + + public void setExclusive(boolean exclusive) + { + this.exclusive = exclusive; + } + + @JsonProperty + public boolean isAutoDelete() + { + return autoDelete; + } + + public void setAutoDelete(boolean autoDelete) + { + this.autoDelete = autoDelete; + } +} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java index 36dbad4d106..b5565f2b39a 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java @@ -60,51 +60,40 @@ import java.util.Properties; public class RabbitMQFirehoseFactory implements FirehoseFactory { private static final Logger log = new Logger(RabbitMQFirehoseFactory.class); + @JsonProperty - private final Properties consumerProps; + private final RabbitMQFirehoseConfig config; + @JsonProperty private final StringInputRowParser parser; + @JsonProperty + private final ConnectionFactory connectionFactory; + @JsonCreator public RabbitMQFirehoseFactory( - @JsonProperty("consumerProps") Properties consumerProps, + @JsonProperty("connection") ConnectionFactory connectionFactory, + @JsonProperty("config") RabbitMQFirehoseConfig config, @JsonProperty("parser") StringInputRowParser parser ) { - this.consumerProps = consumerProps; + this.connectionFactory = connectionFactory; + this.config = config; this.parser = parser; } @Override public Firehose connect() throws IOException { - final ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(consumerProps.getProperty("host", factory.getHost())); - factory.setPort(Integer.parseInt(consumerProps.getProperty("port", Integer.toString(factory.getPort())))); - factory.setUsername(consumerProps.getProperty("username", factory.getUsername())); - factory.setPassword(consumerProps.getProperty("password", factory.getPassword())); - factory.setVirtualHost(consumerProps.getProperty("virtualHost", factory.getVirtualHost())); + String queue = config.getQueue(); + String exchange = config.getExchange(); + String routingKey = config.getRoutingKey(); - // If the URI property has a value it overrides the values set above. - if(consumerProps.containsKey("uri")){ - try { - factory.setUri(consumerProps.getProperty("uri")); - } - catch(Exception e){ - // A little silly to throw an IOException but we'll make do for now with it. - throw new IOException("Bad URI format.", e); - } - } + boolean durable = config.isDurable(); + boolean exclusive = config.isExclusive(); + boolean autoDelete = config.isAutoDelete(); - String queue = consumerProps.getProperty("queue"); - String exchange = consumerProps.getProperty("exchange"); - String routingKey = consumerProps.getProperty("routingKey"); - - boolean durable = Boolean.valueOf(consumerProps.getProperty("durable", "false")); - boolean exclusive = Boolean.valueOf(consumerProps.getProperty("exclusive", "false")); - boolean autoDelete = Boolean.valueOf(consumerProps.getProperty("autoDelete", "false")); - - final Connection connection = factory.newConnection(); + final Connection connection = connectionFactory.newConnection(); connection.addShutdownListener(new ShutdownListener() { @Override