mirror of https://github.com/apache/druid.git
Merge branch 'druid-0.7.x' into new-init
Conflicts: examples/config/historical/runtime.properties examples/config/overlord/runtime.properties examples/config/realtime/runtime.properties
This commit is contained in:
commit
2ecd55bae5
|
@ -19,13 +19,13 @@ Clone Druid and build it:
|
|||
git clone https://github.com/metamx/druid.git druid
|
||||
cd druid
|
||||
git fetch --tags
|
||||
git checkout druid-0.6.139
|
||||
git checkout druid-0.6.143
|
||||
./build.sh
|
||||
```
|
||||
|
||||
### Downloading the DSK (Druid Standalone Kit)
|
||||
|
||||
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz) a stand-alone tarball and run it:
|
||||
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz) a stand-alone tarball and run it:
|
||||
|
||||
``` bash
|
||||
tar -xzf druid-services-0.X.X-bin.tar.gz
|
||||
|
|
|
@ -8,9 +8,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need
|
|||
|
||||
- Update realtime node's configs for Kafka 8 extensions
|
||||
- e.g.
|
||||
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.139",...]`
|
||||
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.143",...]`
|
||||
- becomes
|
||||
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.139",...]`
|
||||
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.143",...]`
|
||||
- Update realtime task config for changed keys
|
||||
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.
|
||||
|
||||
|
|
|
@ -57,7 +57,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/overlord
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
@ -139,7 +139,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/middlemanager
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143","io.druid.extensions:druid-kafka-seven:0.6.143"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
@ -286,7 +286,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/historical
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
|
|
@ -27,7 +27,7 @@ druid.host=localhost
|
|||
druid.service=realtime
|
||||
druid.port=8083
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.143"]
|
||||
|
||||
|
||||
druid.zk.service.host=localhost
|
||||
|
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/realtime
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143","io.druid.extensions:druid-kafka-seven:0.6.143"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
|
|
@ -28,7 +28,7 @@ Configuration:
|
|||
|
||||
-Ddruid.zk.service.host=localhost
|
||||
|
||||
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
|
||||
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.143"]
|
||||
|
||||
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
|
||||
-Ddruid.db.connector.user=druid
|
||||
|
|
|
@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
|
|||
|
||||
### Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz). Download this file to a directory of your choosing.
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz). Download this file to a directory of your choosing.
|
||||
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
||||
|
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
|
|||
Not too lost so far right? That's great! If you cd into the directory:
|
||||
|
||||
```
|
||||
cd druid-services-0.6.139
|
||||
cd druid-services-0.6.143
|
||||
```
|
||||
|
||||
You should see a bunch of files:
|
||||
|
|
|
@ -91,7 +91,7 @@ druid.service=overlord
|
|||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.143"]
|
||||
|
||||
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
|
||||
druid.db.connector.user=druid
|
||||
|
|
|
@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
|
|||
|
||||
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
|
||||
|
||||
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz)
|
||||
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz)
|
||||
|
||||
and untar the contents within by issuing:
|
||||
|
||||
|
@ -149,7 +149,7 @@ druid.port=8081
|
|||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143"]
|
||||
|
||||
# Dummy read only AWS account (used to download example data)
|
||||
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
|
||||
|
@ -240,7 +240,7 @@ druid.port=8083
|
|||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.143","io.druid.extensions:druid-kafka-seven:0.6.143"]
|
||||
|
||||
# Change this config to db to hand off to the rest of the Druid cluster
|
||||
druid.publish.type=noop
|
||||
|
|
|
@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
|
|||
|
||||
h3. Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz)
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz)
|
||||
Download this file to a directory of your choosing.
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
||||
|
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
|
|||
Not too lost so far right? That's great! If you cd into the directory:
|
||||
|
||||
```
|
||||
cd druid-services-0.6.139
|
||||
cd druid-services-0.6.143
|
||||
```
|
||||
|
||||
You should see a bunch of files:
|
||||
|
|
|
@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
|
|||
|
||||
# Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz).
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz).
|
||||
Download this bad boy to a directory of your choosing.
|
||||
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
|
|
@ -102,6 +102,7 @@ public abstract class BaseQuery<T> implements Query<T>
|
|||
return duration;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public Map<String, Object> getContext()
|
||||
{
|
||||
|
|
|
@ -70,6 +70,8 @@ public interface Query<T>
|
|||
|
||||
public Duration getDuration();
|
||||
|
||||
public Map<String, Object> getContext();
|
||||
|
||||
public <ContextType> ContextType getContextValue(String key);
|
||||
|
||||
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue);
|
||||
|
|
|
@ -25,6 +25,8 @@ import org.junit.After;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
|
@ -36,8 +38,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest
|
||||
{
|
||||
public CompressedLongsIndexedSupplierTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy)
|
||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.segment.indexing.granularity;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.api.client.util.Lists;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.collect.Iterators;
|
||||
import com.google.common.collect.PeekingIterator;
|
||||
|
@ -38,13 +39,16 @@ import java.util.TreeSet;
|
|||
public class ArbitraryGranularitySpec implements GranularitySpec
|
||||
{
|
||||
private final TreeSet<Interval> intervals;
|
||||
private final QueryGranularity queryGranularity;
|
||||
|
||||
@JsonCreator
|
||||
public ArbitraryGranularitySpec(
|
||||
@JsonProperty("queryGranularity") QueryGranularity queryGranularity,
|
||||
@JsonProperty("intervals") List<Interval> inputIntervals
|
||||
)
|
||||
{
|
||||
intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
|
||||
this.queryGranularity = queryGranularity;
|
||||
this.intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
|
||||
|
||||
// Insert all intervals
|
||||
for (final Interval inputInterval : inputIntervals) {
|
||||
|
@ -98,14 +102,18 @@ public class ArbitraryGranularitySpec implements GranularitySpec
|
|||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty("queryGranularity")
|
||||
public QueryGranularity getQueryGranularity()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
return queryGranularity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GranularitySpec withQueryGranularity(QueryGranularity queryGranularity)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
return new ArbitraryGranularitySpec(
|
||||
queryGranularity,
|
||||
Lists.newArrayList(intervals)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ public class UniformGranularitySpec implements GranularitySpec
|
|||
Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval));
|
||||
}
|
||||
this.inputIntervals = ImmutableList.copyOf(inputIntervals);
|
||||
this.wrappedSpec = new ArbitraryGranularitySpec(granularIntervals);
|
||||
this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, granularIntervals);
|
||||
} else {
|
||||
this.inputIntervals = null;
|
||||
this.wrappedSpec = null;
|
||||
|
|
|
@ -259,20 +259,26 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
|
|||
public void onComplete(Result result)
|
||||
{
|
||||
final long requestTime = System.currentTimeMillis() - start;
|
||||
emitter.emit(
|
||||
new ServiceMetricEvent.Builder()
|
||||
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
|
||||
.setUser3(String.valueOf(query.getContextPriority(0)))
|
||||
.setUser4(query.getType())
|
||||
.setUser5(DataSourceUtil.COMMA_JOIN.join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
.setUser7(req.getRemoteAddr())
|
||||
.setUser8(query.getId())
|
||||
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
|
||||
.build("request/time", requestTime)
|
||||
);
|
||||
|
||||
try {
|
||||
emitter.emit(
|
||||
new ServiceMetricEvent.Builder()
|
||||
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
|
||||
.setUser3(
|
||||
jsonMapper.writeValueAsString(
|
||||
query.getContext() == null
|
||||
? ImmutableMap.of()
|
||||
: query.getContext()
|
||||
)
|
||||
)
|
||||
.setUser4(query.getType())
|
||||
.setUser5(DataSourceUtil.COMMA_JOIN.join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
.setUser7(req.getRemoteAddr())
|
||||
.setUser8(query.getId())
|
||||
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
|
||||
.build("request/time", requestTime)
|
||||
);
|
||||
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
|
|
|
@ -178,7 +178,13 @@ public class QueryResource
|
|||
emitter.emit(
|
||||
new ServiceMetricEvent.Builder()
|
||||
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
|
||||
.setUser3(String.valueOf(query.getContextPriority(0)))
|
||||
.setUser3(
|
||||
jsonMapper.writeValueAsString(
|
||||
query.getContext() == null
|
||||
? ImmutableMap.of()
|
||||
: query.getContext()
|
||||
)
|
||||
)
|
||||
.setUser4(query.getType())
|
||||
.setUser5(COMMA_JOIN.join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
|
|
|
@ -21,8 +21,6 @@ package io.druid.server.coordinator;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Collections2;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.Comparators;
|
||||
|
@ -44,7 +42,7 @@ import java.util.Collection;
|
|||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -59,16 +57,16 @@ public class LoadQueuePeon
|
|||
private static final int DROP = 0;
|
||||
private static final int LOAD = 1;
|
||||
|
||||
private static Comparator<SegmentHolder> segmentHolderComparator = new Comparator<SegmentHolder>()
|
||||
{
|
||||
private Comparator<DataSegment> comparator = Comparators.inverse(DataSegment.bucketMonthComparator());
|
||||
private static Comparator<DataSegment> segmentComparator = Comparators.inverse(DataSegment.bucketMonthComparator());
|
||||
|
||||
@Override
|
||||
public int compare(SegmentHolder lhs, SegmentHolder rhs)
|
||||
{
|
||||
return comparator.compare(lhs.getSegment(), rhs.getSegment());
|
||||
private static void executeCallbacks(List<LoadPeonCallback> callbacks)
|
||||
{
|
||||
for (LoadPeonCallback callback : callbacks) {
|
||||
if (callback != null) {
|
||||
callback.execute();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private final CuratorFramework curator;
|
||||
private final String basePath;
|
||||
|
@ -80,11 +78,11 @@ public class LoadQueuePeon
|
|||
private final AtomicLong queuedSize = new AtomicLong(0);
|
||||
private final AtomicInteger failedAssignCount = new AtomicInteger(0);
|
||||
|
||||
private final ConcurrentSkipListSet<SegmentHolder> segmentsToLoad = new ConcurrentSkipListSet<SegmentHolder>(
|
||||
segmentHolderComparator
|
||||
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
|
||||
segmentComparator
|
||||
);
|
||||
private final ConcurrentSkipListSet<SegmentHolder> segmentsToDrop = new ConcurrentSkipListSet<SegmentHolder>(
|
||||
segmentHolderComparator
|
||||
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
|
||||
segmentComparator
|
||||
);
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
@ -111,37 +109,13 @@ public class LoadQueuePeon
|
|||
@JsonProperty
|
||||
public Set<DataSegment> getSegmentsToLoad()
|
||||
{
|
||||
return new ConcurrentSkipListSet<DataSegment>(
|
||||
Collections2.transform(
|
||||
segmentsToLoad,
|
||||
new Function<SegmentHolder, DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public DataSegment apply(SegmentHolder input)
|
||||
{
|
||||
return input.getSegment();
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
return segmentsToLoad.keySet();
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Set<DataSegment> getSegmentsToDrop()
|
||||
{
|
||||
return new ConcurrentSkipListSet<DataSegment>(
|
||||
Collections2.transform(
|
||||
segmentsToDrop,
|
||||
new Function<SegmentHolder, DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public DataSegment apply(SegmentHolder input)
|
||||
{
|
||||
return input.getSegment();
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
return segmentsToDrop.keySet();
|
||||
}
|
||||
|
||||
public long getLoadQueueSize()
|
||||
|
@ -155,8 +129,8 @@ public class LoadQueuePeon
|
|||
}
|
||||
|
||||
public void loadSegment(
|
||||
DataSegment segment,
|
||||
LoadPeonCallback callback
|
||||
final DataSegment segment,
|
||||
final LoadPeonCallback callback
|
||||
)
|
||||
{
|
||||
synchronized (lock) {
|
||||
|
@ -169,12 +143,11 @@ public class LoadQueuePeon
|
|||
}
|
||||
}
|
||||
|
||||
SegmentHolder holder = new SegmentHolder(segment, LOAD, Arrays.asList(callback));
|
||||
|
||||
synchronized (lock) {
|
||||
if (segmentsToLoad.contains(holder)) {
|
||||
final SegmentHolder existingHolder = segmentsToLoad.get(segment);
|
||||
if (existingHolder != null) {
|
||||
if ((callback != null)) {
|
||||
currentlyProcessing.addCallback(callback);
|
||||
existingHolder.addCallback(callback);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -182,13 +155,13 @@ public class LoadQueuePeon
|
|||
|
||||
log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
|
||||
queuedSize.addAndGet(segment.getSize());
|
||||
segmentsToLoad.add(holder);
|
||||
segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Arrays.asList(callback)));
|
||||
doNext();
|
||||
}
|
||||
|
||||
public void dropSegment(
|
||||
DataSegment segment,
|
||||
LoadPeonCallback callback
|
||||
final DataSegment segment,
|
||||
final LoadPeonCallback callback
|
||||
)
|
||||
{
|
||||
synchronized (lock) {
|
||||
|
@ -201,19 +174,18 @@ public class LoadQueuePeon
|
|||
}
|
||||
}
|
||||
|
||||
SegmentHolder holder = new SegmentHolder(segment, DROP, Arrays.asList(callback));
|
||||
|
||||
synchronized (lock) {
|
||||
if (segmentsToDrop.contains(holder)) {
|
||||
final SegmentHolder existingHolder = segmentsToDrop.get(segment);
|
||||
if (existingHolder != null) {
|
||||
if (callback != null) {
|
||||
currentlyProcessing.addCallback(callback);
|
||||
existingHolder.addCallback(callback);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
|
||||
segmentsToDrop.add(holder);
|
||||
segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Arrays.asList(callback)));
|
||||
doNext();
|
||||
}
|
||||
|
||||
|
@ -222,10 +194,10 @@ public class LoadQueuePeon
|
|||
synchronized (lock) {
|
||||
if (currentlyProcessing == null) {
|
||||
if (!segmentsToDrop.isEmpty()) {
|
||||
currentlyProcessing = segmentsToDrop.first();
|
||||
currentlyProcessing = segmentsToDrop.firstEntry().getValue();
|
||||
log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
|
||||
} else if (!segmentsToLoad.isEmpty()) {
|
||||
currentlyProcessing = segmentsToLoad.first();
|
||||
currentlyProcessing = segmentsToLoad.firstEntry().getValue();
|
||||
log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
|
||||
} else {
|
||||
return;
|
||||
|
@ -328,24 +300,25 @@ public class LoadQueuePeon
|
|||
if (currentlyProcessing != null) {
|
||||
switch (currentlyProcessing.getType()) {
|
||||
case LOAD:
|
||||
segmentsToLoad.remove(currentlyProcessing);
|
||||
segmentsToLoad.remove(currentlyProcessing.getSegment());
|
||||
queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
|
||||
break;
|
||||
case DROP:
|
||||
segmentsToDrop.remove(currentlyProcessing);
|
||||
segmentsToDrop.remove(currentlyProcessing.getSegment());
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
||||
final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
|
||||
currentlyProcessing = null;
|
||||
callBackExecutor.execute(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
currentlyProcessing.executeCallbacks();
|
||||
currentlyProcessing = null;
|
||||
executeCallbacks(callbacks);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -356,20 +329,20 @@ public class LoadQueuePeon
|
|||
{
|
||||
synchronized (lock) {
|
||||
if (currentlyProcessing != null) {
|
||||
currentlyProcessing.executeCallbacks();
|
||||
executeCallbacks(currentlyProcessing.getCallbacks());
|
||||
currentlyProcessing = null;
|
||||
}
|
||||
|
||||
if (!segmentsToDrop.isEmpty()) {
|
||||
for (SegmentHolder holder : segmentsToDrop) {
|
||||
holder.executeCallbacks();
|
||||
for (SegmentHolder holder : segmentsToDrop.values()) {
|
||||
executeCallbacks(holder.getCallbacks());
|
||||
}
|
||||
}
|
||||
segmentsToDrop.clear();
|
||||
|
||||
if (!segmentsToLoad.isEmpty()) {
|
||||
for (SegmentHolder holder : segmentsToLoad) {
|
||||
holder.executeCallbacks();
|
||||
for (SegmentHolder holder : segmentsToLoad.values()) {
|
||||
executeCallbacks(holder.getCallbacks());
|
||||
}
|
||||
}
|
||||
segmentsToLoad.clear();
|
||||
|
@ -466,15 +439,10 @@ public class LoadQueuePeon
|
|||
}
|
||||
}
|
||||
|
||||
public void executeCallbacks()
|
||||
public List<LoadPeonCallback> getCallbacks()
|
||||
{
|
||||
synchronized (callbacks) {
|
||||
for (LoadPeonCallback callback : callbacks) {
|
||||
if (callback != null) {
|
||||
callback.execute();
|
||||
}
|
||||
}
|
||||
callbacks.clear();
|
||||
return callbacks;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.common.base.Optional;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
@ -38,7 +39,9 @@ public class ArbitraryGranularityTest
|
|||
@Test
|
||||
public void testSimple()
|
||||
{
|
||||
final GranularitySpec spec = new ArbitraryGranularitySpec(Lists.newArrayList(
|
||||
final GranularitySpec spec = new ArbitraryGranularitySpec(
|
||||
QueryGranularity.NONE,
|
||||
Lists.newArrayList(
|
||||
new Interval("2012-01-08T00Z/2012-01-11T00Z"),
|
||||
new Interval("2012-02-01T00Z/2012-03-01T00Z"),
|
||||
new Interval("2012-01-07T00Z/2012-01-08T00Z"),
|
||||
|
@ -111,7 +114,7 @@ public class ArbitraryGranularityTest
|
|||
|
||||
boolean thrown = false;
|
||||
try {
|
||||
final GranularitySpec spec = new ArbitraryGranularitySpec(intervals);
|
||||
final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularity.NONE, intervals);
|
||||
} catch(IllegalArgumentException e) {
|
||||
thrown = true;
|
||||
}
|
||||
|
@ -129,7 +132,7 @@ public class ArbitraryGranularityTest
|
|||
|
||||
boolean thrown = false;
|
||||
try {
|
||||
final GranularitySpec spec = new ArbitraryGranularitySpec(intervals);
|
||||
final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularity.NONE, intervals);
|
||||
} catch(IllegalArgumentException e) {
|
||||
thrown = true;
|
||||
}
|
||||
|
@ -140,7 +143,7 @@ public class ArbitraryGranularityTest
|
|||
@Test
|
||||
public void testJson()
|
||||
{
|
||||
final GranularitySpec spec = new ArbitraryGranularitySpec(Lists.newArrayList(
|
||||
final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularity.NONE, Lists.newArrayList(
|
||||
new Interval("2012-01-08T00Z/2012-01-11T00Z"),
|
||||
new Interval("2012-02-01T00Z/2012-03-01T00Z"),
|
||||
new Interval("2012-01-07T00Z/2012-01-08T00Z"),
|
||||
|
|
Loading…
Reference in New Issue