mirror of https://github.com/apache/druid.git
Merge branch 'master' into fix-topn
This commit is contained in:
commit
b27d9af0ef
|
@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.Job;
|
|||
import org.apache.hadoop.mapreduce.Partitioner;
|
||||
import org.apache.hadoop.mapreduce.Reducer;
|
||||
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -91,7 +92,7 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
);
|
||||
|
||||
JobHelper.injectSystemProperties(groupByJob);
|
||||
groupByJob.setInputFormatClass(TextInputFormat.class);
|
||||
groupByJob.setInputFormatClass(CombineTextInputFormat.class);
|
||||
groupByJob.setMapperClass(DetermineCardinalityMapper.class);
|
||||
groupByJob.setMapOutputKeyClass(LongWritable.class);
|
||||
groupByJob.setMapOutputValueClass(BytesWritable.class);
|
||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.mapreduce.JobContext;
|
|||
import org.apache.hadoop.mapreduce.Partitioner;
|
||||
import org.apache.hadoop.mapreduce.Reducer;
|
||||
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -146,7 +147,7 @@ public class IndexGeneratorJob implements Jobby
|
|||
|
||||
JobHelper.injectSystemProperties(job);
|
||||
|
||||
job.setInputFormatClass(TextInputFormat.class);
|
||||
job.setInputFormatClass(CombineTextInputFormat.class);
|
||||
|
||||
job.setMapperClass(IndexGeneratorMapper.class);
|
||||
job.setMapOutputValueClass(Text.class);
|
||||
|
|
11
pom.xml
11
pom.xml
|
@ -319,17 +319,17 @@
|
|||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-server</artifactId>
|
||||
<version>9.1.4.v20140401</version>
|
||||
<version>9.1.5.v20140505</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-servlet</artifactId>
|
||||
<version>9.1.4.v20140401</version>
|
||||
<version>9.1.5.v20140505</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-servlets</artifactId>
|
||||
<version>9.1.4.v20140401</version>
|
||||
<version>9.1.5.v20140505</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>joda-time</groupId>
|
||||
|
@ -406,11 +406,6 @@
|
|||
<artifactId>httpcore</artifactId>
|
||||
<version>4.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.davekoelle</groupId>
|
||||
<artifactId>alphanum</artifactId>
|
||||
<version>1.0.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
|
|
|
@ -25,9 +25,13 @@ import com.metamx.common.logger.Logger;
|
|||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
import com.rabbitmq.client.QueueingConsumer;
|
||||
import com.rabbitmq.client.DefaultConsumer;
|
||||
import com.rabbitmq.client.Envelope;
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.QueueingConsumer.Delivery;
|
||||
import com.rabbitmq.client.ShutdownListener;
|
||||
import com.rabbitmq.client.ShutdownSignalException;
|
||||
import com.rabbitmq.client.ConsumerCancelledException;
|
||||
import io.druid.data.input.ByteBufferInputRowParser;
|
||||
import io.druid.data.input.Firehose;
|
||||
import io.druid.data.input.FirehoseFactory;
|
||||
|
@ -40,6 +44,8 @@ import net.jodah.lyra.retry.RetryPolicy;
|
|||
import net.jodah.lyra.util.Duration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
/**
|
||||
* A FirehoseFactory for RabbitMQ.
|
||||
|
@ -179,7 +185,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
|
|||
* Storing the latest delivery as a member variable should be safe since this will only be run
|
||||
* by a single thread.
|
||||
*/
|
||||
private QueueingConsumer.Delivery delivery;
|
||||
private Delivery delivery;
|
||||
|
||||
/**
|
||||
* Store the latest delivery tag to be able to commit (acknowledge) the message delivery up to
|
||||
|
@ -268,4 +274,41 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
|
|||
{
|
||||
return parser;
|
||||
}
|
||||
|
||||
private static class QueueingConsumer extends DefaultConsumer
|
||||
{
|
||||
private final BlockingQueue<Delivery> _queue;
|
||||
|
||||
public QueueingConsumer(Channel ch) {
|
||||
this(ch, new LinkedBlockingQueue<Delivery>());
|
||||
}
|
||||
|
||||
public QueueingConsumer(Channel ch, BlockingQueue<Delivery> q) {
|
||||
super(ch);
|
||||
this._queue = q;
|
||||
}
|
||||
|
||||
@Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
|
||||
_queue.clear();
|
||||
}
|
||||
|
||||
@Override public void handleCancel(String consumerTag) throws IOException {
|
||||
_queue.clear();
|
||||
}
|
||||
|
||||
@Override public void handleDelivery(String consumerTag,
|
||||
Envelope envelope,
|
||||
AMQP.BasicProperties properties,
|
||||
byte[] body)
|
||||
throws IOException
|
||||
{
|
||||
this._queue.add(new Delivery(envelope, properties, body));
|
||||
}
|
||||
|
||||
public Delivery nextDelivery()
|
||||
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
|
||||
{
|
||||
return _queue.take();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue