fix rabbit serde and add tests; fixes #794

This commit is contained in:
fjy 2014-10-17 11:31:18 -04:00
parent 8ec8952993
commit 3d09edaae6
6 changed files with 139 additions and 29 deletions

View File

@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId> <groupId>io.druid.extensions</groupId>
<artifactId>druid-rabbitmq</artifactId> <artifactId>druid-rabbitmq</artifactId>
@ -17,6 +18,11 @@
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid-api</artifactId> <artifactId>druid-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.rabbitmq</groupId> <groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId> <artifactId>amqp-client</artifactId>

View File

@ -20,8 +20,10 @@
package io.druid.firehose.rabbitmq; package io.druid.firehose.rabbitmq;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionFactory;
import javax.annotation.Nullable;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.security.KeyManagementException; import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
@ -161,7 +163,17 @@ public class JacksonifiedConnectionFactory extends ConnectionFactory
@JsonProperty @JsonProperty
public Map<String, Object> getClientProperties() public Map<String, Object> getClientProperties()
{ {
return super.getClientProperties(); return Maps.transformEntries(
super.getClientProperties(),
new Maps.EntryTransformer<String, Object, Object>()
{
@Override
public Object transformEntry(String key, Object value)
{
return value.toString();
}
}
);
} }
@Override @Override

View File

@ -105,29 +105,35 @@ public class RabbitMQFirehoseConfig
} }
@JsonProperty @JsonProperty
public int getMaxRetries() { public int getMaxRetries()
{
return maxRetries; return maxRetries;
} }
public void setMaxRetries(int maxRetries) { public void setMaxRetries(int maxRetries)
{
this.maxRetries = maxRetries; this.maxRetries = maxRetries;
} }
@JsonProperty @JsonProperty
public int getRetryIntervalSeconds() { public int getRetryIntervalSeconds()
{
return retryIntervalSeconds; return retryIntervalSeconds;
} }
public void setRetryIntervalSeconds(int retryIntervalSeconds) { public void setRetryIntervalSeconds(int retryIntervalSeconds)
{
this.retryIntervalSeconds = retryIntervalSeconds; this.retryIntervalSeconds = retryIntervalSeconds;
} }
@JsonProperty @JsonProperty
public long getMaxDurationSeconds() { public long getMaxDurationSeconds()
{
return maxDurationSeconds; return maxDurationSeconds;
} }
public void setMaxDurationSeconds(int maxDurationSeconds) { public void setMaxDurationSeconds(int maxDurationSeconds)
{
this.maxDurationSeconds = maxDurationSeconds; this.maxDurationSeconds = maxDurationSeconds;
} }
} }

View File

