Merge branch 'master' into druid-0.7.x

Conflicts:
	cassandra-storage/pom.xml
	common/pom.xml
	examples/pom.xml
	hdfs-storage/pom.xml
	histogram/pom.xml
	indexing-hadoop/pom.xml
	indexing-service/pom.xml
	kafka-eight/pom.xml
	kafka-seven/pom.xml
	pom.xml
	processing/pom.xml
	rabbitmq/pom.xml
	s3-extensions/pom.xml
	server/pom.xml
	services/pom.xml
This commit is contained in:
fjy 2014-08-11 12:35:47 -07:00
commit 70b86c5551
21 changed files with 112 additions and 118 deletions

View File

@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid git clone https://github.com/metamx/druid.git druid
cd druid cd druid
git fetch --tags git fetch --tags
git checkout druid-0.6.139 git checkout druid-0.6.143
./build.sh ./build.sh
``` ```
### Downloading the DSK (Druid Standalone Kit) ### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz) a stand-alone tarball and run it: [Download](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz) a stand-alone tarball and run it:
``` bash ``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@ -8,9 +8,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need
- Update realtime node's configs for Kafka 8 extensions - Update realtime node's configs for Kafka 8 extensions
- e.g. - e.g.
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.139",...]` - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.143",...]`
- becomes - becomes
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.139",...]` - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.143",...]`
- Update realtime task config for changed keys - Update realtime task config for changed keys
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes. - `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.

View File

@ -57,7 +57,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080 druid.port=8080
druid.service=druid/prod/overlord druid.service=druid/prod/overlord
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143"]
druid.zk.service.host=#{ZK_IPs} druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod druid.zk.paths.base=/druid/prod
@ -139,7 +139,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080 druid.port=8080
druid.service=druid/prod/middlemanager druid.service=druid/prod/middlemanager
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143","io.druid.extensions:druid-kafka-seven:0.6.143"]
druid.zk.service.host=#{ZK_IPs} druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod druid.zk.paths.base=/druid/prod
@ -286,7 +286,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080 druid.port=8080
druid.service=druid/prod/historical druid.service=druid/prod/historical
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143"]
druid.zk.service.host=#{ZK_IPs} druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod druid.zk.paths.base=/druid/prod

View File

@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime druid.service=realtime
druid.port=8083 druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"] druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.143"]
druid.zk.service.host=localhost druid.zk.service.host=localhost
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080 druid.port=8080
druid.service=druid/prod/realtime druid.service=druid/prod/realtime
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143","io.druid.extensions:druid-kafka-seven:0.6.143"]
druid.zk.service.host=#{ZK_IPs} druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod druid.zk.paths.base=/druid/prod

View File

@ -28,7 +28,7 @@ Configuration:
-Ddruid.zk.service.host=localhost -Ddruid.zk.service.host=localhost
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"] -Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.143"]
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid -Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.db.connector.user=druid -Ddruid.db.connector.user=druid

View File

@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball ### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz). Download this file to a directory of your choosing. We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing: You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory: Not too lost so far right? That's great! If you cd into the directory:
``` ```
cd druid-services-0.6.139 cd druid-services-0.6.143
``` ```
You should see a bunch of files: You should see a bunch of files:

View File

@ -91,7 +91,7 @@ druid.service=overlord
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"] druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.143"]
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid druid.db.connector.user=druid

View File

@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first. If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz) You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz)
and untar the contents within by issuing: and untar the contents within by issuing:
@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143"]
# Dummy read only AWS account (used to download example data) # Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"] druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.143","io.druid.extensions:druid-kafka-seven:0.6.143"]
# Change this config to db to hand off to the rest of the Druid cluster # Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop druid.publish.type=noop

View File

@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz) We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz)
Download this file to a directory of your choosing. Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing: You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory: Not too lost so far right? That's great! If you cd into the directory:
``` ```
cd druid-services-0.6.139 cd druid-services-0.6.143
``` ```
You should see a bunch of files: You should see a bunch of files:

View File

@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
# Download a Tarball # Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz). We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz).
Download this bad boy to a directory of your choosing. Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing: You can extract the awesomeness within by issuing:

View File

@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143"]
# Dummy read only AWS account (used to download example data) # Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

View File

@ -4,7 +4,7 @@ druid.service=overlord
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"] druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.143"]
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid druid.db.connector.user=druid

View File

@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139","io.druid.extensions:druid-rabbitmq:0.6.139"] druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.143","io.druid.extensions:druid-kafka-seven:0.6.143","io.druid.extensions:druid-rabbitmq:0.6.143"]
# Change this config to db to hand off to the rest of the Druid cluster # Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop druid.publish.type=noop

View File

