mirror of https://github.com/apache/druid.git
fix merge conflicts
commit 358ff915bb
@@ -75,7 +75,7 @@ public class OrderedMergeIterator<T> implements Iterator<T>
new Predicate<Iterator<T>>()
{
@Override
-public boolean apply(@Nullable Iterator<T> input)
+public boolean apply(Iterator<T> input)
{
return input.hasNext();
}

@@ -85,7 +85,7 @@ public class OrderedMergeIterator<T> implements Iterator<T>
new Function<Iterator<T>, PeekingIterator<T>>()
{
@Override
-public PeekingIterator<T> apply(@Nullable Iterator<T> input)
+public PeekingIterator<T> apply(Iterator<T> input)
{
return Iterators.peekingIterator(input);
}

@@ -83,7 +83,7 @@ public class OrderedMergeSequence<T> implements Sequence<T>
new Function<Yielder<T>, T>()
{
@Override
-public T apply(@Nullable Yielder<T> input)
+public T apply(Yielder<T> input)
{
return input.get();
}

@@ -91,7 +91,7 @@ public class JodaUtils
intervals, new Predicate<Interval>()
{
@Override
-public boolean apply(@Nullable Interval input)
+public boolean apply(Interval input)
{
return input.overlaps(i);
}
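A pattern repeated throughout this commit: the `@Nullable` annotation is removed from the parameters of anonymous Guava `Function`/`Predicate` implementations whose bodies dereference the argument unconditionally, so the annotation was misleading rather than protective. A minimal, hypothetical sketch of the resulting contract (illustrative only, not code from this commit):

```java
import com.google.common.base.Predicate;

import java.util.Arrays;
import java.util.Iterator;

public class NonNullParameterSketch
{
  public static void main(String[] args)
  {
    // The body calls input.hasNext() immediately, so the parameter is effectively
    // non-null; annotating it @Nullable would only hide a potential NPE.
    Predicate<Iterator<String>> hasNext = new Predicate<Iterator<String>>()
    {
      @Override
      public boolean apply(Iterator<String> input)
      {
        return input.hasNext();
      }
    };

    Iterator<String> it = Arrays.asList("a", "b").iterator();
    System.out.println(hasNext.apply(it)); // prints: true
  }
}
```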
@@ -3,3 +3,12 @@ layout: doc_page
---
+# About Experimental Features
+Experimental features are features we have developed but have not fully tested in a production environment. If you choose to try them out, there will likely be edge cases that we have not covered. We would love feedback on any of these features, whether they are bug reports, suggestions for improvement, or letting us know they work as intended.
+
+To enable experimental features, include their artifacts in the configuration runtime.properties file. Eg-
+
+```
+druid.extensions.coordinates=["io.druid.extensions:druid-histogram:{VERSION}"]
+```
+
+The configuration for all the indexer and query nodes need to be updated with this.
@@ -155,8 +155,10 @@ Druid storage nodes maintain information about segments they have already downlo
|--------|-----------|-------|
|`druid.segmentCache.locations`|Segments assigned to a Historical node are first stored on the local file system (in a disk cache) and then served by the Historical node. These locations define where that local cache resides. | none (no caching) |
|`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a node is no longer serving a segment.|true|
-|`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping segment.|5 minutes|
+|`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping segment.|30000 (30 seconds)|
|`druid.segmentCache.infoDir`|Historical nodes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
+|`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)|
+|`druid.segmentCache.numLoadingThreads`|How many segments to load concurrently from from deep storage.|1|

### Jetty Server Module
@@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
-git checkout druid-0.6.152
+git checkout druid-0.6.156
./build.sh
```

### Downloading the DSK (Druid Standalone Kit)

-[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.152-bin.tar.gz) a stand-alone tarball and run it:
+[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.156-bin.tar.gz) a stand-alone tarball and run it:

``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz
@@ -50,9 +50,7 @@ A sample ingest firehose spec is shown below -
{
"type" : "ingestSegment",
"dataSource" : "wikipedia",
-"interval" : "2013-01-01/2013-01-02",
-"dimensions":[],
-"metrics":[]
+"interval" : "2013-01-01/2013-01-02"
}
```

@@ -61,8 +59,8 @@ A sample ingest firehose spec is shown below -
|type|ingestSegment. Type of firehose|yes|
|dataSource|A String defining the data source to fetch rows from, very similar to a table in a relational database|yes|
|interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over.|yes|
-|dimensions|The list of dimensions to select. If left empty, all dimensions are selected.|no|
-|metrics|The list of metrics to select. If left empty, all metrics are returned.|no|
+|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
+|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
|filter| See [Filters](Filters.html)|yes|
@@ -8,9 +8,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need

- Update realtime node's configs for Kafka 8 extensions
- e.g.
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.152",...]`
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.156",...]`
- becomes
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.152",...]`
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.156",...]`
- Update realtime task config for changed keys
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.
@@ -57,7 +57,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/overlord

-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.152"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.156"]

druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

@@ -139,7 +139,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/middlemanager

-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.152","io.druid.extensions:druid-kafka-seven:0.6.152"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.156","io.druid.extensions:druid-kafka-seven:0.6.156"]

druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

@@ -286,7 +286,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/historical

-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.152"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.156"]

druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083

-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.152"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.156"]


druid.zk.service.host=localhost

@@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/realtime

-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.152","io.druid.extensions:druid-kafka-seven:0.6.152"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.156","io.druid.extensions:druid-kafka-seven:0.6.156"]

druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@@ -46,6 +46,12 @@ The format of the result is:
} ]
```

+Dimension columns will have type `STRING`.
+Metric columns will have type `FLOAT`.
+Timestamp column will have type `LONG`.
+
+Only columns which are dimensions (ie, have type `STRING`) will have any cardinality. Rest of the columns (timestamp and metric columns) will show cardinality as `null`.
+
### toInclude

There are 3 types of toInclude objects.
@@ -72,4 +78,4 @@ The grammar is as follows:

``` json
"toInclude": { "type": "list", "columns": [<string list of column names>]}
```
```
@@ -25,6 +25,7 @@ There are several main parts to a select query:
|queryType|This String should always be "select"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|dataSource|A String defining the data source to query, very similar to a table in a relational database|yes|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|filter|See [Filters](Filters.html)|no|
|dimensions|The list of dimensions to select. If left empty, all dimensions are returned.|no|
|metrics|The list of metrics to select. If left empty, all metrics are returned.|no|
|pagingSpec|A JSON object indicating offsets into different scanned segments. Select query results will return a pagingSpec that can be reused for pagination.|yes|
@@ -28,7 +28,7 @@ Configuration:

-Ddruid.zk.service.host=localhost

-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.152"]
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.156"]

-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.db.connector.user=druid
@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu

### Download a Tarball

-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.152-bin.tar.gz). Download this file to a directory of your choosing.
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.156-bin.tar.gz). Download this file to a directory of your choosing.

You can extract the awesomeness within by issuing:

@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:

```
-cd druid-services-0.6.152
+cd druid-services-0.6.156
```

You should see a bunch of files:

@@ -91,7 +91,7 @@ druid.service=overlord

druid.zk.service.host=localhost

-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.152"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.156"]

druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid
@@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende

If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.

-You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.152-bin.tar.gz)
+You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.156-bin.tar.gz)

and untar the contents within by issuing:

@@ -149,7 +149,7 @@ druid.port=8081

druid.zk.service.host=localhost

-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.152"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.156"]

# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

@@ -240,7 +240,7 @@ druid.port=8083

druid.zk.service.host=localhost

-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.152","io.druid.extensions:druid-kafka-seven:0.6.152"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.156","io.druid.extensions:druid-kafka-seven:0.6.156"]

# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
@@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu

h3. Download a Tarball

-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.152-bin.tar.gz)
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.156-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:

@@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:

```
-cd druid-services-0.6.152
+cd druid-services-0.6.156
```

You should see a bunch of files:

@@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.

# Download a Tarball

-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.152-bin.tar.gz).
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.156-bin.tar.gz).
Download this bad boy to a directory of your choosing.

You can extract the awesomeness within by issuing:
@@ -20,6 +20,7 @@
package io.druid.examples.web;

import com.google.api.client.repackaged.com.google.common.base.Throwables;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.io.InputSupplier;
import com.metamx.emitter.EmittingLogger;

@@ -55,6 +56,6 @@ public class WebJsonSupplier implements InputSupplier<BufferedReader>
{
URLConnection connection = url.openConnection();
connection.setDoInput(true);
-return new BufferedReader(new InputStreamReader(url.openStream()));
+return new BufferedReader(new InputStreamReader(url.openStream(), Charsets.UTF_8));
}
}
@@ -26,7 +26,7 @@ import java.nio.ByteBuffer;

public class Bucket
{
-public static int PREAMBLE_BYTES = 16;
+public static final int PREAMBLE_BYTES = 16;

/** ID for this bucket, unique for this indexer run. Used for grouping and partitioning. */
private final int shardNum;

@@ -28,7 +28,7 @@ import java.util.List;

public class TaskConfig
{
-public static List<String> DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of(
+public static final List<String> DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of(
"org.apache.hadoop:hadoop-client:2.3.0"
);
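Both hunks above (and similar ones later in the diff, e.g. BaseQuery and CacheConfig) turn `public static` fields that are meant to be constants into `public static final` fields, so they can no longer be reassigned at runtime. A small illustration with a made-up class (not from this repository):

```java
public class Limits
{
  // Before the fix, any code could execute Limits.MAX_RETRIES = 0; and silently
  // change global behavior. With final, the field is a true constant and any
  // attempted reassignment is rejected at compile time.
  public static final int MAX_RETRIES = 3;

  public static void main(String[] args)
  {
    // MAX_RETRIES = 0; // compile error: cannot assign a value to final variable
    System.out.println(MAX_RETRIES); // prints: 3
  }
}
```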
@@ -97,7 +97,7 @@ public class AppendTask extends MergeTaskBase
new Predicate<Rowboat>()
{
@Override
-public boolean apply(@Nullable Rowboat input)
+public boolean apply(Rowboat input)
{
return holder.getInterval().contains(input.getTimestamp());
}

@@ -115,7 +115,7 @@ public class AppendTask extends MergeTaskBase
return "append";
}

-private class SegmentToMergeHolder
+private static class SegmentToMergeHolder
{
private final DataSegment segment;
private final Interval interval;

@@ -133,7 +133,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
new Function<DataSegment, String>()
{
@Override
-public String apply(@Nullable DataSegment input)
+public String apply(DataSegment input)
{
return input.getIdentifier();
}
@@ -177,7 +177,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
} else {
Set<String> metricsSet = new HashSet<>();
for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timeLineSegments) {
-metricsSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getDimensions());
+metricsSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getMetrics());
}
metricsList = Lists.newArrayList(metricsSet);
}

@@ -236,7 +236,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
{
@Nullable
@Override
-public Sequence<InputRow> apply(@Nullable StorageAdapter adapter)
+public Sequence<InputRow> apply(StorageAdapter adapter)
{
return Sequences.concat(
Sequences.map(

@@ -248,20 +248,25 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
{
@Nullable
@Override
-public Sequence<InputRow> apply(@Nullable final Cursor cursor)
+public Sequence<InputRow> apply(final Cursor cursor)
{
final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();

final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim);
-dimSelectors.put(dim, dimSelector);
+// dimSelector is null if the dimension is not present
+if (dimSelector != null) {
+dimSelectors.put(dim, dimSelector);
+}
}

final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) {
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
-metSelectors.put(metric, metricSelector);
+if (metricSelector != null) {
+metSelectors.put(metric, metricSelector);
+}
}

return Sequences.simple(
@@ -799,7 +799,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
ZkWorker zkWorker, ZkWorker zkWorker2
)
{
-int retVal = -Ints.compare(zkWorker.getCurrCapacityUsed(), zkWorker2.getCurrCapacityUsed());
+int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
if (retVal == 0) {
retVal = zkWorker.getWorker().getHost().compareTo(zkWorker2.getWorker().getHost());
}
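This hunk, and the PrioritizedExecutorService one later in the diff, replaces `-Ints.compare(a, b)` with `Ints.compare(b, a)`. The two forms order elements the same way here, since Guava's `Ints.compare` only returns -1, 0, or 1, but swapping the arguments is the safer general idiom: negating a comparator result breaks for comparators that can return `Integer.MIN_VALUE`. A small sketch of the descending-order idiom (illustrative only, not code from this commit):

```java
import com.google.common.primitives.Ints;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class DescendingCompareSketch
{
  public static void main(String[] args)
  {
    // Descending order by swapping arguments instead of negating the result.
    Comparator<Integer> byCapacityDesc = new Comparator<Integer>()
    {
      @Override
      public int compare(Integer left, Integer right)
      {
        return Ints.compare(right, left);
      }
    };

    List<Integer> capacities = Arrays.asList(3, 9, 1);
    Collections.sort(capacities, byCapacityDesc);
    System.out.println(capacities); // prints: [9, 3, 1]
  }
}
```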
pom.xml
@@ -39,7 +39,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-<metamx.java-util.version>0.26.6</metamx.java-util.version>
+<metamx.java-util.version>0.26.7</metamx.java-util.version>
<apache.curator.version>2.6.0</apache.curator.version>
<druid.api.version>0.2.10</druid.api.version>
</properties>
@@ -21,6 +21,7 @@ package io.druid.granularity;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

@@ -325,7 +326,7 @@ public class PeriodGranularity extends BaseQueryGranularity
@Override
public byte[] cacheKey()
{
-return (period.toString() + ":" + chronology.getZone().toString()).getBytes();
+return (period.toString() + ":" + chronology.getZone().toString()).getBytes(Charsets.UTF_8);
}

@Override
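Many hunks in this commit make the same substitution: `String.getBytes()` becomes `String.getBytes(Charsets.UTF_8)` when building cache keys. The no-argument form uses the JVM's default charset, so two nodes launched with different platform encodings could produce different bytes for the same key; pinning UTF-8 makes the keys deterministic. A minimal sketch of the difference (illustrative, not code from this commit):

```java
import com.google.common.base.Charsets;

import java.nio.charset.Charset;
import java.util.Arrays;

public class CacheKeyCharsetSketch
{
  public static void main(String[] args)
  {
    String key = "P1D:Europe/Paris";

    // Depends on the JVM default charset (file.encoding), so it can differ across hosts.
    byte[] platformBytes = key.getBytes();

    // Always UTF-8, regardless of how the JVM was launched.
    byte[] utf8Bytes = key.getBytes(Charsets.UTF_8);

    System.out.println("default charset: " + Charset.defaultCharset());
    System.out.println(Arrays.equals(platformBytes, utf8Bytes));
  }
}
```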
@@ -36,7 +36,7 @@ import java.util.Map;
*/
public abstract class BaseQuery<T> implements Query<T>
{
-public static String QUERYID = "queryId";
+public static final String QUERYID = "queryId";
private final DataSource dataSource;
private final Map<String, Object> context;
private final QuerySegmentSpec querySegmentSpec;

@@ -62,7 +62,7 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
new Function<Interval, Iterable<Interval>>()
{
@Override
-public Iterable<Interval> apply(@Nullable Interval input)
+public Iterable<Interval> apply(Interval input)
{
return splitInterval(input);
}
@@ -249,7 +249,7 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen
@Override
public int compareTo(PrioritizedListenableFutureTask otherTask)
{
-return -Ints.compare(getPriority(), otherTask.getPriority());
+return Ints.compare(otherTask.getPriority(), getPriority());
}
}
}
@@ -49,7 +49,7 @@ public class Queries
new Function<AggregatorFactory, String>()
{
@Override
-public String apply(@Nullable AggregatorFactory input)
+public String apply(AggregatorFactory input)
{
return input.getName();
}

@@ -71,7 +71,7 @@ public class QueryRunnerHelper
new Function<Cursor, Result<T>>()
{
@Override
-public Result<T> apply(@Nullable Cursor input)
+public Result<T> apply(Cursor input)
{
log.debug("Running over cursor[%s]", adapter.getInterval(), input.getTime());
return mapFn.apply(input);
@@ -21,6 +21,7 @@ package io.druid.query.aggregation;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import io.druid.segment.ColumnSelectorFactory;

@@ -129,7 +130,7 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
@Override
public byte[] getCacheKey()
{
-byte[] fieldNameBytes = fieldName.getBytes();
+byte[] fieldNameBytes = fieldName.getBytes(Charsets.UTF_8);

return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_TYPE_ID).put(fieldNameBytes).array();
}

@@ -154,7 +154,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory
@Override
public byte[] getCacheKey()
{
-byte[] fieldNameBytes = fieldName.getBytes();
+byte[] fieldNameBytes = fieldName.getBytes(Charsets.UTF_8);
return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_TYPE_ID).put(fieldNameBytes).array();
}

@@ -21,6 +21,7 @@ package io.druid.query.aggregation;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import io.druid.segment.ColumnSelectorFactory;

@@ -125,7 +126,7 @@ public class LongSumAggregatorFactory implements AggregatorFactory
@Override
public byte[] getCacheKey()
{
-byte[] fieldNameBytes = fieldName.getBytes();
+byte[] fieldNameBytes = fieldName.getBytes(Charsets.UTF_8);

return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_TYPE_ID).put(fieldNameBytes).array();
}

@@ -21,6 +21,7 @@ package io.druid.query.aggregation;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import io.druid.segment.ColumnSelectorFactory;

@@ -126,7 +127,7 @@ public class MaxAggregatorFactory implements AggregatorFactory
@Override
public byte[] getCacheKey()
{
-byte[] fieldNameBytes = fieldName.getBytes();
+byte[] fieldNameBytes = fieldName.getBytes(Charsets.UTF_8);

return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_TYPE_ID).put(fieldNameBytes).array();
}
@@ -21,6 +21,7 @@ package io.druid.query.aggregation;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import io.druid.segment.ColumnSelectorFactory;

@@ -126,7 +127,7 @@ public class MinAggregatorFactory implements AggregatorFactory
@Override
public byte[] getCacheKey()
{
-byte[] fieldNameBytes = fieldName.getBytes();
+byte[] fieldNameBytes = fieldName.getBytes(Charsets.UTF_8);

return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_TYPE_ID).put(fieldNameBytes).array();
}
@@ -21,6 +21,7 @@ package io.druid.query.dimension;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import io.druid.query.extraction.DimExtractionFn;

import java.nio.ByteBuffer;

@@ -68,7 +69,7 @@ public class DefaultDimensionSpec implements DimensionSpec
@Override
public byte[] getCacheKey()
{
-byte[] dimensionBytes = dimension.getBytes();
+byte[] dimensionBytes = dimension.getBytes(Charsets.UTF_8);

return ByteBuffer.allocate(1 + dimensionBytes.length)
.put(CACHE_TYPE_ID)

@@ -21,6 +21,7 @@ package io.druid.query.dimension;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import io.druid.query.extraction.DimExtractionFn;

import java.nio.ByteBuffer;

@@ -73,7 +74,7 @@ public class ExtractionDimensionSpec implements DimensionSpec
@Override
public byte[] getCacheKey()
{
-byte[] dimensionBytes = dimension.getBytes();
+byte[] dimensionBytes = dimension.getBytes(Charsets.UTF_8);
byte[] dimExtractionFnBytes = dimExtractionFn.getCacheKey();

return ByteBuffer.allocate(1 + dimensionBytes.length + dimExtractionFnBytes.length)
@@ -21,6 +21,7 @@ package io.druid.query.extraction;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.base.Function;
import org.mozilla.javascript.Context;
import org.mozilla.javascript.ContextFactory;

@@ -80,7 +81,7 @@ public class JavascriptDimExtractionFn implements DimExtractionFn
@Override
public byte[] getCacheKey()
{
-byte[] bytes = function.getBytes();
+byte[] bytes = function.getBytes(Charsets.UTF_8);
return ByteBuffer.allocate(1 + bytes.length)
.put(CACHE_TYPE_ID)
.put(bytes)

@@ -21,6 +21,7 @@ package io.druid.query.extraction;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;

import java.nio.ByteBuffer;
import java.util.regex.Matcher;

@@ -47,7 +48,7 @@ public class PartialDimExtractionFn implements DimExtractionFn
@Override
public byte[] getCacheKey()
{
-byte[] exprBytes = expr.getBytes();
+byte[] exprBytes = expr.getBytes(Charsets.UTF_8);
return ByteBuffer.allocate(1 + exprBytes.length)
.put(CACHE_TYPE_ID)
.put(exprBytes)

@@ -21,6 +21,7 @@ package io.druid.query.extraction;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;

import java.nio.ByteBuffer;
import java.util.regex.Matcher;

@@ -47,7 +48,7 @@ public class RegexDimExtractionFn implements DimExtractionFn
@Override
public byte[] getCacheKey()
{
-byte[] exprBytes = expr.getBytes();
+byte[] exprBytes = expr.getBytes(Charsets.UTF_8);
return ByteBuffer.allocate(1 + exprBytes.length)
.put(CACHE_TYPE_ID)
.put(exprBytes)
@@ -21,6 +21,7 @@ package io.druid.query.extraction;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.ibm.icu.text.SimpleDateFormat;

import java.nio.ByteBuffer;

@@ -55,7 +56,7 @@ public class TimeDimExtractionFn implements DimExtractionFn
@Override
public byte[] getCacheKey()
{
-byte[] timeFormatBytes = timeFormat.getBytes();
+byte[] timeFormatBytes = timeFormat.getBytes(Charsets.UTF_8);
return ByteBuffer.allocate(1 + timeFormatBytes.length)
.put(CACHE_TYPE_ID)
.put(timeFormatBytes)
@@ -21,6 +21,7 @@ package io.druid.query.filter;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import io.druid.query.extraction.DimExtractionFn;

@@ -71,8 +72,8 @@ public class ExtractionDimFilter implements DimFilter
@Override
public byte[] getCacheKey()
{
-byte[] dimensionBytes = dimension.getBytes();
-byte[] valueBytes = value.getBytes();
+byte[] dimensionBytes = dimension.getBytes(Charsets.UTF_8);
+byte[] valueBytes = value.getBytes(Charsets.UTF_8);

return ByteBuffer.allocate(1 + dimensionBytes.length + valueBytes.length)
.put(DimFilterCacheHelper.EXTRACTION_CACHE_ID)

@@ -21,6 +21,7 @@ package io.druid.query.filter;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;

import java.nio.ByteBuffer;

@@ -47,8 +48,8 @@ public class SelectorDimFilter implements DimFilter
@Override
public byte[] getCacheKey()
{
-byte[] dimensionBytes = dimension.getBytes();
-byte[] valueBytes = (value == null) ? new byte[]{} : value.getBytes();
+byte[] dimensionBytes = dimension.getBytes(Charsets.UTF_8);
+byte[] valueBytes = (value == null) ? new byte[]{} : value.getBytes(Charsets.UTF_8);

return ByteBuffer.allocate(1 + dimensionBytes.length + valueBytes.length)
.put(DimFilterCacheHelper.SELECTOR_CACHE_ID)
@@ -20,6 +20,7 @@ package io.druid.query.filter;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.metamx.collections.spatial.search.Bound;

@@ -48,7 +49,7 @@ public class SpatialDimFilter implements DimFilter
@Override
public byte[] getCacheKey()
{
-byte[] dimBytes = dimension.getBytes();
+byte[] dimBytes = dimension.getBytes(Charsets.UTF_8);
byte[] boundBytes = bound.getCacheKey();

return ByteBuffer.allocate(1 + dimBytes.length + boundBytes.length)
@@ -236,7 +236,7 @@ public class GroupByQueryEngine
}
}

-private class PositionMaintainer
+private static class PositionMaintainer
{
private final int[] increments;
private final int increment;

@@ -284,7 +284,7 @@ public class GroupByQueryEngine
}
}

-private class RowIterator implements CloseableIterator<Row>
+private static class RowIterator implements CloseableIterator<Row>
{
private final GroupByQuery query;
private final Cursor cursor;
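Here, as with AppendTask's SegmentToMergeHolder earlier, inner classes that never touch the enclosing instance are made `static`. A non-static inner class keeps an implicit reference to its outer object, which costs a field per instance and can keep the outer object reachable longer than intended; a static nested class does not. A brief illustration with hypothetical classes (not from this commit):

```java
public class Outer
{
  private final int[] bigBuffer = new int[1 << 20];

  // Holds a hidden Outer.this reference: every Inner instance keeps bigBuffer reachable.
  private class Inner
  {
    int position;
  }

  // No implicit reference to Outer: cheaper per instance, and it cannot
  // accidentally depend on outer state.
  private static class Nested
  {
    int position;
  }

  Inner newInner()
  {
    return new Inner();
  }

  static Nested newNested()
  {
    return new Nested();
  }
}
```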
@@ -21,6 +21,7 @@ package io.druid.query.groupby.having;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.primitives.Bytes;
import io.druid.data.input.Row;

@@ -71,7 +72,7 @@ public class EqualToHavingSpec implements HavingSpec
@Override
public byte[] getCacheKey()
{
-final byte[] aggBytes = aggregationName.getBytes();
+final byte[] aggBytes = aggregationName.getBytes(Charsets.UTF_8);
final byte[] valBytes = Bytes.toArray(Arrays.asList(value));
return ByteBuffer.allocate(1 + aggBytes.length + valBytes.length)
.put(CACHE_KEY)

@@ -21,6 +21,7 @@ package io.druid.query.groupby.having;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.primitives.Bytes;
import io.druid.data.input.Row;

@@ -71,7 +72,7 @@ public class GreaterThanHavingSpec implements HavingSpec
@Override
public byte[] getCacheKey()
{
-final byte[] aggBytes = aggregationName.getBytes();
+final byte[] aggBytes = aggregationName.getBytes(Charsets.UTF_8);
final byte[] valBytes = Bytes.toArray(Arrays.asList(value));
return ByteBuffer.allocate(1 + aggBytes.length + valBytes.length)
.put(CACHE_KEY)

@@ -20,6 +20,7 @@
package io.druid.query.groupby.having;

import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.primitives.Bytes;
import io.druid.data.input.Row;

@@ -69,7 +70,7 @@ public class LessThanHavingSpec implements HavingSpec
@Override
public byte[] getCacheKey()
{
-final byte[] aggBytes = aggregationName.getBytes();
+final byte[] aggBytes = aggregationName.getBytes(Charsets.UTF_8);
final byte[] valBytes = Bytes.toArray(Arrays.asList(value));
return ByteBuffer.allocate(1 + aggBytes.length + valBytes.length)
.put(CACHE_KEY)
@@ -21,6 +21,7 @@ package io.druid.query.groupby.orderby;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;

@@ -182,8 +183,8 @@ public class OrderByColumnSpec

public byte[] getCacheKey()
{
-final byte[] dimensionBytes = dimension.getBytes();
-final byte[] directionBytes = direction.name().getBytes();
+final byte[] dimensionBytes = dimension.getBytes(Charsets.UTF_8);
+final byte[] directionBytes = direction.name().getBytes(Charsets.UTF_8);

return ByteBuffer.allocate(dimensionBytes.length + directionBytes.length)
.put(dimensionBytes)
@@ -20,6 +20,7 @@
package io.druid.query.search;

import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;

@@ -172,7 +173,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
int dimensionsBytesSize = 0;
int index = 0;
for (String dimension : dimensions) {
-dimensionsBytes[index] = dimension.getBytes();
+dimensionsBytes[index] = dimension.getBytes(Charsets.UTF_8);
dimensionsBytesSize += dimensionsBytes[index].length;
++index;
}

@@ -21,6 +21,7 @@ package io.druid.query.search.search;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.collect.Lists;

@@ -77,7 +78,7 @@ public class FragmentSearchQuerySpec implements SearchQuerySpec
int valuesBytesSize = 0;
int index = 0;
for (String value : values) {
-valuesBytes[index] = value.getBytes();
+valuesBytes[index] = value.getBytes(Charsets.UTF_8);
valuesBytesSize += valuesBytes[index].length;
++index;
}
@@ -21,6 +21,7 @@ package io.druid.query.search.search;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;

import java.nio.ByteBuffer;

@@ -58,7 +59,7 @@ public class InsensitiveContainsSearchQuerySpec implements SearchQuerySpec
@Override
public byte[] getCacheKey()
{
-byte[] valueBytes = value.getBytes();
+byte[] valueBytes = value.getBytes(Charsets.UTF_8);

return ByteBuffer.allocate(1 + valueBytes.length)
.put(CACHE_TYPE_ID)
@@ -21,6 +21,7 @@ package io.druid.query.select;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.primitives.Ints;

import java.nio.ByteBuffer;

@@ -65,7 +66,7 @@ public class PagingSpec
int pagingKeysSize = 0;
int pagingValuesSize = 0;
for (Map.Entry<String, Integer> entry : pagingIdentifiers.entrySet()) {
-pagingKeys[index] = entry.getKey().getBytes();
+pagingKeys[index] = entry.getKey().getBytes(Charsets.UTF_8);
pagingValues[index] = ByteBuffer.allocate(Ints.BYTES).putInt(entry.getValue()).array();
pagingKeysSize += pagingKeys[index].length;
pagingValuesSize += Ints.BYTES;
@@ -115,24 +115,34 @@ public class SelectQueryEngine
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
final String dim = dimSelector.getKey();
final DimensionSelector selector = dimSelector.getValue();
-final IndexedInts vals = selector.getRow();
-
-if (vals.size() == 1) {
-final String dimVal = selector.lookupName(vals.get(0));
-theEvent.put(dim, dimVal);
+if (selector == null) {
+theEvent.put(dim, null);
} else {
-List<String> dimVals = Lists.newArrayList();
-for (int i = 0; i < vals.size(); ++i) {
-dimVals.add(selector.lookupName(vals.get(i)));
+final IndexedInts vals = selector.getRow();
+
+if (vals.size() == 1) {
+final String dimVal = selector.lookupName(vals.get(0));
+theEvent.put(dim, dimVal);
+} else {
+List<String> dimVals = Lists.newArrayList();
+for (int i = 0; i < vals.size(); ++i) {
+dimVals.add(selector.lookupName(vals.get(i)));
+}
+theEvent.put(dim, dimVals);
+}
-theEvent.put(dim, dimVals);
}
}

for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
final String metric = metSelector.getKey();
final ObjectColumnSelector selector = metSelector.getValue();
-theEvent.put(metric, selector.get());
+
+if (selector == null) {
+theEvent.put(metric, null);
+} else {
+theEvent.put(metric, selector.get());
+}
}

builder.addEntry(
@@ -21,6 +21,7 @@ package io.druid.query.select;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;

@@ -173,7 +174,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
int dimensionsBytesSize = 0;
int index = 0;
for (String dimension : dimensions) {
-dimensionsBytes[index] = dimension.getBytes();
+dimensionsBytes[index] = dimension.getBytes(Charsets.UTF_8);
dimensionsBytesSize += dimensionsBytes[index].length;
++index;
}

@@ -187,7 +188,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
int metricBytesSize = 0;
index = 0;
for (String metric : metrics) {
-metricBytes[index] = metric.getBytes();
+metricBytes[index] = metric.getBytes(Charsets.UTF_8);
metricBytesSize += metricBytes[index].length;
++index;
}
@@ -841,6 +841,10 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
}

+/**
+* Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
+* @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
+*/
public String get(String value)
{
return interner.getCanonicalValue(value);

@@ -966,6 +970,9 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
@Override
public boolean compareCanonicalValues(String s1, String s2)
{
+/**
+* using == here instead of .equals() to speed up lookups
+*/
return s1 == s2;
}
}
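These two hunks document an existing optimization rather than change behavior: dimension values are canonicalized through an interner, so equal strings are guaranteed to be the same object and can be compared with `==`. The same idea can be sketched with Guava's `Interners` (an illustration of the technique, not the interner class used in this code):

```java
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

public class InternedEqualitySketch
{
  public static void main(String[] args)
  {
    Interner<String> interner = Interners.newWeakInterner();

    // Two distinct String objects with equal contents.
    String a = new String("page_views");
    String b = new String("page_views");
    System.out.println(a == b); // false: different objects

    // After canonicalization both refer to the same object,
    // so reference equality is a valid (and cheap) comparison.
    String ca = interner.intern(a);
    String cb = interner.intern(b);
    System.out.println(ca == cb); // true
  }
}
```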
@@ -426,7 +426,11 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public Object get()
{
-final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex];
+final String[][] dims = currEntry.getKey().getDims();
+if(dimensionIndex >= dims.length) {
+return null;
+}
+final String[] dimVals = dims[dimensionIndex];
if (dimVals.length == 1) {
return dimVals[0];
} else if (dimVals.length == 0) {
@@ -30,6 +30,8 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.spec.LegacySegmentSpec;
import org.joda.time.DateTime;
@@ -359,6 +361,97 @@ public class SelectQueryRunnerTest
verify(expectedResults, results);
}

+@Test
+public void testFullSelectNoResults()
+{
+SelectQuery query = new SelectQuery(
+new TableDataSource(QueryRunnerTestHelper.dataSource),
+new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")),
+new AndDimFilter(
+Arrays.<DimFilter>asList(
+new SelectorDimFilter(QueryRunnerTestHelper.providerDimension, "spot"),
+new SelectorDimFilter(QueryRunnerTestHelper.providerDimension, "foo")
+)
+),
+QueryRunnerTestHelper.allGran,
+Lists.<String>newArrayList(),
+Lists.<String>newArrayList(),
+new PagingSpec(null, 3),
+null
+);
+
+Iterable<Result<SelectResultValue>> results = Sequences.toList(
+runner.run(query, Maps.newHashMap()),
+Lists.<Result<SelectResultValue>>newArrayList()
+);
+
+List<Result<SelectResultValue>> expectedResults = Arrays.asList(
+new Result<SelectResultValue>(
+new DateTime("2011-01-12T00:00:00.000Z"),
+new SelectResultValue(
+ImmutableMap.<String, Integer>of(),
+Lists.<EventHolder>newArrayList()
+)
+)
+);
+
+verify(expectedResults, results);
+}
+
+
+@Test
+public void testFullSelectNoDimensionAndMetric()
+{
+SelectQuery query = new SelectQuery(
+new TableDataSource(QueryRunnerTestHelper.dataSource),
+new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")),
+null,
+QueryRunnerTestHelper.allGran,
+Lists.<String>newArrayList("foo"),
+Lists.<String>newArrayList("foo2"),
+new PagingSpec(null, 3),
+null
+);
+
+Iterable<Result<SelectResultValue>> results = Sequences.toList(
+runner.run(query, Maps.newHashMap()),
+Lists.<Result<SelectResultValue>>newArrayList()
+);
+
+Map<String, Object> res = Maps.newHashMap();
+res.put("timestamp", new DateTime("2011-01-12T00:00:00.000Z"));
+res.put("foo", null);
+res.put("foo2", null);
+
+List<Result<SelectResultValue>> expectedResults = Arrays.asList(
+new Result<SelectResultValue>(
+new DateTime("2011-01-12T00:00:00.000Z"),
+new SelectResultValue(
+ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2),
+Arrays.asList(
+new EventHolder(
+QueryRunnerTestHelper.segmentId,
+0,
+res
+),
+new EventHolder(
+QueryRunnerTestHelper.segmentId,
+1,
+res
+),
+new EventHolder(
+QueryRunnerTestHelper.segmentId,
+2,
+res
+)
+)
+)
+)
+);
+
+verify(expectedResults, results);
+}
+
private static void verify(
Iterable<Result<SelectResultValue>> expectedResults,
Iterable<Result<SelectResultValue>> actualResults
@@ -35,6 +35,7 @@ import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.DimFilters;
import io.druid.query.groupby.GroupByQuery;

@@ -53,6 +54,7 @@ import org.junit.Test;

import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;

/**
*/
@@ -81,26 +83,7 @@ public class IncrementalIndexStorageAdapterTest
)
);

-GroupByQueryEngine engine = new GroupByQueryEngine(
-Suppliers.<GroupByQueryConfig>ofInstance(new GroupByQueryConfig()
-{
-@Override
-public int getMaxIntermediateRows()
-{
-return 5;
-}
-}),
-new StupidPool(
-new Supplier<ByteBuffer>()
-{
-@Override
-public ByteBuffer get()
-{
-return ByteBuffer.allocate(50000);
-}
-}
-)
-);
+GroupByQueryEngine engine = makeGroupByQueryEngine();

final Sequence<Row> rows = engine.process(
GroupByQuery.builder()
@@ -125,6 +108,93 @@ public class IncrementalIndexStorageAdapterTest
Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1l), row.getEvent());
}

+@Test
+public void testObjectColumnSelectorOnVaryingColumnSchema() throws Exception
+{
+IncrementalIndex index = new IncrementalIndex(
+0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, TestQueryRunners.pool
+);
+
+index.add(
+new MapBasedInputRow(
+new DateTime("2014-09-01T00:00:00"),
+Lists.newArrayList("billy"),
+ImmutableMap.<String, Object>of("billy", "hi")
+)
+);
+index.add(
+new MapBasedInputRow(
+new DateTime("2014-09-01T01:00:00"),
+Lists.newArrayList("billy", "sally"),
+ImmutableMap.<String, Object>of(
+"billy", "hip",
+"sally", "hop"
+)
+)
+);
+
+GroupByQueryEngine engine = makeGroupByQueryEngine();
+
+final Sequence<Row> rows = engine.process(
+GroupByQuery.builder()
+.setDataSource("test")
+.setGranularity(QueryGranularity.ALL)
+.setInterval(new Interval(0, new DateTime().getMillis()))
+.addDimension("billy")
+.addDimension("sally")
+.addAggregator(
+new LongSumAggregatorFactory("cnt", "cnt")
+)
+.addAggregator(
+new JavaScriptAggregatorFactory(
+"fieldLength",
+Arrays.asList("sally", "billy"),
+"function(current, s, b) { return current + (s == null ? 0 : s.length) + (b == null ? 0 : b.length); }",
+"function() { return 0; }",
+"function(a,b) { return a + b; }"
+)
+)
+.build(),
+new IncrementalIndexStorageAdapter(index)
+);
+
+final ArrayList<Row> results = Sequences.toList(rows, Lists.<Row>newArrayList());
+
+Assert.assertEquals(2, results.size());
+
+MapBasedRow row = (MapBasedRow) results.get(0);
+Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1l, "fieldLength", 2.0), row.getEvent());
+
+row = (MapBasedRow) results.get(1);
+Assert.assertEquals(ImmutableMap.of("billy", "hip", "sally", "hop", "cnt", 1l, "fieldLength", 6.0), row.getEvent());
+}
+
+private static GroupByQueryEngine makeGroupByQueryEngine()
+{
+return new GroupByQueryEngine(
+Suppliers.<GroupByQueryConfig>ofInstance(
+new GroupByQueryConfig()
+{
+@Override
+public int getMaxIntermediateRows()
+{
+return 5;
+}
+}
+),
+new StupidPool(
+new Supplier<ByteBuffer>()
+{
+@Override
+public ByteBuffer get()
+{
+return ByteBuffer.allocate(50000);
+}
+}
+)
+);
+}
+
@Test
public void testResetSanity() {
IncrementalIndex index = new IncrementalIndex(
@@ -241,60 +311,41 @@ public class IncrementalIndexStorageAdapterTest
TestQueryRunners.pool
);

index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),
Lists.newArrayList("billy"),
ImmutableMap.<String, Object>of("billy", "hi")
)
);
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),
Lists.newArrayList("sally"),
ImmutableMap.<String, Object>of("sally", "bo")
)
);
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),
Lists.newArrayList("billy"),
ImmutableMap.<String, Object>of("billy", "hi")
)
);
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),
Lists.newArrayList("sally"),
ImmutableMap.<String, Object>of("sally", "bo")
)
);

-GroupByQueryEngine engine = new GroupByQueryEngine(
-Suppliers.<GroupByQueryConfig>ofInstance(new GroupByQueryConfig()
-{
-@Override
-public int getMaxIntermediateRows()
-{
-return 5;
-}
-}),
-new StupidPool<ByteBuffer>(
-new Supplier<ByteBuffer>()
-{
-@Override
-public ByteBuffer get()
-{
-return ByteBuffer.allocate(50000);
-}
-}
-)
-);
+GroupByQueryEngine engine = makeGroupByQueryEngine();

-final Sequence<Row> rows = engine.process(
-GroupByQuery.builder()
-.setDataSource("test")
-.setGranularity(QueryGranularity.ALL)
-.setInterval(new Interval(0, new DateTime().getMillis()))
-.addDimension("billy")
-.addDimension("sally")
-.addAggregator(new LongSumAggregatorFactory("cnt", "cnt"))
-.setDimFilter(DimFilters.dimEquals("sally", (String) null))
-.build(),
-new IncrementalIndexStorageAdapter(index)
-);
+final Sequence<Row> rows = engine.process(
+GroupByQuery.builder()
+.setDataSource("test")
+.setGranularity(QueryGranularity.ALL)
+.setInterval(new Interval(0, new DateTime().getMillis()))
+.addDimension("billy")
+.addDimension("sally")
+.addAggregator(new LongSumAggregatorFactory("cnt", "cnt"))
+.setDimFilter(DimFilters.dimEquals("sally", (String) null))
+.build(),
+new IncrementalIndexStorageAdapter(index)
+);

-final ArrayList<Row> results = Sequences.toList(rows, Lists.<Row>newArrayList());
+final ArrayList<Row> results = Sequences.toList(rows, Lists.<Row>newArrayList());

-Assert.assertEquals(1, results.size());
+Assert.assertEquals(1, results.size());

-MapBasedRow row = (MapBasedRow) results.get(0);
-Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1l), row.getEvent());
-}
+MapBasedRow row = (MapBasedRow) results.get(0);
+Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1l), row.getEvent());
+}
}
@@ -21,6 +21,7 @@ package io.druid.firehose.rabbitmq;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.metamx.common.logger.Logger;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

@@ -227,7 +228,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
return null;
}

-return stringParser.parse(new String(delivery.getBody()));
+return stringParser.parse(new String(delivery.getBody(), Charsets.UTF_8));
}

@Override
@@ -174,6 +174,12 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
}
return action;
}
+
+@Override
+public CallbackAction segmentViewInitialized()
+{
+return callback.segmentViewInitialized();
+}
}
);
}

@@ -104,6 +104,12 @@ public class BrokerServerView implements TimelineServerView
serverRemovedSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}
+
+@Override
+public CallbackAction segmentViewInitialized()
+{
+return ServerView.CallbackAction.CONTINUE;
+}
}
);

@@ -20,6 +20,7 @@
package io.druid.client;

import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.client.cache.Cache;

@@ -39,7 +40,7 @@ public class CacheUtil
)
{
final Interval segmentQueryInterval = descriptor.getInterval();
-final byte[] versionBytes = descriptor.getVersion().getBytes();
+final byte[] versionBytes = descriptor.getVersion().getBytes(Charsets.UTF_8);

return new Cache.NamedKey(
segmentIdentifier, ByteBuffer
@@ -168,6 +168,22 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
{
return removeInnerInventory(container, inventoryKey);
}
+
+@Override
+public void inventoryInitialized()
+{
+log.info("Inventory Initialized");
+runSegmentCallbacks(
+new Function<SegmentCallback, CallbackAction>()
+{
+@Override
+public CallbackAction apply(SegmentCallback input)
+{
+return input.segmentViewInitialized();
+}
+}
+);
+}
}
);
}
@ -91,6 +91,8 @@ public interface ServerView
|
|||
* should remain registered.
|
||||
*/
|
||||
public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment);
|
||||
|
||||
public CallbackAction segmentViewInitialized();
|
||||
}
|
||||
|
||||
public static abstract class BaseSegmentCallback implements SegmentCallback
|
||||
|
@ -106,5 +108,11 @@ public interface ServerView
|
|||
{
|
||||
return CallbackAction.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackAction segmentViewInitialized()
|
||||
{
|
||||
return CallbackAction.CONTINUE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
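Editor's note: the hunks above add a `segmentViewInitialized()` callback to `ServerView.SegmentCallback` (with a `CONTINUE` default in `BaseSegmentCallback`) and have the inventory views fire it once the initial segment snapshot has been processed. The sketch below is a cut-down, self-contained analogue of that callback shape, not Druid's actual interfaces; the latch-based consumer is purely illustrative:

```
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ViewInitializedCallbackExample
{
  // Analogue of the callback added in the diff: the view invokes
  // segmentViewInitialized() exactly once, after the initial snapshot is delivered.
  interface SegmentCallback
  {
    void segmentAdded(String segmentId);

    void segmentViewInitialized();
  }

  public static void main(String[] args) throws InterruptedException
  {
    final CountDownLatch initialized = new CountDownLatch(1);

    SegmentCallback callback = new SegmentCallback()
    {
      @Override
      public void segmentAdded(String segmentId)
      {
        System.out.println("saw segment " + segmentId);
      }

      @Override
      public void segmentViewInitialized()
      {
        // Fired once the initial snapshot is complete; release anyone waiting on it.
        initialized.countDown();
      }
    };

    // Simulate the view delivering its initial snapshot, then the "initialized" signal.
    callback.segmentAdded("datasource_2011-04-01_v1");
    callback.segmentViewInitialized();

    // A consumer that should not act until the view is warm can simply wait on the latch.
    System.out.println("view ready: " + initialized.await(1, TimeUnit.SECONDS));
  }
}
```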
@ -138,6 +138,12 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
|||
return action;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackAction segmentViewInitialized()
|
||||
{
|
||||
return callback.segmentViewInitialized();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
|
@ -27,8 +27,8 @@ import java.util.List;
|
|||
|
||||
public class CacheConfig
|
||||
{
|
||||
public static String USE_CACHE = "useCache";
|
||||
public static String POPULATE_CACHE = "populateCache";
|
||||
public static final String USE_CACHE = "useCache";
|
||||
public static final String POPULATE_CACHE = "populateCache";
|
||||
|
||||
@JsonProperty
|
||||
private boolean useCache = true;
|
||||
|
|
|
@ -34,7 +34,7 @@ public class HighestPriorityTierSelectorStrategy extends AbstractTierSelectorStr
|
|||
@Override
|
||||
public int compare(Integer o1, Integer o2)
|
||||
{
|
||||
return -Ints.compare(o1, o2);
|
||||
return Ints.compare(o2, o1);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
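Editor's note: the tier-selector comparator above changes from negating `Ints.compare(o1, o2)` to swapping the arguments. For Guava's `Ints.compare`, which in practice returns only -1, 0, or 1, the negation happened to be harmless, but the general `Comparator` contract allows any int, and `-Integer.MIN_VALUE` is still `Integer.MIN_VALUE`, so negation cannot be trusted to reverse an ordering. Swapping the arguments is the overflow-free idiom, and it reads directly as "highest priority first". A JDK-only illustration:

```
public class ComparatorReversalExample
{
  public static void main(String[] args)
  {
    // The Comparator contract allows any int, so Integer.MIN_VALUE is a legal result.
    // Negating it does not flip the sign: two's complement has no positive counterpart.
    System.out.println(-Integer.MIN_VALUE == Integer.MIN_VALUE); // true

    // Swapping the arguments reverses the ordering without any overflow risk.
    System.out.println(Integer.compare(3, 5)); // negative (ascending view)
    System.out.println(Integer.compare(5, 3)); // positive (the reversed view of the same pair)
  }
}
```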
@ -23,6 +23,7 @@ import com.google.common.base.Function;
|
|||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.MapMaker;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
|
@ -37,6 +38,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
|
|||
import org.apache.curator.utils.ZKPaths;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -62,6 +64,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
private final CuratorInventoryManagerStrategy<ContainerClass, InventoryClass> strategy;
|
||||
|
||||
private final ConcurrentMap<String, ContainerHolder> containers;
|
||||
private final Set<ContainerHolder> uninitializedInventory;
|
||||
private final PathChildrenCacheFactory cacheFactory;
|
||||
|
||||
private volatile PathChildrenCache childrenCache;
|
||||
|
@ -78,6 +81,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
this.strategy = strategy;
|
||||
|
||||
this.containers = new MapMaker().makeMap();
|
||||
this.uninitializedInventory = Sets.newConcurrentHashSet();
|
||||
|
||||
this.cacheFactory = new SimplePathChildrenCacheFactory(true, true, new ShutdownNowIgnoringExecutorService(exec));
|
||||
}
|
||||
|
@ -96,7 +100,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
childrenCache.getListenable().addListener(new ContainerCacheListener());
|
||||
|
||||
try {
|
||||
childrenCache.start();
|
||||
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
|
||||
}
|
||||
catch (Exception e) {
|
||||
synchronized (lock) {
|
||||
|
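Editor's note: the hunk above starts the Curator `PathChildrenCache` with `StartMode.POST_INITIALIZED_EVENT` instead of the plain `start()` (which is `StartMode.NORMAL`). With that mode the cache emits a one-time `INITIALIZED` event after reporting everything that already existed, which is what lets the inventory manager know the initial snapshot is complete. A minimal sketch using the Curator recipes API, assuming a ZooKeeper server reachable at `localhost:2181` and the `curator-framework`/`curator-recipes` jars on the classpath; the `/inventory` path is illustrative:

```
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class PostInitializedEventExample
{
  public static void main(String[] args) throws Exception
  {
    CuratorFramework client =
        CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
    client.start();

    final PathChildrenCache cache = new PathChildrenCache(client, "/inventory", true);
    cache.getListenable().addListener(
        new PathChildrenCacheListener()
        {
          @Override
          public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event)
          {
            switch (event.getType()) {
              case CHILD_ADDED:
                System.out.println("added " + event.getData().getPath());
                break;
              case INITIALIZED:
                // Delivered only when started with POST_INITIALIZED_EVENT:
                // every child that existed at startup has now been reported.
                System.out.println("initial snapshot complete");
                break;
              default:
                break;
            }
          }
        }
    );

    // A plain cache.start() (StartMode.NORMAL) never emits INITIALIZED.
    cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

    Thread.sleep(5000);
    cache.close();
    client.close();
  }
}
```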
@ -164,6 +168,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
{
|
||||
private final AtomicReference<ContainerClass> container;
|
||||
private final PathChildrenCache cache;
|
||||
private boolean initialized = false;
|
||||
|
||||
ContainerHolder(
|
||||
ContainerClass container,
|
||||
|
@ -192,21 +197,19 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
|
||||
private class ContainerCacheListener implements PathChildrenCacheListener
|
||||
{
|
||||
private volatile boolean containersInitialized = false;
|
||||
private volatile boolean doneInitializing = false;
|
||||
|
||||
@Override
|
||||
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
|
||||
{
|
||||
final ChildData child = event.getData();
|
||||
if (child == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||
final ContainerClass container;
|
||||
|
||||
switch (event.getType()) {
|
||||
case CHILD_ADDED:
|
||||
synchronized (lock) {
|
||||
container = strategy.deserializeContainer(child.getData());
|
||||
final ChildData child = event.getData();
|
||||
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||
|
||||
final ContainerClass container = strategy.deserializeContainer(child.getData());
|
||||
|
||||
// This would normally be a race condition, but the only thing that should be mutating the containers
|
||||
// map is this listener, which should never run concurrently. If the same container is going to disappear
|
||||
|
@ -221,7 +224,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
containers.put(containerKey, new ContainerHolder(container, inventoryCache));
|
||||
|
||||
log.info("Starting inventory cache for %s, inventoryPath %s", containerKey, inventoryPath);
|
||||
inventoryCache.start();
|
||||
inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
|
||||
strategy.newContainer(container);
|
||||
}
|
||||
|
||||
|
@ -229,6 +232,9 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
}
|
||||
case CHILD_REMOVED:
|
||||
synchronized (lock) {
|
||||
final ChildData child = event.getData();
|
||||
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||
|
||||
final ContainerHolder removed = containers.remove(containerKey);
|
||||
if (removed == null) {
|
||||
log.warn("Container[%s] removed that wasn't a container!?", child.getPath());
|
||||
|
@ -242,11 +248,19 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
removed.getCache().close();
|
||||
strategy.deadContainer(removed.getContainer());
|
||||
|
||||
// also remove node from uninitilized, in case a nodes gets removed while we are starting up
|
||||
synchronized (removed) {
|
||||
markInventoryInitialized(removed);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case CHILD_UPDATED:
|
||||
synchronized (lock) {
|
||||
container = strategy.deserializeContainer(child.getData());
|
||||
final ChildData child = event.getData();
|
||||
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||
|
||||
final ContainerClass container = strategy.deserializeContainer(child.getData());
|
||||
|
||||
ContainerHolder oldContainer = containers.get(containerKey);
|
||||
if (oldContainer == null) {
|
||||
|
@ -259,6 +273,41 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
|
||||
break;
|
||||
}
|
||||
case INITIALIZED:
|
||||
synchronized (lock) {
|
||||
// must await initialized of all containerholders
|
||||
for(ContainerHolder holder : containers.values()) {
|
||||
synchronized (holder) {
|
||||
if(!holder.initialized) {
|
||||
uninitializedInventory.add(holder);
|
||||
}
|
||||
}
|
||||
}
|
||||
containersInitialized = true;
|
||||
maybeDoneInitializing();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// must be run in synchronized(lock) { synchronized(holder) { ... } } block
|
||||
private void markInventoryInitialized(final ContainerHolder holder)
|
||||
{
|
||||
holder.initialized = true;
|
||||
uninitializedInventory.remove(holder);
|
||||
maybeDoneInitializing();
|
||||
}
|
||||
|
||||
private void maybeDoneInitializing()
|
||||
{
|
||||
if(doneInitializing) {
|
||||
return;
|
||||
}
|
||||
|
||||
// only fire if we are done initializing the parent PathChildrenCache
|
||||
if(containersInitialized && uninitializedInventory.isEmpty()) {
|
||||
doneInitializing = true;
|
||||
strategy.inventoryInitialized();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -278,30 +327,28 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
@Override
|
||||
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
|
||||
{
|
||||
final ChildData child = event.getData();
|
||||
|
||||
if (child == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final ContainerHolder holder = containers.get(containerKey);
|
||||
|
||||
if (holder == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||
|
||||
switch (event.getType()) {
|
||||
case CHILD_ADDED:
|
||||
case CHILD_ADDED: {
|
||||
final ChildData child = event.getData();
|
||||
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||
|
||||
final InventoryClass addedInventory = strategy.deserializeInventory(child.getData());
|
||||
|
||||
synchronized (holder) {
|
||||
holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory));
|
||||
}
|
||||
|
||||
break;
|
||||
case CHILD_UPDATED:
|
||||
}
|
||||
|
||||
case CHILD_UPDATED: {
|
||||
final ChildData child = event.getData();
|
||||
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||
|
||||
final InventoryClass updatedInventory = strategy.deserializeInventory(child.getData());
|
||||
|
||||
synchronized (holder) {
|
||||
|
@ -309,11 +356,26 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
}
|
||||
|
||||
break;
|
||||
case CHILD_REMOVED:
|
||||
}
|
||||
|
||||
case CHILD_REMOVED: {
|
||||
final ChildData child = event.getData();
|
||||
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||
|
||||
synchronized (holder) {
|
||||
holder.setContainer(strategy.removeInventory(holder.getContainer(), inventoryKey));
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case INITIALIZED:
|
||||
// make sure to acquire locks in (lock -> holder) order
|
||||
synchronized (lock) {
|
||||
synchronized (holder) {
|
||||
markInventoryInitialized(holder);
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,4 +35,5 @@ public interface CuratorInventoryManagerStrategy<ContainerClass, InventoryClass>
|
|||
public ContainerClass addInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
|
||||
public ContainerClass updateInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
|
||||
public ContainerClass removeInventory(ContainerClass container, String inventoryKey);
|
||||
public void inventoryInitialized();
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package io.druid.initialization;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
@ -74,6 +75,7 @@ import org.eclipse.aether.util.filter.DependencyFilterUtils;
|
|||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
@ -294,13 +296,17 @@ public class Initialization
|
|||
|
||||
}
|
||||
}
|
||||
)
|
||||
, false, Charsets.UTF_8.name())
|
||||
);
|
||||
return new DefaultTeslaAether(
|
||||
config.getLocalRepository(),
|
||||
remoteRepositories.toArray(new Repository[remoteRepositories.size()])
|
||||
);
|
||||
}
|
||||
catch(UnsupportedEncodingException e) {
|
||||
// should never happen
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
finally {
|
||||
System.setOut(oldOut);
|
||||
}
|
||||
|
|
|
@ -21,11 +21,14 @@ package io.druid.segment.indexing;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.Sets;
|
||||
import io.druid.data.input.impl.InputRowParser;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.indexing.granularity.GranularitySpec;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class DataSchema
|
||||
|
@ -44,7 +47,31 @@ public class DataSchema
|
|||
)
|
||||
{
|
||||
this.dataSource = dataSource;
|
||||
this.parser = parser;
|
||||
|
||||
final Set<String> dimensionExclusions = Sets.newHashSet();
|
||||
for (AggregatorFactory aggregator : aggregators) {
|
||||
dimensionExclusions.add(aggregator.getName());
|
||||
}
|
||||
if (parser != null && parser.getParseSpec() != null) {
|
||||
if (parser.getParseSpec().getTimestampSpec() != null) {
|
||||
dimensionExclusions.add(parser.getParseSpec().getTimestampSpec().getTimestampColumn());
|
||||
}
|
||||
if (parser.getParseSpec().getDimensionsSpec() != null) {
|
||||
this.parser = parser.withParseSpec(
|
||||
parser.getParseSpec()
|
||||
.withDimensionsSpec(
|
||||
parser.getParseSpec()
|
||||
.getDimensionsSpec()
|
||||
.withDimensionExclusions(dimensionExclusions)
|
||||
)
|
||||
);
|
||||
} else {
|
||||
this.parser = parser;
|
||||
}
|
||||
} else {
|
||||
this.parser = parser;
|
||||
}
|
||||
|
||||
this.aggregators = aggregators;
|
||||
this.granularitySpec = granularitySpec == null
|
||||
? new UniformGranularitySpec(null, null, null, null)
|
||||
|
|
|
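Editor's note: the DataSchema hunk above derives a dimension-exclusion set from the aggregator output names plus the timestamp column, then rewrites the parser's DimensionsSpec with it, so metric and timestamp columns are never auto-discovered as dimensions. A standalone sketch of just the exclusion-set construction; the column names and helper are illustrative, not Druid API:

```
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DimensionExclusionsExample
{
  // Columns produced by aggregators and the timestamp column are excluded from dimensions.
  static Set<String> buildExclusions(String timestampColumn, List<String> aggregatorNames)
  {
    Set<String> exclusions = new LinkedHashSet<String>(aggregatorNames);
    exclusions.add(timestampColumn);
    return exclusions;
  }

  public static void main(String[] args)
  {
    Set<String> exclusions = buildExclusions("timestamp", Arrays.asList("count", "revenue_sum"));
    System.out.println(exclusions); // [count, revenue_sum, timestamp]
  }
}
```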
@ -40,6 +40,12 @@ public class SegmentLoaderConfig
|
|||
@JsonProperty("dropSegmentDelayMillis")
|
||||
private int dropSegmentDelayMillis = 30 * 1000; // 30 seconds
|
||||
|
||||
@JsonProperty("announceIntervalMillis")
|
||||
private int announceIntervalMillis = 0; // do not background announce
|
||||
|
||||
@JsonProperty("numLoadingThreads")
|
||||
private int numLoadingThreads = 1;
|
||||
|
||||
@JsonProperty
|
||||
private File infoDir = null;
|
||||
|
||||
|
@ -58,6 +64,16 @@ public class SegmentLoaderConfig
|
|||
return dropSegmentDelayMillis;
|
||||
}
|
||||
|
||||
public int getAnnounceIntervalMillis()
|
||||
{
|
||||
return announceIntervalMillis;
|
||||
}
|
||||
|
||||
public int getNumLoadingThreads()
|
||||
{
|
||||
return numLoadingThreads;
|
||||
}
|
||||
|
||||
public File getInfoDir()
|
||||
{
|
||||
if (infoDir == null) {
|
||||
|
|
|
@ -26,7 +26,6 @@ import com.google.common.collect.Iterators;
|
|||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.offheap.OffheapBufferPool;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
|
@ -48,7 +47,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|||
*/
|
||||
public class Sink implements Iterable<FireHydrant>
|
||||
{
|
||||
private static final Logger log = new Logger(Sink.class);
|
||||
|
||||
private final Object hydrantLock = new Object();
|
||||
private final Interval interval;
|
||||
private final DataSchema schema;
|
||||
private final RealtimeTuningConfig config;
|
||||
|
@ -56,7 +56,6 @@ public class Sink implements Iterable<FireHydrant>
|
|||
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();
|
||||
private volatile FireHydrant currHydrant;
|
||||
|
||||
|
||||
public Sink(
|
||||
Interval interval,
|
||||
DataSchema schema,
|
||||
|
@ -117,7 +116,7 @@ public class Sink implements Iterable<FireHydrant>
|
|||
throw new IAE("No currHydrant but given row[%s]", row);
|
||||
}
|
||||
|
||||
synchronized (currHydrant) {
|
||||
synchronized (hydrantLock) {
|
||||
IncrementalIndex index = currHydrant.getIndex();
|
||||
if (index == null) {
|
||||
return -1; // the hydrant was swapped without being replaced
|
||||
|
@ -128,7 +127,7 @@ public class Sink implements Iterable<FireHydrant>
|
|||
|
||||
public boolean isEmpty()
|
||||
{
|
||||
synchronized (currHydrant) {
|
||||
synchronized (hydrantLock) {
|
||||
return hydrants.size() == 1 && currHydrant.getIndex().isEmpty();
|
||||
}
|
||||
}
|
||||
|
@ -145,7 +144,7 @@ public class Sink implements Iterable<FireHydrant>
|
|||
|
||||
public boolean swappable()
|
||||
{
|
||||
synchronized (currHydrant) {
|
||||
synchronized (hydrantLock) {
|
||||
return currHydrant.getIndex() != null && currHydrant.getIndex().size() != 0;
|
||||
}
|
||||
}
|
||||
|
@ -192,17 +191,11 @@ public class Sink implements Iterable<FireHydrant>
|
|||
config.isIngestOffheap()
|
||||
);
|
||||
|
||||
FireHydrant old;
|
||||
if (currHydrant == null) { // Only happens on initialization, cannot synchronize on null
|
||||
final FireHydrant old;
|
||||
synchronized (hydrantLock) {
|
||||
old = currHydrant;
|
||||
currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier());
|
||||
hydrants.add(currHydrant);
|
||||
} else {
|
||||
synchronized (currHydrant) {
|
||||
old = currHydrant;
|
||||
currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier());
|
||||
hydrants.add(currHydrant);
|
||||
}
|
||||
}
|
||||
|
||||
return old;
|
||||
|
|
|
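Editor's note: the Sink hunks above replace `synchronized (currHydrant)` with `synchronized (hydrantLock)` and drop the special case for a null `currHydrant`. Synchronizing on a field that is reassigned (or may be null) means different threads can end up locking different objects, or none at all; a final, dedicated lock object gives every caller the same monitor. A self-contained sketch of the same swap-under-a-stable-lock pattern (names are illustrative):

```
public class SwapUnderLockExample
{
  private final Object lock = new Object();   // one stable monitor for swaps and readers
  private volatile String current = null;     // reassigned over time; never used as a monitor

  // Swap in a new value and return the previous one, as the Sink does for hydrants.
  public String swap(String next)
  {
    synchronized (lock) {
      String old = current;
      current = next;
      return old;
    }
  }

  public boolean isEmpty()
  {
    synchronized (lock) {
      return current == null;
    }
  }

  public static void main(String[] args)
  {
    SwapUnderLockExample holder = new SwapUnderLockExample();
    System.out.println(holder.swap("hydrant-0")); // null; no special case needed for the first swap
    System.out.println(holder.swap("hydrant-1")); // hydrant-0
    System.out.println(holder.isEmpty());         // false
  }
}
```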
@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger;
|
|||
import io.druid.client.ServerView;
|
||||
import io.druid.concurrent.Execs;
|
||||
import io.druid.db.DatabaseSegmentManager;
|
||||
import io.druid.segment.loading.SegmentLoaderConfig;
|
||||
import io.druid.segment.realtime.DbSegmentPublisher;
|
||||
import io.druid.server.coordination.BaseZkCoordinator;
|
||||
import io.druid.server.coordination.DataSegmentChangeCallback;
|
||||
|
@ -53,6 +54,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
|
|||
public BridgeZkCoordinator(
|
||||
ObjectMapper jsonMapper,
|
||||
ZkPathsConfig zkPaths,
|
||||
SegmentLoaderConfig config,
|
||||
DruidServerMetadata me,
|
||||
@Bridge CuratorFramework curator,
|
||||
DbSegmentPublisher dbSegmentPublisher,
|
||||
|
@ -60,7 +62,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
|
|||
ServerView serverView
|
||||
)
|
||||
{
|
||||
super(jsonMapper, zkPaths, me, curator);
|
||||
super(jsonMapper, zkPaths, config, me, curator);
|
||||
|
||||
this.dbSegmentPublisher = dbSegmentPublisher;
|
||||
this.databaseSegmentManager = databaseSegmentManager;
|
||||
|
|
|
@ -21,10 +21,13 @@ package io.druid.server.coordination;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.segment.loading.SegmentLoaderConfig;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.recipes.cache.ChildData;
|
||||
|
@ -34,6 +37,8 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
|
|||
import org.apache.curator.utils.ZKPaths;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -45,23 +50,33 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
|
|||
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ZkPathsConfig zkPaths;
|
||||
private final SegmentLoaderConfig config;
|
||||
private final DruidServerMetadata me;
|
||||
private final CuratorFramework curator;
|
||||
|
||||
private volatile PathChildrenCache loadQueueCache;
|
||||
private volatile boolean started;
|
||||
private final ListeningExecutorService loadingExec;
|
||||
|
||||
public BaseZkCoordinator(
|
||||
ObjectMapper jsonMapper,
|
||||
ZkPathsConfig zkPaths,
|
||||
SegmentLoaderConfig config,
|
||||
DruidServerMetadata me,
|
||||
CuratorFramework curator
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.zkPaths = zkPaths;
|
||||
this.config = config;
|
||||
this.me = me;
|
||||
this.curator = curator;
|
||||
this.loadingExec = MoreExecutors.listeningDecorator(
|
||||
Executors.newFixedThreadPool(
|
||||
config.getNumLoadingThreads(),
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
|
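Editor's note: the constructor change above builds a `ListeningExecutorService` sized by `config.getNumLoadingThreads()`, using named daemon threads, and later passes it to the load-queue `PathChildrenCache`. The sketch below reproduces that construction in isolation, assuming Guava on the classpath (which this codebase already uses); the task body is illustrative:

```
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class LoadingExecutorExample
{
  public static void main(String[] args) throws Exception
  {
    int numLoadingThreads = 4; // stands in for config.getNumLoadingThreads()

    // A fixed pool of named daemon threads, wrapped so submissions return ListenableFutures.
    ListeningExecutorService loadingExec = MoreExecutors.listeningDecorator(
        Executors.newFixedThreadPool(
            numLoadingThreads,
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
        )
    );

    ListenableFuture<String> future = loadingExec.submit(
        new Callable<String>()
        {
          @Override
          public String call()
          {
            return "segment loaded on " + Thread.currentThread().getName();
          }
        }
    );

    System.out.println(future.get());
    loadingExec.shutdown();
  }
}
```

Daemon threads keep the pool from blocking JVM shutdown, and the name format makes the loading threads easy to spot in thread dumps.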
@ -83,7 +98,7 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
|
|||
loadQueueLocation,
|
||||
true,
|
||||
true,
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
|
||||
loadingExec
|
||||
);
|
||||
|
||||
try {
|
||||
|
@ -200,4 +215,9 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
|
|||
public abstract void loadLocalCache();
|
||||
|
||||
public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();
|
||||
|
||||
public ListeningExecutorService getLoadingExecutor()
|
||||
{
|
||||
return loadingExec;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,11 @@ package io.druid.server.coordination;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Queues;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.segment.loading.SegmentLoaderConfig;
|
||||
|
@ -33,7 +37,11 @@ import org.apache.curator.framework.CuratorFramework;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
|
@ -60,7 +68,7 @@ public class ZkCoordinator extends BaseZkCoordinator
|
|||
ScheduledExecutorFactory factory
|
||||
)
|
||||
{
|
||||
super(jsonMapper, zkPaths, me, curator);
|
||||
super(jsonMapper, zkPaths, config, me, curator);
|
||||
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.config = config;
|
||||
|
@ -121,42 +129,47 @@ public class ZkCoordinator extends BaseZkCoordinator
|
|||
return ZkCoordinator.this;
|
||||
}
|
||||
|
||||
private boolean loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException
|
||||
{
|
||||
final boolean loaded;
|
||||
try {
|
||||
loaded = serverManager.loadSegment(segment);
|
||||
}
|
||||
catch (Exception e) {
|
||||
removeSegment(segment, callback);
|
||||
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
|
||||
}
|
||||
|
||||
if (loaded) {
|
||||
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
|
||||
if (!segmentInfoCacheFile.exists()) {
|
||||
try {
|
||||
jsonMapper.writeValue(segmentInfoCacheFile, segment);
|
||||
}
|
||||
catch (IOException e) {
|
||||
removeSegment(segment, callback);
|
||||
throw new SegmentLoadingException(
|
||||
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
return loaded;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
|
||||
{
|
||||
try {
|
||||
log.info("Loading segment %s", segment.getIdentifier());
|
||||
|
||||
final boolean loaded;
|
||||
try {
|
||||
loaded = serverManager.loadSegment(segment);
|
||||
}
|
||||
catch (Exception e) {
|
||||
removeSegment(segment, callback);
|
||||
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
|
||||
}
|
||||
|
||||
if (loaded) {
|
||||
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
|
||||
if (!segmentInfoCacheFile.exists()) {
|
||||
try {
|
||||
jsonMapper.writeValue(segmentInfoCacheFile, segment);
|
||||
}
|
||||
catch (IOException e) {
|
||||
removeSegment(segment, callback);
|
||||
throw new SegmentLoadingException(
|
||||
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if(loadSegment(segment, callback)) {
|
||||
try {
|
||||
announcer.announceSegment(segment);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
catch (SegmentLoadingException e) {
|
||||
log.makeAlert(e, "Failed to load segment for dataSource")
|
||||
|
@ -168,60 +181,64 @@ public class ZkCoordinator extends BaseZkCoordinator
|
|||
}
|
||||
}
|
||||
|
||||
public void addSegments(Iterable<DataSegment> segments, DataSegmentChangeCallback callback)
|
||||
public void addSegments(Iterable<DataSegment> segments, final DataSegmentChangeCallback callback)
|
||||
{
|
||||
try {
|
||||
final List<String> segmentFailures = Lists.newArrayList();
|
||||
final List<DataSegment> validSegments = Lists.newArrayList();
|
||||
try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
|
||||
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
|
||||
backgroundSegmentAnnouncer.startAnnouncing();
|
||||
|
||||
for (DataSegment segment : segments) {
|
||||
log.info("Loading segment %s", segment.getIdentifier());
|
||||
final List<ListenableFuture> segmentLoading = Lists.newArrayList();
|
||||
|
||||
final boolean loaded;
|
||||
for (final DataSegment segment : segments) {
|
||||
segmentLoading.add(
|
||||
getLoadingExecutor().submit(
|
||||
new Callable<Void>()
|
||||
{
|
||||
@Override
|
||||
public Void call() throws SegmentLoadingException
|
||||
{
|
||||
try {
|
||||
log.info("Loading segment %s", segment.getIdentifier());
|
||||
final boolean loaded = loadSegment(segment, callback);
|
||||
if (loaded) {
|
||||
try {
|
||||
backgroundSegmentAnnouncer.announceSegment(segment);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new SegmentLoadingException(e, "Loading Interrupted");
|
||||
}
|
||||
}
|
||||
return null;
|
||||
} catch(SegmentLoadingException e) {
|
||||
log.error(e, "[%s] failed to load", segment.getIdentifier());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
int failed = 0;
|
||||
for(ListenableFuture future : segmentLoading) {
|
||||
try {
|
||||
loaded = serverManager.loadSegment(segment);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
|
||||
removeSegment(segment, callback);
|
||||
segmentFailures.add(segment.getIdentifier());
|
||||
continue;
|
||||
}
|
||||
|
||||
if (loaded) {
|
||||
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
|
||||
if (!segmentInfoCacheFile.exists()) {
|
||||
try {
|
||||
jsonMapper.writeValue(segmentInfoCacheFile, segment);
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
|
||||
removeSegment(segment, callback);
|
||||
segmentFailures.add(segment.getIdentifier());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
validSegments.add(segment);
|
||||
future.get();
|
||||
} catch(InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new SegmentLoadingException(e, "Loading Interrupted");
|
||||
} catch(ExecutionException e) {
|
||||
failed++;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
announcer.announceSegments(validSegments);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments);
|
||||
if(failed > 0) {
|
||||
throw new SegmentLoadingException("%,d errors seen while loading segments", failed);
|
||||
}
|
||||
|
||||
if (!segmentFailures.isEmpty()) {
|
||||
for (String segmentFailure : segmentFailures) {
|
||||
log.error("%s failed to load", segmentFailure);
|
||||
}
|
||||
throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size());
|
||||
}
|
||||
backgroundSegmentAnnouncer.finishAnnouncing();
|
||||
}
|
||||
catch (SegmentLoadingException e) {
|
||||
log.makeAlert(e, "Failed to load segments for dataSource")
|
||||
log.makeAlert(e, "Failed to load segments")
|
||||
.addData("segments", segments)
|
||||
.emit();
|
||||
}
|
||||
|
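Editor's note: the rewritten `addSegments()` above submits one loading task per segment to the loading executor, waits on every future, and reports a single summary error instead of aborting on the first failure. A self-contained sketch of that submit-all / count-failures pattern, assuming Guava on the classpath; the segment names and the "bad segment" failure are illustrative:

```
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

public class ParallelLoadExample
{
  public static void main(String[] args) throws InterruptedException
  {
    ListeningExecutorService exec =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3));

    List<String> segments = Arrays.asList("seg-1", "seg-2", "bad-seg", "seg-4");
    List<ListenableFuture<Void>> loading = new ArrayList<ListenableFuture<Void>>();

    for (final String segment : segments) {
      loading.add(
          exec.submit(
              new Callable<Void>()
              {
                @Override
                public Void call() throws Exception
                {
                  if (segment.startsWith("bad")) {
                    throw new RuntimeException("failed to load " + segment);
                  }
                  System.out.println("loaded " + segment);
                  return null;
                }
              }
          )
      );
    }

    // Wait for every future and count the failures, then surface one summary error,
    // instead of stopping at the first broken segment.
    int failed = 0;
    for (ListenableFuture<Void> future : loading) {
      try {
        future.get();
      }
      catch (ExecutionException e) {
        failed++;
      }
    }

    System.out.println(failed + " errors seen while loading segments");
    exec.shutdown();
  }
}
```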
@ -272,4 +289,134 @@ public class ZkCoordinator extends BaseZkCoordinator
|
|||
callback.execute();
|
||||
}
|
||||
}
|
||||
|
||||
private static class BackgroundSegmentAnnouncer implements AutoCloseable {
|
||||
private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class);
|
||||
|
||||
private final int intervalMillis;
|
||||
private final DataSegmentAnnouncer announcer;
|
||||
private final ScheduledExecutorService exec;
|
||||
private final LinkedBlockingQueue<DataSegment> queue;
|
||||
private final SettableFuture<Boolean> doneAnnouncing;
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
||||
private volatile boolean finished = false;
|
||||
private volatile ScheduledFuture startedAnnouncing = null;
|
||||
private volatile ScheduledFuture nextAnnoucement = null;
|
||||
|
||||
public BackgroundSegmentAnnouncer(
|
||||
DataSegmentAnnouncer announcer,
|
||||
ScheduledExecutorService exec,
|
||||
int intervalMillis
|
||||
)
|
||||
{
|
||||
this.announcer = announcer;
|
||||
this.exec = exec;
|
||||
this.intervalMillis = intervalMillis;
|
||||
this.queue = Queues.newLinkedBlockingQueue();
|
||||
this.doneAnnouncing = SettableFuture.create();
|
||||
}
|
||||
|
||||
public void announceSegment(final DataSegment segment) throws InterruptedException
|
||||
{
|
||||
if (finished) {
|
||||
throw new ISE("Announce segment called after finishAnnouncing");
|
||||
}
|
||||
queue.put(segment);
|
||||
}
|
||||
|
||||
public void startAnnouncing()
|
||||
{
|
||||
if (intervalMillis <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Starting background segment announcing task");
|
||||
|
||||
// schedule background announcing task
|
||||
nextAnnoucement = startedAnnouncing = exec.schedule(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
synchronized (lock) {
|
||||
try {
|
||||
if (!(finished && queue.isEmpty())) {
|
||||
final List<DataSegment> segments = Lists.newLinkedList();
|
||||
queue.drainTo(segments);
|
||||
try {
|
||||
announcer.announceSegments(segments);
|
||||
nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
catch (IOException e) {
|
||||
doneAnnouncing.setException(
|
||||
new SegmentLoadingException(e, "Failed to announce segments[%s]", segments)
|
||||
);
|
||||
}
|
||||
} else {
|
||||
doneAnnouncing.set(true);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
doneAnnouncing.setException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
intervalMillis,
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
}
|
||||
|
||||
public void finishAnnouncing() throws SegmentLoadingException
|
||||
{
|
||||
synchronized (lock) {
|
||||
finished = true;
|
||||
// announce any remaining segments
|
||||
try {
|
||||
final List<DataSegment> segments = Lists.newLinkedList();
|
||||
queue.drainTo(segments);
|
||||
announcer.announceSegments(segments);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue);
|
||||
}
|
||||
|
||||
// get any exception that may have been thrown in background annoucing
|
||||
try {
|
||||
// check in case intervalMillis is <= 0
|
||||
if (startedAnnouncing != null) {
|
||||
startedAnnouncing.cancel(false);
|
||||
}
|
||||
// - if the task is waiting on the lock, then the queue will be empty by the time it runs
|
||||
// - if the task just released it, then the lock ensures any exception is set in doneAnnouncing
|
||||
if (doneAnnouncing.isDone()) {
|
||||
doneAnnouncing.get();
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new SegmentLoadingException(e, "Loading Interrupted");
|
||||
}
|
||||
catch (ExecutionException e) {
|
||||
throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed");
|
||||
}
|
||||
}
|
||||
log.info("Completed background segment announcing");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// stop background scheduling
|
||||
synchronized (lock) {
|
||||
finished = true;
|
||||
if (nextAnnoucement != null) {
|
||||
nextAnnoucement.cancel(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
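Editor's note: the new `BackgroundSegmentAnnouncer` above queues loaded segments and has a scheduled task periodically drain the queue and announce them in batches, flushing whatever remains when announcing finishes. The sketch below is a simplified, JDK-only analogue of that queue-drain-announce loop (it uses `scheduleWithFixedDelay` rather than the self-rescheduling and error-propagation of the real class; all names are illustrative):

```
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BatchAnnouncerExample
{
  public static void main(String[] args) throws Exception
  {
    final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

    // Periodically drain whatever has been queued and announce it as one batch.
    exec.scheduleWithFixedDelay(
        new Runnable()
        {
          @Override
          public void run()
          {
            List<String> batch = new ArrayList<String>();
            queue.drainTo(batch);
            if (!batch.isEmpty()) {
              System.out.println("announcing batch: " + batch);
            }
          }
        },
        50, 50, TimeUnit.MILLISECONDS
    );

    for (int i = 0; i < 10; i++) {
      queue.put("segment-" + i);
      Thread.sleep(20);
    }

    // Finish: flush anything still queued, then stop the background task.
    Thread.sleep(100);
    List<String> remainder = new ArrayList<String>();
    queue.drainTo(remainder);
    if (!remainder.isEmpty()) {
      System.out.println("announcing final batch: " + remainder);
    }
    exec.shutdownNow();
  }
}
```

Batching keeps a node that is loading thousands of cached segments from writing one ZooKeeper announcement per segment, while still letting it announce progressively instead of only at the very end.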
@ -0,0 +1,67 @@
|
|||
package io.druid.server.coordination.broker;
|
||||
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import io.druid.client.ServerInventoryView;
|
||||
import io.druid.client.ServerView;
|
||||
import io.druid.curator.discovery.ServiceAnnouncer;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.guice.annotations.Self;
|
||||
import io.druid.server.DruidNode;
|
||||
|
||||
@ManageLifecycle
|
||||
public class DruidBroker
|
||||
{
|
||||
private final DruidNode self;
|
||||
private final ServiceAnnouncer serviceAnnouncer;
|
||||
private volatile boolean started = false;
|
||||
|
||||
@Inject
|
||||
public DruidBroker(
|
||||
final ServerInventoryView serverInventoryView,
|
||||
final @Self DruidNode self,
|
||||
final ServiceAnnouncer serviceAnnouncer
|
||||
)
|
||||
{
|
||||
this.self = self;
|
||||
this.serviceAnnouncer = serviceAnnouncer;
|
||||
|
||||
serverInventoryView.registerSegmentCallback(
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
new ServerView.BaseSegmentCallback()
|
||||
{
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentViewInitialized()
|
||||
{
|
||||
serviceAnnouncer.announce(self);
|
||||
return ServerView.CallbackAction.UNREGISTER;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
public void start()
|
||||
{
|
||||
synchronized (self) {
|
||||
if(started) {
|
||||
return;
|
||||
}
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
|
||||
@LifecycleStop
|
||||
public void stop()
|
||||
{
|
||||
synchronized (self) {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
serviceAnnouncer.unannounce(self);
|
||||
started = false;
|
||||
}
|
||||
}
|
||||
}
|
|
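Editor's note: the new `DruidBroker` above registers a segment callback whose `segmentViewInitialized()` announces the broker for discovery and returns `CallbackAction.UNREGISTER`, so the announcement happens exactly once and only after the segment view is warm. The sketch below is a generic analogue of that unregister-on-return convention; the registry internals are an assumption for illustration, not Druid's actual `ServerView` implementation:

```
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class OneShotCallbackExample
{
  enum CallbackAction { CONTINUE, UNREGISTER }

  interface InitCallback
  {
    CallbackAction viewInitialized();
  }

  private final List<InitCallback> callbacks = new ArrayList<InitCallback>();

  void register(InitCallback callback)
  {
    callbacks.add(callback);
  }

  void fireInitialized()
  {
    // Callbacks that return UNREGISTER are dropped after running once.
    for (Iterator<InitCallback> it = callbacks.iterator(); it.hasNext(); ) {
      if (it.next().viewInitialized() == CallbackAction.UNREGISTER) {
        it.remove();
      }
    }
  }

  public static void main(String[] args)
  {
    OneShotCallbackExample view = new OneShotCallbackExample();
    view.register(
        new InitCallback()
        {
          @Override
          public CallbackAction viewInitialized()
          {
            System.out.println("announcing broker"); // stands in for serviceAnnouncer.announce(self)
            return CallbackAction.UNREGISTER;
          }
        }
    );

    view.fireInitialized(); // prints once
    view.fireInitialized(); // callback already unregistered; prints nothing
  }
}
```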
@ -384,7 +384,7 @@ public class LoadQueuePeon
|
|||
}
|
||||
}
|
||||
|
||||
private class SegmentHolder
|
||||
private static class SegmentHolder
|
||||
{
|
||||
private final DataSegment segment;
|
||||
private final DataSegmentChangeRequest changeRequest;
|
||||
|
@ -457,4 +457,4 @@ public class LoadQueuePeon
|
|||
return changeRequest.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
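Editor's note: the hunk above makes `SegmentHolder` a static nested class (a later hunk does the same for `RequestLogEventBuilder`). A non-static inner class carries an implicit reference to its enclosing instance, which it does not need here and which keeps the outer object reachable for as long as any holder lives. A small illustration of the difference; the field sizes and names are made up:

```
public class NestedClassExample
{
  private final byte[] largeState = new byte[1024 * 1024];

  // Inner (non-static) class: every instance holds a hidden reference to the enclosing
  // NestedClassExample, pinning largeState while the holder is alive.
  class InnerHolder
  {
    final String segmentId;

    InnerHolder(String segmentId)
    {
      this.segmentId = segmentId;
    }
  }

  // Static nested class: no hidden outer reference.
  static class StaticHolder
  {
    final String segmentId;

    StaticHolder(String segmentId)
    {
      this.segmentId = segmentId;
    }
  }

  public static void main(String[] args)
  {
    NestedClassExample outer = new NestedClassExample();
    InnerHolder inner = outer.new InnerHolder("seg-1"); // needs (and retains) an outer instance
    StaticHolder detached = new StaticHolder("seg-2");  // stands alone
    System.out.println(inner.segmentId + " " + detached.segmentId);
  }
}
```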
@ -74,7 +74,7 @@ public class DBResource
|
|||
new Function<DruidDataSource, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(@Nullable DruidDataSource dataSource)
|
||||
public String apply(DruidDataSource dataSource)
|
||||
{
|
||||
return dataSource.getName();
|
||||
}
|
||||
|
@ -126,7 +126,7 @@ public class DBResource
|
|||
new Function<DataSegment, Object>()
|
||||
{
|
||||
@Override
|
||||
public Object apply(@Nullable DataSegment segment)
|
||||
public Object apply(DataSegment segment)
|
||||
{
|
||||
return segment.getIdentifier();
|
||||
}
|
||||
|
|
|
@ -261,7 +261,7 @@ public class DatasourcesResource
|
|||
final DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase());
|
||||
final Interval theInterval = new Interval(interval.replace("_", "/"));
|
||||
|
||||
if (dataSource == null || interval == null) {
|
||||
if (dataSource == null) {
|
||||
return Response.noContent().build();
|
||||
}
|
||||
|
||||
|
|
|
@ -158,7 +158,7 @@ public class ServersResource
|
|||
new Function<DataSegment, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(@Nullable DataSegment segment)
|
||||
public String apply(DataSegment segment)
|
||||
{
|
||||
return segment.getIdentifier();
|
||||
}
|
||||
|
|
|
@ -125,7 +125,7 @@ public class EmittingRequestLogger implements RequestLogger
|
|||
}
|
||||
}
|
||||
|
||||
private class RequestLogEventBuilder implements ServiceEventBuilder
|
||||
private static class RequestLogEventBuilder implements ServiceEventBuilder
|
||||
{
|
||||
private final String feed;
|
||||
private final RequestLogLine requestLogLine;
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package io.druid.server.log;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.guava.CloseQuietly;
|
||||
|
@ -31,8 +32,9 @@ import org.joda.time.Duration;
|
|||
import org.joda.time.MutableDateTime;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
|
@ -47,7 +49,7 @@ public class FileRequestLogger implements RequestLogger
|
|||
private final Object lock = new Object();
|
||||
|
||||
private volatile DateTime currentDay;
|
||||
private volatile FileWriter fileWriter;
|
||||
private volatile OutputStreamWriter fileWriter;
|
||||
|
||||
public FileRequestLogger(ObjectMapper objectMapper, ScheduledExecutorService exec, File baseDir)
|
||||
{
|
||||
|
@ -66,7 +68,10 @@ public class FileRequestLogger implements RequestLogger
|
|||
mutableDateTime.setMillisOfDay(0);
|
||||
currentDay = mutableDateTime.toDateTime();
|
||||
|
||||
fileWriter = new FileWriter(new File(baseDir, currentDay.toString("yyyy-MM-dd'.log'")), true);
|
||||
fileWriter = new OutputStreamWriter(
|
||||
new FileOutputStream(new File(baseDir, currentDay.toString("yyyy-MM-dd'.log'")), true),
|
||||
Charsets.UTF_8
|
||||
);
|
||||
long nextDay = currentDay.plusDays(1).getMillis();
|
||||
Duration delay = new Duration(nextDay - new DateTime().getMillis());
|
||||
|
||||
|
@ -84,7 +89,10 @@ public class FileRequestLogger implements RequestLogger
|
|||
try {
|
||||
synchronized (lock) {
|
||||
CloseQuietly.close(fileWriter);
|
||||
fileWriter = new FileWriter(new File(baseDir, currentDay.toString()), true);
|
||||
fileWriter = new OutputStreamWriter(
|
||||
new FileOutputStream(new File(baseDir, currentDay.toString()), true),
|
||||
Charsets.UTF_8
|
||||
);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
|
|
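Editor's note: the FileRequestLogger hunks above replace `FileWriter` (which always uses the platform default charset) with an `OutputStreamWriter` over an appending `FileOutputStream`, pinned to UTF-8. A JDK-only sketch of the same construction; the diff uses Guava's `Charsets.UTF_8`, which is equivalent to `StandardCharsets.UTF_8`, and the file path and log line below are illustrative:

```
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

public class Utf8WriterExample
{
  public static void main(String[] args) throws IOException
  {
    File logFile = new File("request.log");

    // new FileWriter(logFile, true) would use the platform default charset.
    // Wrapping an appending FileOutputStream keeps the append behaviour but pins UTF-8.
    Writer writer = new OutputStreamWriter(new FileOutputStream(logFile, true), StandardCharsets.UTF_8);
    try {
      writer.write("2014-03-01T00:00:00.000Z\tsome query\n");
    }
    finally {
      writer.close();
    }
  }
}
```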
@ -145,7 +145,7 @@ public class SQLRunner
|
|||
final URLConnection urlConnection = url.openConnection();
|
||||
urlConnection.addRequestProperty("content-type", "application/json");
|
||||
urlConnection.getOutputStream().write(queryStr.getBytes(Charsets.UTF_8));
|
||||
BufferedReader stdInput = new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
|
||||
BufferedReader stdInput = new BufferedReader(new InputStreamReader(urlConnection.getInputStream(), Charsets.UTF_8));
|
||||
|
||||
Object res = objectMapper.readValue(stdInput, typeRef);
|
||||
|
||||
|
|
|
@ -176,6 +176,7 @@ public class CuratorInventoryManagerTest extends io.druid.curator.CuratorTestBas
|
|||
private volatile CountDownLatch deadContainerLatch = null;
|
||||
private volatile CountDownLatch newInventoryLatch = null;
|
||||
private volatile CountDownLatch deadInventoryLatch = null;
|
||||
private volatile boolean initialized = false;
|
||||
|
||||
@Override
|
||||
public Map<String, Integer> deserializeContainer(byte[] bytes)
|
||||
|
@ -271,5 +272,11 @@ public class CuratorInventoryManagerTest extends io.druid.curator.CuratorTestBas
|
|||
{
|
||||
this.deadInventoryLatch = deadInventoryLatch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inventoryInitialized()
|
||||
{
|
||||
initialized = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import io.druid.curator.PotentiallyGzippedCompressionProvider;
|
|||
import io.druid.curator.announcement.Announcer;
|
||||
import io.druid.db.DatabaseSegmentManager;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.segment.loading.SegmentLoaderConfig;
|
||||
import io.druid.segment.realtime.DbSegmentPublisher;
|
||||
import io.druid.server.DruidNode;
|
||||
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
|
||||
|
@ -156,6 +157,7 @@ public class DruidClusterBridgeTest
|
|||
BridgeZkCoordinator bridgeZkCoordinator = new BridgeZkCoordinator(
|
||||
jsonMapper,
|
||||
zkPathsConfig,
|
||||
new SegmentLoaderConfig(),
|
||||
metadata,
|
||||
remoteCf,
|
||||
dbSegmentPublisher,
|
||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.server.coordination;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
|
@ -50,18 +51,22 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ZkCoordinatorTest extends CuratorTestBase
|
||||
{
|
||||
private static final Logger log = new Logger(ZkCoordinatorTest.class);
|
||||
public static final int COUNT = 50;
|
||||
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
private ZkCoordinator zkCoordinator;
|
||||
private ServerManager serverManager;
|
||||
private DataSegmentAnnouncer announcer;
|
||||
private File infoDir;
|
||||
private AtomicInteger announceCount;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
|
@ -101,9 +106,41 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||
}
|
||||
};
|
||||
|
||||
announcer = new SingleDataSegmentAnnouncer(
|
||||
me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
|
||||
);
|
||||
announceCount = new AtomicInteger(0);
|
||||
announcer = new DataSegmentAnnouncer()
|
||||
{
|
||||
private final DataSegmentAnnouncer delegate = new SingleDataSegmentAnnouncer(
|
||||
me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
|
||||
);
|
||||
|
||||
@Override
|
||||
public void announceSegment(DataSegment segment) throws IOException
|
||||
{
|
||||
announceCount.incrementAndGet();
|
||||
delegate.announceSegment(segment);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unannounceSegment(DataSegment segment) throws IOException
|
||||
{
|
||||
announceCount.decrementAndGet();
|
||||
delegate.unannounceSegment(segment);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void announceSegments(Iterable<DataSegment> segments) throws IOException
|
||||
{
|
||||
announceCount.addAndGet(Iterables.size(segments));
|
||||
delegate.announceSegments(segments);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
|
||||
{
|
||||
announceCount.addAndGet(-Iterables.size(segments));
|
||||
delegate.unannounceSegments(segments);
|
||||
}
|
||||
};
|
||||
|
||||
zkCoordinator = new ZkCoordinator(
|
||||
jsonMapper,
|
||||
|
@ -114,6 +151,18 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||
{
|
||||
return infoDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumLoadingThreads()
|
||||
{
|
||||
return 5;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getAnnounceIntervalMillis()
|
||||
{
|
||||
return 50;
|
||||
}
|
||||
},
|
||||
zkPaths,
|
||||
me,
|
||||
|
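Editor's note: the test change above wraps the real `SingleDataSegmentAnnouncer` in an anonymous delegating announcer that tracks announcements in an `AtomicInteger`, so the test can assert how many segments were announced in total. A generic, self-contained sketch of that counting-decorator pattern; the `Announcer` interface here is illustrative, not Druid's `DataSegmentAnnouncer`:

```
import java.util.concurrent.atomic.AtomicInteger;

public class CountingAnnouncerExample
{
  interface Announcer
  {
    void announce(String segment);

    void unannounce(String segment);
  }

  // Wraps a real announcer, forwards every call, and keeps a running count.
  static class CountingAnnouncer implements Announcer
  {
    private final Announcer delegate;
    private final AtomicInteger count = new AtomicInteger(0);

    CountingAnnouncer(Announcer delegate)
    {
      this.delegate = delegate;
    }

    @Override
    public void announce(String segment)
    {
      count.incrementAndGet();
      delegate.announce(segment);
    }

    @Override
    public void unannounce(String segment)
    {
      count.decrementAndGet();
      delegate.unannounce(segment);
    }

    int getCount()
    {
      return count.get();
    }
  }

  public static void main(String[] args)
  {
    CountingAnnouncer announcer = new CountingAnnouncer(
        new Announcer()
        {
          @Override
          public void announce(String segment)
          {
            System.out.println("announced " + segment);
          }

          @Override
          public void unannounce(String segment)
          {
            System.out.println("unannounced " + segment);
          }
        }
    );

    announcer.announce("seg-1");
    announcer.announce("seg-2");
    announcer.unannounce("seg-1");
    System.out.println("net announcements: " + announcer.getCount()); // 1
  }
}
```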
@ -133,21 +182,22 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||
@Test
|
||||
public void testLoadCache() throws Exception
|
||||
{
|
||||
List<DataSegment> segments = Lists.newArrayList(
|
||||
makeSegment("test", "1", new Interval("P1d/2011-04-01")),
|
||||
makeSegment("test", "1", new Interval("P1d/2011-04-02")),
|
||||
makeSegment("test", "2", new Interval("P1d/2011-04-02")),
|
||||
makeSegment("test", "1", new Interval("P1d/2011-04-03")),
|
||||
makeSegment("test", "1", new Interval("P1d/2011-04-04")),
|
||||
makeSegment("test", "1", new Interval("P1d/2011-04-05")),
|
||||
makeSegment("test", "2", new Interval("PT1h/2011-04-04T01")),
|
||||
makeSegment("test", "2", new Interval("PT1h/2011-04-04T02")),
|
||||
makeSegment("test", "2", new Interval("PT1h/2011-04-04T03")),
|
||||
makeSegment("test", "2", new Interval("PT1h/2011-04-04T05")),
|
||||
makeSegment("test", "2", new Interval("PT1h/2011-04-04T06")),
|
||||
makeSegment("test2", "1", new Interval("P1d/2011-04-01")),
|
||||
makeSegment("test2", "1", new Interval("P1d/2011-04-02"))
|
||||
);
|
||||
List<DataSegment> segments = Lists.newLinkedList();
|
||||
for(int i = 0; i < COUNT; ++i) {
|
||||
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-01")));
|
||||
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-02")));
|
||||
segments.add(makeSegment("test" + i, "2", new Interval("P1d/2011-04-02")));
|
||||
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-03")));
|
||||
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-04")));
|
||||
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-05")));
|
||||
segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T01")));
|
||||
segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T02")));
|
||||
segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T03")));
|
||||
segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T05")));
|
||||
segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T06")));
|
||||
segments.add(makeSegment("test_two" + i, "1", new Interval("P1d/2011-04-01")));
|
||||
segments.add(makeSegment("test_two" + i, "1", new Interval("P1d/2011-04-02")));
|
||||
}
|
||||
Collections.sort(segments);
|
||||
|
||||
for (DataSegment segment : segments) {
|
||||
|
@ -158,6 +208,11 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||
Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty());
|
||||
zkCoordinator.start();
|
||||
Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty());
|
||||
for(int i = 0; i < COUNT; ++i) {
|
||||
Assert.assertEquals(11L, serverManager.getDataSourceCounts().get("test" + i).longValue());
|
||||
Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue());
|
||||
}
|
||||
Assert.assertEquals(13 * COUNT, announceCount.get());
|
||||
zkCoordinator.stop();
|
||||
|
||||
for (DataSegment segment : segments) {
|
||||
|
|
|
@ -35,13 +35,11 @@ import io.druid.client.cache.CacheProvider;
|
|||
import io.druid.client.selector.CustomTierSelectorStrategyConfig;
|
||||
import io.druid.client.selector.ServerSelectorStrategy;
|
||||
import io.druid.client.selector.TierSelectorStrategy;
|
||||
import io.druid.curator.discovery.DiscoveryModule;
|
||||
import io.druid.guice.Jerseys;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.LazySingleton;
|
||||
import io.druid.guice.LifecycleModule;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.guice.annotations.Self;
|
||||
import io.druid.query.MapQueryToolChestWarehouse;
|
||||
import io.druid.query.QuerySegmentWalker;
|
||||
import io.druid.query.QueryToolChestWarehouse;
|
||||
|
@ -49,6 +47,7 @@ import io.druid.query.RetryQueryRunnerConfig;
|
|||
import io.druid.server.ClientInfoResource;
|
||||
import io.druid.server.ClientQuerySegmentWalker;
|
||||
import io.druid.server.QueryResource;
|
||||
import io.druid.server.coordination.broker.DruidBroker;
|
||||
import io.druid.server.initialization.JettyServerInitializer;
|
||||
import io.druid.server.metrics.MetricsModule;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
|
@ -101,8 +100,8 @@ public class CliBroker extends ServerRunnable
|
|||
Jerseys.addResource(binder, QueryResource.class);
|
||||
Jerseys.addResource(binder, ClientInfoResource.class);
|
||||
LifecycleModule.register(binder, QueryResource.class);
|
||||
LifecycleModule.register(binder, DruidBroker.class);
|
||||
|
||||
DiscoveryModule.register(binder, Self.class);
|
||||
MetricsModule.register(binder, CacheMonitor.class);
|
||||
|
||||
LifecycleModule.register(binder, Server.class);
|
||||
|
|