@ -50,14 +50,14 @@ import java.util.concurrent.LinkedBlockingQueue;
/** /**
* A FirehoseFactory for RabbitMQ. * A FirehoseFactory for RabbitMQ.
* * <p/>
* It will receive it's configuration through the realtime.spec file and expects to find a * It will receive it's configuration through the realtime.spec file and expects to find a
* consumerProps element in the firehose definition with values for a number of configuration options. * consumerProps element in the firehose definition with values for a number of configuration options.
* Below is a complete example for a RabbitMQ firehose configuration with some explanation. Options * Below is a complete example for a RabbitMQ firehose configuration with some explanation. Options
* that have defaults can be skipped but options with no defaults must be specified with the exception * that have defaults can be skipped but options with no defaults must be specified with the exception
* of the URI property. If the URI property is set, it will override any other property that was also * of the URI property. If the URI property is set, it will override any other property that was also
* set. * set.
* * <p/>
* File: <em>realtime.spec</em> * File: <em>realtime.spec</em>
* <pre> * <pre>
* "firehose" : { * "firehose" : {
@ -89,7 +89,7 @@ import java.util.concurrent.LinkedBlockingQueue;
* } * }
* }, * },
* </pre> * </pre>
* * <p/>
* <b>Limitations:</b> This implementation will not attempt to reconnect to the MQ broker if the * <b>Limitations:</b> This implementation will not attempt to reconnect to the MQ broker if the
* connection to it is lost. Furthermore it does not support any automatic failover on high availability * connection to it is lost. Furthermore it does not support any automatic failover on high availability
* RabbitMQ clusters. This is not supported by the underlying AMQP client library and while the behavior * RabbitMQ clusters. This is not supported by the underlying AMQP client library and while the behavior
@ -97,7 +97,7 @@ import java.util.concurrent.LinkedBlockingQueue;
* the RabbitMQ cluster that sets the "ha-mode" and "ha-sync-mode" properly on the queue that this * the RabbitMQ cluster that sets the "ha-mode" and "ha-sync-mode" properly on the queue that this
* Firehose connects to, messages should survive an MQ broker node failure and be delivered once a * Firehose connects to, messages should survive an MQ broker node failure and be delivered once a
* connection to another node is set up. * connection to another node is set up.
* * <p/>
* For more information on RabbitMQ high availability please see: * For more information on RabbitMQ high availability please see:
* <a href="http://www.rabbitmq.com/ha.html">http://www.rabbitmq.com/ha.html</a>. * <a href="http://www.rabbitmq.com/ha.html">http://www.rabbitmq.com/ha.html</a>.
*/ */
@ -105,14 +105,9 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
{ {
private static final Logger log = new Logger(RabbitMQFirehoseFactory.class); private static final Logger log = new Logger(RabbitMQFirehoseFactory.class);
@JsonProperty
private final RabbitMQFirehoseConfig config; private final RabbitMQFirehoseConfig config;
@JsonProperty
private final StringInputRowParser parser; private final StringInputRowParser parser;
private final JacksonifiedConnectionFactory connectionFactory;
@JsonProperty
private final ConnectionFactory connectionFactory;
@JsonCreator @JsonCreator
public RabbitMQFirehoseFactory( public RabbitMQFirehoseFactory(
@ -126,6 +121,18 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
this.parser = parser; this.parser = parser;
} }
@JsonProperty
public RabbitMQFirehoseConfig getConfig()
{
return config;
}
@JsonProperty
public JacksonifiedConnectionFactory getConnectionFactory()
{
return connectionFactory;
}
@Override @Override
public Firehose connect(StringInputRowParser firehoseParser) throws IOException public Firehose connect(StringInputRowParser firehoseParser) throws IOException
{ {
@ -270,6 +277,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
}; };
} }
@JsonProperty
@Override @Override
public ByteBufferInputRowParser getParser() public ByteBufferInputRowParser getParser()
{ {
@ -280,27 +288,36 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
{ {
private final BlockingQueue<Delivery> _queue; private final BlockingQueue<Delivery> _queue;
public QueueingConsumer(Channel ch) { public QueueingConsumer(Channel ch)
{
this(ch, new LinkedBlockingQueue<Delivery>()); this(ch, new LinkedBlockingQueue<Delivery>());
} }
public QueueingConsumer(Channel ch, BlockingQueue<Delivery> q) { public QueueingConsumer(Channel ch, BlockingQueue<Delivery> q)
{
super(ch); super(ch);
this._queue = q; this._queue = q;
} }
@Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { @Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig)
{
_queue.clear(); _queue.clear();
} }
@Override public void handleCancel(String consumerTag) throws IOException { @Override
public void handleCancel(String consumerTag) throws IOException
{
_queue.clear(); _queue.clear();
} }
@Override public void handleDelivery(String consumerTag, @Override
public void handleDelivery(
String consumerTag,
Envelope envelope, Envelope envelope,
AMQP.BasicProperties properties, AMQP.BasicProperties properties,
byte[] body) byte[] body
)
throws IOException throws IOException
{ {
this._queue.add(new Delivery(envelope, properties, body)); this._queue.add(new Delivery(envelope, properties, body));

View File

@ -0,0 +1,69 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.rabbitmq;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.firehose.rabbitmq.JacksonifiedConnectionFactory;
import io.druid.firehose.rabbitmq.RabbitMQFirehoseConfig;
import io.druid.firehose.rabbitmq.RabbitMQFirehoseFactory;
import io.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
/**
*/
public class RabbitMQFirehoseFactoryTest
{
private static final ObjectMapper mapper = new DefaultObjectMapper();
@Test
public void testSerde() throws Exception
{
RabbitMQFirehoseFactory factory = new RabbitMQFirehoseFactory(
new JacksonifiedConnectionFactory(),
new RabbitMQFirehoseConfig(),
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("timestamp", "auto"),
new DimensionsSpec(
Arrays.<String>asList("dim"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
)
),
null, null, null, null
)
);
byte[] bytes = mapper.writeValueAsBytes(factory);
RabbitMQFirehoseFactory factory2 = mapper.readValue(bytes, RabbitMQFirehoseFactory.class);
byte[] bytes2 = mapper.writeValueAsBytes(factory2);
Assert.assertArrayEquals(bytes, bytes2);
}
}