@ -102,6 +102,7 @@ public abstract class BaseQuery<T> implements Query<T>
return duration; return duration;
} }
@Override
@JsonProperty @JsonProperty
public Map<String, Object> getContext() public Map<String, Object> getContext()
{ {

View File

@ -70,6 +70,8 @@ public interface Query<T>
public Duration getDuration(); public Duration getDuration();
public Map<String, Object> getContext();
public <ContextType> ContextType getContextValue(String key); public <ContextType> ContextType getContextValue(String key);
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue); public <ContextType> ContextType getContextValue(String key, ContextType defaultValue);

View File

@ -21,6 +21,7 @@ package io.druid.segment.indexing.granularity;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.util.Lists;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator; import com.google.common.collect.PeekingIterator;
@ -38,13 +39,16 @@ import java.util.TreeSet;
public class ArbitraryGranularitySpec implements GranularitySpec public class ArbitraryGranularitySpec implements GranularitySpec
{ {
private final TreeSet<Interval> intervals; private final TreeSet<Interval> intervals;
private final QueryGranularity queryGranularity;
@JsonCreator @JsonCreator
public ArbitraryGranularitySpec( public ArbitraryGranularitySpec(
@JsonProperty("queryGranularity") QueryGranularity queryGranularity,
@JsonProperty("intervals") List<Interval> inputIntervals @JsonProperty("intervals") List<Interval> inputIntervals
) )
{ {
intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd()); this.queryGranularity = queryGranularity;
this.intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
// Insert all intervals // Insert all intervals
for (final Interval inputInterval : inputIntervals) { for (final Interval inputInterval : inputIntervals) {
@ -98,14 +102,18 @@ public class ArbitraryGranularitySpec implements GranularitySpec
} }
@Override @Override
@JsonProperty("queryGranularity")
public QueryGranularity getQueryGranularity() public QueryGranularity getQueryGranularity()
{ {
throw new UnsupportedOperationException(); return queryGranularity;
} }
@Override @Override
public GranularitySpec withQueryGranularity(QueryGranularity queryGranularity) public GranularitySpec withQueryGranularity(QueryGranularity queryGranularity)
{ {
throw new UnsupportedOperationException(); return new ArbitraryGranularitySpec(
queryGranularity,
Lists.newArrayList(intervals)
);
} }
} }

View File

@ -68,7 +68,7 @@ public class UniformGranularitySpec implements GranularitySpec
Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval)); Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval));
} }
this.inputIntervals = ImmutableList.copyOf(inputIntervals); this.inputIntervals = ImmutableList.copyOf(inputIntervals);
this.wrappedSpec = new ArbitraryGranularitySpec(granularIntervals); this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, granularIntervals);
} else { } else {
this.inputIntervals = null; this.inputIntervals = null;
this.wrappedSpec = null; this.wrappedSpec = null;

View File

@ -259,20 +259,26 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
public void onComplete(Result result) public void onComplete(Result result)
{ {
final long requestTime = System.currentTimeMillis() - start; final long requestTime = System.currentTimeMillis() - start;
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser3(String.valueOf(query.getContextPriority(0)))
.setUser4(query.getType())
.setUser5(DataSourceUtil.COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser8(query.getId())
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);
try { try {
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser3(
jsonMapper.writeValueAsString(
query.getContext() == null
? ImmutableMap.of()
: query.getContext()
)
)
.setUser4(query.getType())
.setUser5(DataSourceUtil.COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser8(query.getId())
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);
requestLogger.log( requestLogger.log(
new RequestLogLine( new RequestLogLine(
new DateTime(), new DateTime(),

View File

@ -178,7 +178,13 @@ public class QueryResource
emitter.emit( emitter.emit(
new ServiceMetricEvent.Builder() new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource())) .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser3(String.valueOf(query.getContextPriority(0))) .setUser3(
jsonMapper.writeValueAsString(
query.getContext() == null
? ImmutableMap.of()
: query.getContext()
)
)
.setUser4(query.getType()) .setUser4(query.getType())
.setUser5(COMMA_JOIN.join(query.getIntervals())) .setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))

View File

@ -21,8 +21,6 @@ package io.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators; import com.metamx.common.guava.Comparators;
@ -44,7 +42,7 @@ import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -59,16 +57,16 @@ public class LoadQueuePeon
private static final int DROP = 0; private static final int DROP = 0;
private static final int LOAD = 1; private static final int LOAD = 1;
private static Comparator<SegmentHolder> segmentHolderComparator = new Comparator<SegmentHolder>() private static Comparator<DataSegment> segmentComparator = Comparators.inverse(DataSegment.bucketMonthComparator());
{
private Comparator<DataSegment> comparator = Comparators.inverse(DataSegment.bucketMonthComparator());
@Override private static void executeCallbacks(List<LoadPeonCallback> callbacks)
public int compare(SegmentHolder lhs, SegmentHolder rhs) {
{ for (LoadPeonCallback callback : callbacks) {
return comparator.compare(lhs.getSegment(), rhs.getSegment()); if (callback != null) {
callback.execute();
}
} }
}; }
private final CuratorFramework curator; private final CuratorFramework curator;
private final String basePath; private final String basePath;
@ -80,11 +78,11 @@ public class LoadQueuePeon
private final AtomicLong queuedSize = new AtomicLong(0); private final AtomicLong queuedSize = new AtomicLong(0);
private final AtomicInteger failedAssignCount = new AtomicInteger(0); private final AtomicInteger failedAssignCount = new AtomicInteger(0);
private final ConcurrentSkipListSet<SegmentHolder> segmentsToLoad = new ConcurrentSkipListSet<SegmentHolder>( private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
segmentHolderComparator segmentComparator
); );
private final ConcurrentSkipListSet<SegmentHolder> segmentsToDrop = new ConcurrentSkipListSet<SegmentHolder>( private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
segmentHolderComparator segmentComparator
); );
private final Object lock = new Object(); private final Object lock = new Object();
@ -111,37 +109,13 @@ public class LoadQueuePeon
@JsonProperty @JsonProperty
public Set<DataSegment> getSegmentsToLoad() public Set<DataSegment> getSegmentsToLoad()
{ {
return new ConcurrentSkipListSet<DataSegment>( return segmentsToLoad.keySet();
Collections2.transform(
segmentsToLoad,
new Function<SegmentHolder, DataSegment>()
{
@Override
public DataSegment apply(SegmentHolder input)
{
return input.getSegment();
}
}
)
);
} }
@JsonProperty @JsonProperty
public Set<DataSegment> getSegmentsToDrop() public Set<DataSegment> getSegmentsToDrop()
{ {
return new ConcurrentSkipListSet<DataSegment>( return segmentsToDrop.keySet();
Collections2.transform(
segmentsToDrop,
new Function<SegmentHolder, DataSegment>()
{
@Override
public DataSegment apply(SegmentHolder input)
{
return input.getSegment();
}
}
)
);
} }
public long getLoadQueueSize() public long getLoadQueueSize()
@ -155,8 +129,8 @@ public class LoadQueuePeon
} }
public void loadSegment( public void loadSegment(
DataSegment segment, final DataSegment segment,
LoadPeonCallback callback final LoadPeonCallback callback
) )
{ {
synchronized (lock) { synchronized (lock) {
@ -169,12 +143,11 @@ public class LoadQueuePeon
} }
} }
SegmentHolder holder = new SegmentHolder(segment, LOAD, Arrays.asList(callback));
synchronized (lock) { synchronized (lock) {
if (segmentsToLoad.contains(holder)) { final SegmentHolder existingHolder = segmentsToLoad.get(segment);
if (existingHolder != null) {
if ((callback != null)) { if ((callback != null)) {
currentlyProcessing.addCallback(callback); existingHolder.addCallback(callback);
} }
return; return;
} }
@ -182,13 +155,13 @@ public class LoadQueuePeon
log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier()); log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
queuedSize.addAndGet(segment.getSize()); queuedSize.addAndGet(segment.getSize());
segmentsToLoad.add(holder); segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Arrays.asList(callback)));
doNext(); doNext();
} }
public void dropSegment( public void dropSegment(
DataSegment segment, final DataSegment segment,
LoadPeonCallback callback final LoadPeonCallback callback
) )
{ {
synchronized (lock) { synchronized (lock) {
@ -201,19 +174,18 @@ public class LoadQueuePeon
} }
} }
SegmentHolder holder = new SegmentHolder(segment, DROP, Arrays.asList(callback));
synchronized (lock) { synchronized (lock) {
if (segmentsToDrop.contains(holder)) { final SegmentHolder existingHolder = segmentsToDrop.get(segment);
if (existingHolder != null) {
if (callback != null) { if (callback != null) {
currentlyProcessing.addCallback(callback); existingHolder.addCallback(callback);
} }
return; return;
} }
} }
log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier()); log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
segmentsToDrop.add(holder); segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Arrays.asList(callback)));
doNext(); doNext();
} }
@ -222,10 +194,10 @@ public class LoadQueuePeon
synchronized (lock) { synchronized (lock) {
if (currentlyProcessing == null) { if (currentlyProcessing == null) {
if (!segmentsToDrop.isEmpty()) { if (!segmentsToDrop.isEmpty()) {
currentlyProcessing = segmentsToDrop.first(); currentlyProcessing = segmentsToDrop.firstEntry().getValue();
log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
} else if (!segmentsToLoad.isEmpty()) { } else if (!segmentsToLoad.isEmpty()) {
currentlyProcessing = segmentsToLoad.first(); currentlyProcessing = segmentsToLoad.firstEntry().getValue();
log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
} else { } else {
return; return;
@ -328,24 +300,25 @@ public class LoadQueuePeon
if (currentlyProcessing != null) { if (currentlyProcessing != null) {
switch (currentlyProcessing.getType()) { switch (currentlyProcessing.getType()) {
case LOAD: case LOAD:
segmentsToLoad.remove(currentlyProcessing); segmentsToLoad.remove(currentlyProcessing.getSegment());
queuedSize.addAndGet(-currentlyProcessing.getSegmentSize()); queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
break; break;
case DROP: case DROP:
segmentsToDrop.remove(currentlyProcessing); segmentsToDrop.remove(currentlyProcessing.getSegment());
break; break;
default: default:
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
currentlyProcessing = null;
callBackExecutor.execute( callBackExecutor.execute(
new Runnable() new Runnable()
{ {
@Override @Override
public void run() public void run()
{ {
currentlyProcessing.executeCallbacks(); executeCallbacks(callbacks);
currentlyProcessing = null;
} }
} }
); );
@ -356,20 +329,20 @@ public class LoadQueuePeon
{ {
synchronized (lock) { synchronized (lock) {
if (currentlyProcessing != null) { if (currentlyProcessing != null) {
currentlyProcessing.executeCallbacks(); executeCallbacks(currentlyProcessing.getCallbacks());
currentlyProcessing = null; currentlyProcessing = null;
} }
if (!segmentsToDrop.isEmpty()) { if (!segmentsToDrop.isEmpty()) {
for (SegmentHolder holder : segmentsToDrop) { for (SegmentHolder holder : segmentsToDrop.values()) {
holder.executeCallbacks(); executeCallbacks(holder.getCallbacks());
} }
} }
segmentsToDrop.clear(); segmentsToDrop.clear();
if (!segmentsToLoad.isEmpty()) { if (!segmentsToLoad.isEmpty()) {
for (SegmentHolder holder : segmentsToLoad) { for (SegmentHolder holder : segmentsToLoad.values()) {
holder.executeCallbacks(); executeCallbacks(holder.getCallbacks());
} }
} }
segmentsToLoad.clear(); segmentsToLoad.clear();
@ -466,15 +439,10 @@ public class LoadQueuePeon
} }
} }
public void executeCallbacks() public List<LoadPeonCallback> getCallbacks()
{ {
synchronized (callbacks) { synchronized (callbacks) {
for (LoadPeonCallback callback : callbacks) { return callbacks;
if (callback != null) {
callback.execute();
}
}
callbacks.clear();
} }
} }

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -38,7 +39,9 @@ public class ArbitraryGranularityTest
@Test @Test
public void testSimple() public void testSimple()
{ {
final GranularitySpec spec = new ArbitraryGranularitySpec(Lists.newArrayList( final GranularitySpec spec = new ArbitraryGranularitySpec(
QueryGranularity.NONE,
Lists.newArrayList(
new Interval("2012-01-08T00Z/2012-01-11T00Z"), new Interval("2012-01-08T00Z/2012-01-11T00Z"),
new Interval("2012-02-01T00Z/2012-03-01T00Z"), new Interval("2012-02-01T00Z/2012-03-01T00Z"),
new Interval("2012-01-07T00Z/2012-01-08T00Z"), new Interval("2012-01-07T00Z/2012-01-08T00Z"),
@ -111,7 +114,7 @@ public class ArbitraryGranularityTest
boolean thrown = false; boolean thrown = false;
try { try {
final GranularitySpec spec = new ArbitraryGranularitySpec(intervals); final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularity.NONE, intervals);
} catch(IllegalArgumentException e) { } catch(IllegalArgumentException e) {
thrown = true; thrown = true;
} }
@ -129,7 +132,7 @@ public class ArbitraryGranularityTest
boolean thrown = false; boolean thrown = false;
try { try {
final GranularitySpec spec = new ArbitraryGranularitySpec(intervals); final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularity.NONE, intervals);
} catch(IllegalArgumentException e) { } catch(IllegalArgumentException e) {
thrown = true; thrown = true;
} }
@ -140,7 +143,7 @@ public class ArbitraryGranularityTest
@Test @Test
public void testJson() public void testJson()
{ {
final GranularitySpec spec = new ArbitraryGranularitySpec(Lists.newArrayList( final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularity.NONE, Lists.newArrayList(
new Interval("2012-01-08T00Z/2012-01-11T00Z"), new Interval("2012-01-08T00Z/2012-01-11T00Z"),
new Interval("2012-02-01T00Z/2012-03-01T00Z"), new Interval("2012-02-01T00Z/2012-03-01T00Z"),
new Interval("2012-01-07T00Z/2012-01-08T00Z"), new Interval("2012-01-07T00Z/2012-01-08T00Z"),