mirror of https://github.com/apache/druid.git
Merge branch 'master' into moar-endpoints
commit 215584891e

build.sh
@@ -30,4 +30,4 @@ echo "For examples, see: "
 echo " "
 ls -1 examples/*/*sh
 echo " "
-echo "See also http://druid.io/docs/0.6.51"
+echo "See also http://druid.io/docs/0.6.52"
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -21,10 +21,15 @@ package io.druid.concurrent;

 import com.google.common.util.concurrent.ThreadFactoryBuilder;

+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;

 /**
  */
@@ -49,4 +54,31 @@ public class Execs
   {
     return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
   }
+
+  /**
+   * @param nameFormat nameformat for threadFactory
+   * @param capacity maximum capacity after which the executorService will block on accepting new tasks
+   * @return ExecutorService which blocks accepting new tasks when the capacity reached
+   */
+  public static ExecutorService newBlockingSingleThreaded(String nameFormat, int capacity)
+  {
+    return new ThreadPoolExecutor(
+        1, 1,
+        0L, TimeUnit.MILLISECONDS,
+        new ArrayBlockingQueue<Runnable>(capacity), makeThreadFactory(nameFormat)
+        , new RejectedExecutionHandler()
+        {
+          @Override
+          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
+          {
+            try {
+              ((ArrayBlockingQueue) executor.getQueue()).put(r);
+            }
+            catch (InterruptedException e) {
+              throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
+            }
+          }
+        }
+    );
+  }
 }
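The new `newBlockingSingleThreaded` factory converts queue overflow into backpressure: the pool holds one worker plus a bounded `ArrayBlockingQueue`, and the rejection handler re-inserts overflowing tasks with a blocking `put`, so a producer that outruns the worker waits instead of seeing a `RejectedExecutionException`. A standalone usage sketch (not part of the commit; names and timings are illustrative):

```java
import java.util.concurrent.ExecutorService;

import io.druid.concurrent.Execs;

public class BlockingSubmitDemo
{
  public static void main(String[] args) throws Exception
  {
    // One worker and at most two queued tasks; once the queue is full,
    // submit() blocks the caller until the worker frees a slot.
    final ExecutorService exec = Execs.newBlockingSingleThreaded("demo-%d", 2);
    for (int i = 0; i < 5; i++) {
      final int taskId = i;
      exec.submit(
          new Runnable()
          {
            @Override
            public void run()
            {
              try {
                Thread.sleep(100); // simulate slow work such as a disk persist
              }
              catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
              System.out.println("finished task " + taskId);
            }
          }
      );
      System.out.println("submitted task " + taskId);
    }
    exec.shutdown();
  }
}
```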
@@ -0,0 +1,92 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.concurrent;
+
+import com.google.common.base.Throwables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ExecsTest
+{
+  @Test
+  public void testBlockingExecutorService() throws Exception
+  {
+    final int capacity = 3;
+    final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("test%d", capacity);
+    final CountDownLatch queueFullSignal = new CountDownLatch(capacity + 1);
+    final CountDownLatch taskCompletedSignal = new CountDownLatch(2 * capacity);
+    final CountDownLatch taskStartSignal = new CountDownLatch(1);
+    final AtomicInteger producedCount = new AtomicInteger();
+    final AtomicInteger consumedCount = new AtomicInteger();
+    ExecutorService producer = Executors.newSingleThreadExecutor();
+    producer.submit(
+        new Runnable()
+        {
+          public void run()
+          {
+            for (int i = 0; i < 2 * capacity; i++) {
+              final int taskID = i;
+              System.out.println("Produced task" + taskID);
+              blockingExecutor.submit(
+                  new Runnable()
+                  {
+                    @Override
+                    public void run()
+                    {
+                      System.out.println("Starting task" + taskID);
+                      try {
+                        taskStartSignal.await();
+                        consumedCount.incrementAndGet();
+                        taskCompletedSignal.countDown();
+                      }
+                      catch (Exception e) {
+                        throw Throwables.propagate(e);
+                      }
+                      System.out.println("Completed task" + taskID);
+                    }
+                  }
+              );
+              producedCount.incrementAndGet();
+              queueFullSignal.countDown();
+            }
+          }
+        }
+    );
+
+    queueFullSignal.await();
+    // verify that the producer blocks
+    Assert.assertEquals(capacity + 1, producedCount.get());
+    // let the tasks run
+    taskStartSignal.countDown();
+    // wait until all tasks complete
+    taskCompletedSignal.await();
+    // verify all tasks consumed
+    Assert.assertEquals(2 * capacity, consumedCount.get());
+    // cleanup
+    blockingExecutor.shutdown();
+    producer.shutdown();
+
+  }
+}
@@ -19,13 +19,13 @@ Clone Druid and build it:
 git clone https://github.com/metamx/druid.git druid
 cd druid
 git fetch --tags
-git checkout druid-0.6.51
+git checkout druid-0.6.52
 ./build.sh
 ```

 ### Downloading the DSK (Druid Standalone Kit)

-[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.51-bin.tar.gz) a stand-alone tarball and run it:
+[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz) a stand-alone tarball and run it:

 ``` bash
 tar -xzf druid-services-0.X.X-bin.tar.gz
@@ -27,7 +27,7 @@ druid.host=localhost
 druid.service=realtime
 druid.port=8083

-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.51"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.52"]


 druid.zk.service.host=localhost
@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu

 ### Download a Tarball

-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.51-bin.tar.gz). Download this file to a directory of your choosing.
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz). Download this file to a directory of your choosing.

 You can extract the awesomeness within by issuing:

@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
 Not too lost so far right? That's great! If you cd into the directory:

 ```
-cd druid-services-0.6.51
+cd druid-services-0.6.52
 ```

 You should see a bunch of files:
@@ -45,7 +45,7 @@ With real-world data, we recommend having a message bus such as [Apache Kafka](h
 <a id="set-up-kafka"></a>
 #### Setting up Kafka

-[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.51/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
+[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.52/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.

 Instructions for booting a Zookeeper and then Kafka cluster are available [here](http://kafka.apache.org/07/quickstart.html).

@@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes as well as and exter

 If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.

-You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.51-bin.tar.gz)
+You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz)

 and untar the contents within by issuing:

@@ -149,7 +149,7 @@ druid.port=8081

 druid.zk.service.host=localhost

-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.51"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.52"]

 # Dummy read only AWS account (used to download example data)
 druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

@@ -240,7 +240,7 @@ druid.port=8083

 druid.zk.service.host=localhost

-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.51","io.druid.extensions:druid-kafka-seven:0.6.51"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.52","io.druid.extensions:druid-kafka-seven:0.6.52"]

 # Change this config to db to hand off to the rest of the Druid cluster
 druid.publish.type=noop
@@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu

 h3. Download a Tarball

-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.51-bin.tar.gz)
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz)
 Download this file to a directory of your choosing.
 You can extract the awesomeness within by issuing:

@@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
 Not too lost so far right? That's great! If you cd into the directory:

 ```
-cd druid-services-0.6.51
+cd druid-services-0.6.52
 ```

 You should see a bunch of files:
@@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.

 h3. Download a Tarball

-We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.51-bin.tar.gz.
+We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz.
 Download this bad boy to a directory of your choosing.

 You can extract the awesomeness within by issuing:
@@ -4,7 +4,7 @@ druid.port=8081

 druid.zk.service.host=localhost

-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.51"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.52"]

 # Dummy read only AWS account (used to download example data)
 druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

@@ -4,7 +4,7 @@ druid.port=8083

 druid.zk.service.host=localhost

-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.51","io.druid.extensions:druid-kafka-seven:0.6.51","io.druid.extensions:druid-rabbitmq:0.6.51"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.52","io.druid.extensions:druid-kafka-seven:0.6.52","io.druid.extensions:druid-rabbitmq:0.6.52"]

 # Change this config to db to hand off to the rest of the Druid cluster
 druid.publish.type=noop
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -19,11 +19,9 @@

 package io.druid.indexer;

-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.metamx.common.logger.Logger;
 import io.druid.db.DbConnector;
-import io.druid.jackson.DefaultObjectMapper;
 import io.druid.timeline.DataSegment;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;

@@ -39,8 +37,6 @@ public class DbUpdaterJob implements Jobby
 {
   private static final Logger log = new Logger(DbUpdaterJob.class);

-  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
-
   private final HadoopDruidIndexerConfig config;
   private final IDBI dbi;

@@ -82,7 +78,7 @@ public class DbUpdaterJob implements Jobby
                 .put("partitioned", segment.getShardSpec().getPartitionNum())
                 .put("version", segment.getVersion())
                 .put("used", true)
-                .put("payload", jsonMapper.writeValueAsString(segment))
+                .put("payload", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segment))
                 .build()
         );

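This `DbUpdaterJob` change retires the class-private `ObjectMapper` and serializes segment payloads through the mapper already exposed by `HadoopDruidIndexerConfig`, so every indexer component writes payloads with one identically-configured instance. A minimal sketch of the pattern (simplified; the class and field names below stand in for the real ones):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch: one shared, pre-configured mapper per process instead of
// per-class instances whose configuration can drift apart.
final class IndexerConfigSketch
{
  // Stands in for HadoopDruidIndexerConfig.jsonMapper in Druid.
  static final ObjectMapper JSON_MAPPER = new ObjectMapper();
}

final class SegmentPayloadWriter
{
  String payloadFor(Object segment) throws Exception
  {
    // All callers share the same instance, so payloads stored in the
    // segments table are serialized consistently.
    return IndexerConfigSketch.JSON_MAPPER.writeValueAsString(segment);
  }
}
```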
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -194,7 +194,7 @@ public class RealtimeIndexTask extends AbstractTask
     final RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
         windowPeriod,
         new File(toolbox.getTaskWorkDir(), "persist"),
-        segmentGranularity
+        segmentGranularity, fireDepartmentConfig.getMaxPendingPersists()
     );

     final SegmentPublisher segmentPublisher = new TaskActionSegmentPublisher(this, toolbox);
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
     </parent>

     <dependencies>
pom.xml
@@ -23,7 +23,7 @@
     <groupId>io.druid</groupId>
     <artifactId>druid</artifactId>
     <packaging>pom</packaging>
-    <version>0.6.52-SNAPSHOT</version>
+    <version>0.6.53-SNAPSHOT</version>
     <name>druid</name>
     <description>druid</description>
     <scm>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -9,7 +9,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -28,17 +28,23 @@ import org.joda.time.Period;
  */
 public class FireDepartmentConfig
 {
+  private static int MAX_PENDING_PERSIST_BATCHES_DEFAULT = 2;
   private final int maxRowsInMemory;
   private final Period intermediatePersistPeriod;
+  private final int maxPendingPersists;

   @JsonCreator
   public FireDepartmentConfig(
       @JsonProperty("maxRowsInMemory") int maxRowsInMemory,
-      @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod
+      @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
+      @JsonProperty("maxPendingPersists") int maxPendingPersists
   )
   {
     this.maxRowsInMemory = maxRowsInMemory;
     this.intermediatePersistPeriod = intermediatePersistPeriod;
+    this.maxPendingPersists = maxPendingPersists > 0
+                              ? maxPendingPersists
+                              : MAX_PENDING_PERSIST_BATCHES_DEFAULT;

     Preconditions.checkArgument(maxRowsInMemory > 0, "maxRowsInMemory[%s] should be greater than 0", maxRowsInMemory);
     Preconditions.checkNotNull(intermediatePersistPeriod, "intermediatePersistPeriod");

@@ -55,4 +61,10 @@ public class FireDepartmentConfig
   {
     return intermediatePersistPeriod;
   }
+
+  @JsonProperty
+  public int getMaxPendingPersists()
+  {
+    return maxPendingPersists;
+  }
 }
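The constructor treats a non-positive `maxPendingPersists` as "use the default": Jackson supplies 0 when the JSON property is absent (the parameter is a primitive `int`), and the ternary then falls back to two pending persist batches. A standalone sketch of the observable contract (not part of the commit; the import path is assumed):

```java
import org.joda.time.Period;

import io.druid.segment.realtime.FireDepartmentConfig; // assumed package

public class FireDepartmentConfigDemo
{
  public static void main(String[] args)
  {
    // Explicit positive values are honored as-is.
    FireDepartmentConfig explicit = new FireDepartmentConfig(500000, new Period("PT10m"), 4);
    System.out.println(explicit.getMaxPendingPersists()); // prints 4

    // Omitted in JSON -> Jackson passes 0 -> the default of 2 applies.
    FireDepartmentConfig defaulted = new FireDepartmentConfig(500000, new Period("PT10m"), 0);
    System.out.println(defaulted.getMaxPendingPersists()); // prints 2
  }
}
```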
@@ -31,7 +31,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.metamx.common.Pair;
 import com.metamx.common.concurrent.ScheduledExecutors;
 import com.metamx.common.guava.FunctionalIterable;

@@ -42,6 +41,7 @@ import io.druid.client.DruidServer;
 import io.druid.client.ServerView;
 import io.druid.common.guava.ThreadRenamingCallable;
 import io.druid.common.guava.ThreadRenamingRunnable;
+import io.druid.concurrent.Execs;
 import io.druid.guice.annotations.Processing;
 import io.druid.query.MetricsEmittingQueryRunner;
 import io.druid.query.Query;

@@ -84,7 +84,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;

 /**
@@ -92,43 +91,34 @@ import java.util.concurrent.ScheduledExecutorService;
 public class RealtimePlumberSchool implements PlumberSchool
 {
   private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class);

   private final Period windowPeriod;
   private final File basePersistDirectory;
   private final IndexGranularity segmentGranularity;
   private final Object handoffCondition = new Object();

+  private final int maxPendingPersists;
   private volatile boolean shuttingDown = false;

   @JacksonInject
   @NotNull
   private volatile ServiceEmitter emitter;

   @JacksonInject
   @NotNull
   private volatile QueryRunnerFactoryConglomerate conglomerate = null;

   @JacksonInject
   @NotNull
   private volatile DataSegmentPusher dataSegmentPusher = null;

   @JacksonInject
   @NotNull
   private volatile DataSegmentAnnouncer segmentAnnouncer = null;

   @JacksonInject
   @NotNull
   private volatile SegmentPublisher segmentPublisher = null;

   @JacksonInject
   @NotNull
   private volatile ServerView serverView = null;

   @JacksonInject
   @NotNull
   @Processing
   private volatile ExecutorService queryExecutorService = null;

   private volatile VersioningPolicy versioningPolicy = null;
   private volatile RejectionPolicyFactory rejectionPolicyFactory = null;

@@ -136,7 +126,8 @@ public class RealtimePlumberSchool implements PlumberSchool
   public RealtimePlumberSchool(
       @JsonProperty("windowPeriod") Period windowPeriod,
       @JsonProperty("basePersistDirectory") File basePersistDirectory,
-      @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
+      @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
+      @JsonProperty("maxPendingPersists") int maxPendingPersists
   )
   {
     this.windowPeriod = windowPeriod;

@@ -144,7 +135,9 @@ public class RealtimePlumberSchool implements PlumberSchool
     this.segmentGranularity = segmentGranularity;
     this.versioningPolicy = new IntervalStartVersioningPolicy();
     this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
+    this.maxPendingPersists = maxPendingPersists;
+
+    Preconditions.checkArgument(maxPendingPersists > 0);
     Preconditions.checkNotNull(windowPeriod, "RealtimePlumberSchool requires a windowPeriod.");
     Preconditions.checkNotNull(basePersistDirectory, "RealtimePlumberSchool requires a basePersistDirectory.");
     Preconditions.checkNotNull(segmentGranularity, "RealtimePlumberSchool requires a segmentGranularity.");
@@ -485,22 +478,13 @@ public class RealtimePlumberSchool implements PlumberSchool
   private void initializeExecutors()
   {
     if (persistExecutor == null) {
-      persistExecutor = Executors.newFixedThreadPool(
-          1,
-          new ThreadFactoryBuilder()
-              .setDaemon(true)
-              .setNameFormat("plumber_persist_%d")
-              .build()
+      // use a blocking single threaded executor to throttle the firehose when write to disk is slow
+      persistExecutor = Execs.newBlockingSingleThreaded(
+          "plumber_persist_%d", maxPendingPersists
       );
     }
     if (scheduledExecutor == null) {
-      scheduledExecutor = Executors.newScheduledThreadPool(
-          1,
-          new ThreadFactoryBuilder()
-              .setDaemon(true)
-              .setNameFormat("plumber_scheduled_%d")
-              .build()
-      );
+      scheduledExecutor = Execs.scheduledSingleThreaded("plumber_scheduled_%d");
     }
   }

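The persist pool is now the blocking executor introduced above, sized by `maxPendingPersists`, so a firehose that produces faster than segments can be persisted is throttled at submit time rather than queueing without bound. `Execs.scheduledSingleThreaded` is referenced here but not shown in this diff; a plausible shape, mirroring the `makeThreadFactory` helper from the Execs hunk (a sketch, not the actual Druid source):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

// Sketch only: a single daemon scheduler thread, named via the same
// factory pattern Execs already uses for its other pools.
final class ExecsScheduledSketch
{
  static ThreadFactory makeThreadFactory(String nameFormat)
  {
    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
  }

  static ScheduledExecutorService scheduledSingleThreaded(String nameFormat)
  {
    return Executors.newSingleThreadScheduledExecutor(makeThreadFactory(nameFormat));
  }
}
```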
@@ -697,7 +681,8 @@ public class RealtimePlumberSchool implements PlumberSchool
   /**
    * Unannounces a given sink and removes all local references to it.
    */
-  private void abandonSegment(final long truncatedTime, final Sink sink) {
+  private void abandonSegment(final long truncatedTime, final Sink sink)
+  {
     try {
       segmentAnnouncer.unannounceSegment(sink.getSegment());
       FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
@@ -78,7 +78,7 @@ public class RealtimeManagerTest
         Arrays.<FireDepartment>asList(
             new FireDepartment(
                 schema,
-                new FireDepartmentConfig(1, new Period("P1Y")),
+                new FireDepartmentConfig(1, new Period("P1Y"), 1),
                 new FirehoseFactory()
                 {
                   @Override

@@ -86,7 +86,7 @@ public class RealtimePlumberSchoolTest
     RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
         new Period("PT10m"),
         tmpDir,
-        IndexGranularity.HOUR
+        IndexGranularity.HOUR, 1
     );

     announcer = EasyMock.createMock(DataSegmentAnnouncer.class);
@@ -27,7 +27,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -54,7 +54,7 @@ import java.util.List;
  */
 @Command(
     name = "broker",
-    description = "Runs a broker node, see http://druid.io/docs/0.6.51/Broker.html for a description"
+    description = "Runs a broker node, see http://druid.io/docs/0.6.52/Broker.html for a description"
 )
 public class CliBroker extends ServerRunnable
 {

@@ -65,7 +65,7 @@ import java.util.List;
  */
 @Command(
     name = "coordinator",
-    description = "Runs the Coordinator, see http://druid.io/docs/0.6.51/Coordinator.html for a description."
+    description = "Runs the Coordinator, see http://druid.io/docs/0.6.52/Coordinator.html for a description."
 )
 public class CliCoordinator extends ServerRunnable
 {

@@ -41,7 +41,7 @@ import java.util.List;
  */
 @Command(
     name = "hadoop",
-    description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.51/Batch-ingestion.html for a description."
+    description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.52/Batch-ingestion.html for a description."
 )
 public class CliHadoopIndexer implements Runnable
 {

@@ -42,7 +42,7 @@ import java.util.List;
  */
 @Command(
     name = "historical",
-    description = "Runs a Historical node, see http://druid.io/docs/0.6.51/Historical.html for a description"
+    description = "Runs a Historical node, see http://druid.io/docs/0.6.52/Historical.html for a description"
 )
 public class CliHistorical extends ServerRunnable
 {

@@ -93,7 +93,7 @@ import java.util.List;
  */
 @Command(
     name = "overlord",
-    description = "Runs an Overlord node, see http://druid.io/docs/0.6.51/Indexing-Service.html for a description"
+    description = "Runs an Overlord node, see http://druid.io/docs/0.6.52/Indexing-Service.html for a description"
 )
 public class CliOverlord extends ServerRunnable
 {

@@ -30,7 +30,7 @@ import java.util.List;
  */
 @Command(
     name = "realtime",
-    description = "Runs a realtime node, see http://druid.io/docs/0.6.51/Realtime.html for a description"
+    description = "Runs a realtime node, see http://druid.io/docs/0.6.52/Realtime.html for a description"
 )
 public class CliRealtime extends ServerRunnable
 {

@@ -42,7 +42,7 @@ import java.util.concurrent.Executor;
  */
 @Command(
     name = "realtime",
-    description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.51/Realtime.html for a description"
+    description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.52/Realtime.html for a description"
 )
 public class CliRealtimeExample extends ServerRunnable
 {