diff --git a/docs/content/Examples.md b/docs/content/Examples.md index 2d0759abc57..a0e27d6248c 100644 --- a/docs/content/Examples.md +++ b/docs/content/Examples.md @@ -19,13 +19,13 @@ Clone Druid and build it: git clone https://github.com/metamx/druid.git druid cd druid git fetch --tags -git checkout druid-0.6.139 +git checkout druid-0.6.143 ./build.sh ``` ### Downloading the DSK (Druid Standalone Kit) -[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz) a stand-alone tarball and run it: +[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz) a stand-alone tarball and run it: ``` bash tar -xzf druid-services-0.X.X-bin.tar.gz diff --git a/docs/content/Kafka-Eight.md b/docs/content/Kafka-Eight.md index 240bf31fefe..8ce8213af19 100644 --- a/docs/content/Kafka-Eight.md +++ b/docs/content/Kafka-Eight.md @@ -8,9 +8,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need - Update realtime node's configs for Kafka 8 extensions - e.g. - - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.139",...]` + - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.143",...]` - becomes - - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.139",...]` + - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.143",...]` - Update realtime task config for changed keys - `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes. diff --git a/docs/content/Production-Cluster-Configuration.md b/docs/content/Production-Cluster-Configuration.md index 2b2328b2d95..743a0b17f3c 100644 --- a/docs/content/Production-Cluster-Configuration.md +++ b/docs/content/Production-Cluster-Configuration.md @@ -57,7 +57,7 @@ druid.host=#{IP_ADDR}:8080 druid.port=8080 druid.service=druid/prod/overlord -druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"] +druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143"] druid.zk.service.host=#{ZK_IPs} druid.zk.paths.base=/druid/prod @@ -139,7 +139,7 @@ druid.host=#{IP_ADDR}:8080 druid.port=8080 druid.service=druid/prod/middlemanager -druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"] +druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143","io.druid.extensions:druid-kafka-seven:0.6.143"] druid.zk.service.host=#{ZK_IPs} druid.zk.paths.base=/druid/prod @@ -286,7 +286,7 @@ druid.host=#{IP_ADDR}:8080 druid.port=8080 druid.service=druid/prod/historical -druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"] +druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143"] druid.zk.service.host=#{ZK_IPs} druid.zk.paths.base=/druid/prod diff --git a/docs/content/Realtime-Config.md b/docs/content/Realtime-Config.md index 80d67c3ffad..06758bd0696 100644 --- a/docs/content/Realtime-Config.md +++ b/docs/content/Realtime-Config.md @@ -27,7 +27,7 @@ druid.host=localhost druid.service=realtime druid.port=8083 -druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"] +druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.143"] druid.zk.service.host=localhost @@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080 druid.port=8080 druid.service=druid/prod/realtime -druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"] +druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143","io.druid.extensions:druid-kafka-seven:0.6.143"] druid.zk.service.host=#{ZK_IPs} druid.zk.paths.base=/druid/prod diff --git a/docs/content/Simple-Cluster-Configuration.md b/docs/content/Simple-Cluster-Configuration.md index 820fb0e9d6b..f5955b4f6da 100644 --- a/docs/content/Simple-Cluster-Configuration.md +++ b/docs/content/Simple-Cluster-Configuration.md @@ -28,7 +28,7 @@ Configuration: -Ddruid.zk.service.host=localhost --Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"] +-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.143"] -Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid -Ddruid.db.connector.user=druid diff --git a/docs/content/Tutorial:-A-First-Look-at-Druid.md b/docs/content/Tutorial:-A-First-Look-at-Druid.md index a60942f8544..acfbdd82236 100644 --- a/docs/content/Tutorial:-A-First-Look-at-Druid.md +++ b/docs/content/Tutorial:-A-First-Look-at-Druid.md @@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu ### Download a Tarball -We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz). Download this file to a directory of your choosing. +We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz). Download this file to a directory of your choosing. You can extract the awesomeness within by issuing: @@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz Not too lost so far right? That's great! If you cd into the directory: ``` -cd druid-services-0.6.139 +cd druid-services-0.6.143 ``` You should see a bunch of files: diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md index 9ef08aedda3..3a638ecf9e3 100644 --- a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md +++ b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md @@ -91,7 +91,7 @@ druid.service=overlord druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"] +druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.143"] druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid druid.db.connector.user=druid diff --git a/docs/content/Tutorial:-The-Druid-Cluster.md b/docs/content/Tutorial:-The-Druid-Cluster.md index 095c175a719..23713218db2 100644 --- a/docs/content/Tutorial:-The-Druid-Cluster.md +++ b/docs/content/Tutorial:-The-Druid-Cluster.md @@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first. -You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz) +You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz) and untar the contents within by issuing: @@ -149,7 +149,7 @@ druid.port=8081 druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"] +druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143"] # Dummy read only AWS account (used to download example data) druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b @@ -240,7 +240,7 @@ druid.port=8083 druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"] +druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.143","io.druid.extensions:druid-kafka-seven:0.6.143"] # Change this config to db to hand off to the rest of the Druid cluster druid.publish.type=noop diff --git a/docs/content/Tutorial:-Webstream.md b/docs/content/Tutorial:-Webstream.md index 2a23fde861f..af8d16524f0 100644 --- a/docs/content/Tutorial:-Webstream.md +++ b/docs/content/Tutorial:-Webstream.md @@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu h3. Download a Tarball -We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz) +We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz) Download this file to a directory of your choosing. You can extract the awesomeness within by issuing: @@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz Not too lost so far right? That's great! If you cd into the directory: ``` -cd druid-services-0.6.139 +cd druid-services-0.6.143 ``` You should see a bunch of files: diff --git a/docs/content/Twitter-Tutorial.md b/docs/content/Twitter-Tutorial.md index 9f8c80ad8ad..04588de4c40 100644 --- a/docs/content/Twitter-Tutorial.md +++ b/docs/content/Twitter-Tutorial.md @@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source. # Download a Tarball -We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz). +We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz). Download this bad boy to a directory of your choosing. You can extract the awesomeness within by issuing: diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index 32d9c3256f4..e2052f3ba64 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -102,6 +102,7 @@ public abstract class BaseQuery implements Query return duration; } + @Override @JsonProperty public Map getContext() { diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index 04c581152ad..8c34be7ec28 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -70,6 +70,8 @@ public interface Query public Duration getDuration(); + public Map getContext(); + public ContextType getContextValue(String key); public ContextType getContextValue(String key, ContextType defaultValue); diff --git a/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java index fc29e284443..b251134ddc7 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java @@ -25,6 +25,8 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -36,8 +38,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -/** - */ +@RunWith(Parameterized.class) public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest { public CompressedLongsIndexedSupplierTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy) diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java index dd1a5b4bbde..807764e515c 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java @@ -21,6 +21,7 @@ package io.druid.segment.indexing.granularity; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.util.Lists; import com.google.common.base.Optional; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; @@ -38,13 +39,16 @@ import java.util.TreeSet; public class ArbitraryGranularitySpec implements GranularitySpec { private final TreeSet intervals; + private final QueryGranularity queryGranularity; @JsonCreator public ArbitraryGranularitySpec( + @JsonProperty("queryGranularity") QueryGranularity queryGranularity, @JsonProperty("intervals") List inputIntervals ) { - intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd()); + this.queryGranularity = queryGranularity; + this.intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd()); // Insert all intervals for (final Interval inputInterval : inputIntervals) { @@ -98,14 +102,18 @@ public class ArbitraryGranularitySpec implements GranularitySpec } @Override + @JsonProperty("queryGranularity") public QueryGranularity getQueryGranularity() { - throw new UnsupportedOperationException(); + return queryGranularity; } @Override public GranularitySpec withQueryGranularity(QueryGranularity queryGranularity) { - throw new UnsupportedOperationException(); + return new ArbitraryGranularitySpec( + queryGranularity, + Lists.newArrayList(intervals) + ); } } diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java index d494eb2ee47..d83f824366e 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java @@ -68,7 +68,7 @@ public class UniformGranularitySpec implements GranularitySpec Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval)); } this.inputIntervals = ImmutableList.copyOf(inputIntervals); - this.wrappedSpec = new ArbitraryGranularitySpec(granularIntervals); + this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, granularIntervals); } else { this.inputIntervals = null; this.wrappedSpec = null; diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 91f90d37f91..624d52e1b36 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -259,20 +259,26 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet public void onComplete(Result result) { final long requestTime = System.currentTimeMillis() - start; - emitter.emit( - new ServiceMetricEvent.Builder() - .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) - .setUser3(String.valueOf(query.getContextPriority(0))) - .setUser4(query.getType()) - .setUser5(DataSourceUtil.COMMA_JOIN.join(query.getIntervals())) - .setUser6(String.valueOf(query.hasFilters())) - .setUser7(req.getRemoteAddr()) - .setUser8(query.getId()) - .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) - .build("request/time", requestTime) - ); - try { + emitter.emit( + new ServiceMetricEvent.Builder() + .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) + .setUser3( + jsonMapper.writeValueAsString( + query.getContext() == null + ? ImmutableMap.of() + : query.getContext() + ) + ) + .setUser4(query.getType()) + .setUser5(DataSourceUtil.COMMA_JOIN.join(query.getIntervals())) + .setUser6(String.valueOf(query.hasFilters())) + .setUser7(req.getRemoteAddr()) + .setUser8(query.getId()) + .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) + .build("request/time", requestTime) + ); + requestLogger.log( new RequestLogLine( new DateTime(), diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 18ba190393f..7cef7893280 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -178,7 +178,13 @@ public class QueryResource emitter.emit( new ServiceMetricEvent.Builder() .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) - .setUser3(String.valueOf(query.getContextPriority(0))) + .setUser3( + jsonMapper.writeValueAsString( + query.getContext() == null + ? ImmutableMap.of() + : query.getContext() + ) + ) .setUser4(query.getType()) .setUser5(COMMA_JOIN.join(query.getIntervals())) .setUser6(String.valueOf(query.hasFilters())) diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java index da0fd0066a3..7a591862d11 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java @@ -21,8 +21,6 @@ package io.druid.server.coordinator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.metamx.common.ISE; import com.metamx.common.guava.Comparators; @@ -44,7 +42,7 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Set; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -59,16 +57,16 @@ public class LoadQueuePeon private static final int DROP = 0; private static final int LOAD = 1; - private static Comparator segmentHolderComparator = new Comparator() - { - private Comparator comparator = Comparators.inverse(DataSegment.bucketMonthComparator()); + private static Comparator segmentComparator = Comparators.inverse(DataSegment.bucketMonthComparator()); - @Override - public int compare(SegmentHolder lhs, SegmentHolder rhs) - { - return comparator.compare(lhs.getSegment(), rhs.getSegment()); + private static void executeCallbacks(List callbacks) + { + for (LoadPeonCallback callback : callbacks) { + if (callback != null) { + callback.execute(); + } } - }; + } private final CuratorFramework curator; private final String basePath; @@ -80,11 +78,11 @@ public class LoadQueuePeon private final AtomicLong queuedSize = new AtomicLong(0); private final AtomicInteger failedAssignCount = new AtomicInteger(0); - private final ConcurrentSkipListSet segmentsToLoad = new ConcurrentSkipListSet( - segmentHolderComparator + private final ConcurrentSkipListMap segmentsToLoad = new ConcurrentSkipListMap<>( + segmentComparator ); - private final ConcurrentSkipListSet segmentsToDrop = new ConcurrentSkipListSet( - segmentHolderComparator + private final ConcurrentSkipListMap segmentsToDrop = new ConcurrentSkipListMap<>( + segmentComparator ); private final Object lock = new Object(); @@ -111,37 +109,13 @@ public class LoadQueuePeon @JsonProperty public Set getSegmentsToLoad() { - return new ConcurrentSkipListSet( - Collections2.transform( - segmentsToLoad, - new Function() - { - @Override - public DataSegment apply(SegmentHolder input) - { - return input.getSegment(); - } - } - ) - ); + return segmentsToLoad.keySet(); } @JsonProperty public Set getSegmentsToDrop() { - return new ConcurrentSkipListSet( - Collections2.transform( - segmentsToDrop, - new Function() - { - @Override - public DataSegment apply(SegmentHolder input) - { - return input.getSegment(); - } - } - ) - ); + return segmentsToDrop.keySet(); } public long getLoadQueueSize() @@ -155,8 +129,8 @@ public class LoadQueuePeon } public void loadSegment( - DataSegment segment, - LoadPeonCallback callback + final DataSegment segment, + final LoadPeonCallback callback ) { synchronized (lock) { @@ -169,12 +143,11 @@ public class LoadQueuePeon } } - SegmentHolder holder = new SegmentHolder(segment, LOAD, Arrays.asList(callback)); - synchronized (lock) { - if (segmentsToLoad.contains(holder)) { + final SegmentHolder existingHolder = segmentsToLoad.get(segment); + if (existingHolder != null) { if ((callback != null)) { - currentlyProcessing.addCallback(callback); + existingHolder.addCallback(callback); } return; } @@ -182,13 +155,13 @@ public class LoadQueuePeon log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier()); queuedSize.addAndGet(segment.getSize()); - segmentsToLoad.add(holder); + segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Arrays.asList(callback))); doNext(); } public void dropSegment( - DataSegment segment, - LoadPeonCallback callback + final DataSegment segment, + final LoadPeonCallback callback ) { synchronized (lock) { @@ -201,19 +174,18 @@ public class LoadQueuePeon } } - SegmentHolder holder = new SegmentHolder(segment, DROP, Arrays.asList(callback)); - synchronized (lock) { - if (segmentsToDrop.contains(holder)) { + final SegmentHolder existingHolder = segmentsToDrop.get(segment); + if (existingHolder != null) { if (callback != null) { - currentlyProcessing.addCallback(callback); + existingHolder.addCallback(callback); } return; } } log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier()); - segmentsToDrop.add(holder); + segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Arrays.asList(callback))); doNext(); } @@ -222,10 +194,10 @@ public class LoadQueuePeon synchronized (lock) { if (currentlyProcessing == null) { if (!segmentsToDrop.isEmpty()) { - currentlyProcessing = segmentsToDrop.first(); + currentlyProcessing = segmentsToDrop.firstEntry().getValue(); log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); } else if (!segmentsToLoad.isEmpty()) { - currentlyProcessing = segmentsToLoad.first(); + currentlyProcessing = segmentsToLoad.firstEntry().getValue(); log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); } else { return; @@ -328,24 +300,25 @@ public class LoadQueuePeon if (currentlyProcessing != null) { switch (currentlyProcessing.getType()) { case LOAD: - segmentsToLoad.remove(currentlyProcessing); + segmentsToLoad.remove(currentlyProcessing.getSegment()); queuedSize.addAndGet(-currentlyProcessing.getSegmentSize()); break; case DROP: - segmentsToDrop.remove(currentlyProcessing); + segmentsToDrop.remove(currentlyProcessing.getSegment()); break; default: throw new UnsupportedOperationException(); } - + + final List callbacks = currentlyProcessing.getCallbacks(); + currentlyProcessing = null; callBackExecutor.execute( new Runnable() { @Override public void run() { - currentlyProcessing.executeCallbacks(); - currentlyProcessing = null; + executeCallbacks(callbacks); } } ); @@ -356,20 +329,20 @@ public class LoadQueuePeon { synchronized (lock) { if (currentlyProcessing != null) { - currentlyProcessing.executeCallbacks(); + executeCallbacks(currentlyProcessing.getCallbacks()); currentlyProcessing = null; } if (!segmentsToDrop.isEmpty()) { - for (SegmentHolder holder : segmentsToDrop) { - holder.executeCallbacks(); + for (SegmentHolder holder : segmentsToDrop.values()) { + executeCallbacks(holder.getCallbacks()); } } segmentsToDrop.clear(); if (!segmentsToLoad.isEmpty()) { - for (SegmentHolder holder : segmentsToLoad) { - holder.executeCallbacks(); + for (SegmentHolder holder : segmentsToLoad.values()) { + executeCallbacks(holder.getCallbacks()); } } segmentsToLoad.clear(); @@ -466,15 +439,10 @@ public class LoadQueuePeon } } - public void executeCallbacks() + public List getCallbacks() { synchronized (callbacks) { - for (LoadPeonCallback callback : callbacks) { - if (callback != null) { - callback.execute(); - } - } - callbacks.clear(); + return callbacks; } } diff --git a/server/src/test/java/io/druid/segment/indexing/granularity/ArbitraryGranularityTest.java b/server/src/test/java/io/druid/segment/indexing/granularity/ArbitraryGranularityTest.java index 910bdfe0df8..ad8f4c721db 100644 --- a/server/src/test/java/io/druid/segment/indexing/granularity/ArbitraryGranularityTest.java +++ b/server/src/test/java/io/druid/segment/indexing/granularity/ArbitraryGranularityTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.Lists; +import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -38,7 +39,9 @@ public class ArbitraryGranularityTest @Test public void testSimple() { - final GranularitySpec spec = new ArbitraryGranularitySpec(Lists.newArrayList( + final GranularitySpec spec = new ArbitraryGranularitySpec( + QueryGranularity.NONE, + Lists.newArrayList( new Interval("2012-01-08T00Z/2012-01-11T00Z"), new Interval("2012-02-01T00Z/2012-03-01T00Z"), new Interval("2012-01-07T00Z/2012-01-08T00Z"), @@ -111,7 +114,7 @@ public class ArbitraryGranularityTest boolean thrown = false; try { - final GranularitySpec spec = new ArbitraryGranularitySpec(intervals); + final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularity.NONE, intervals); } catch(IllegalArgumentException e) { thrown = true; } @@ -129,7 +132,7 @@ public class ArbitraryGranularityTest boolean thrown = false; try { - final GranularitySpec spec = new ArbitraryGranularitySpec(intervals); + final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularity.NONE, intervals); } catch(IllegalArgumentException e) { thrown = true; } @@ -140,7 +143,7 @@ public class ArbitraryGranularityTest @Test public void testJson() { - final GranularitySpec spec = new ArbitraryGranularitySpec(Lists.newArrayList( + final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularity.NONE, Lists.newArrayList( new Interval("2012-01-08T00Z/2012-01-11T00Z"), new Interval("2012-02-01T00Z/2012-03-01T00Z"), new Interval("2012-01-07T00Z/2012-01-08T00Z"),