diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 187f35a872f..fa0ee9f727a 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -162,9 +162,33 @@ public class CachingClusteredClient implements QueryRunner Set> segments = Sets.newLinkedHashSet(); List> serversLookup = Lists.newLinkedList(); + List uncoveredIntervals = Lists.newLinkedList(); for (Interval interval : query.getIntervals()) { - Iterables.addAll(serversLookup, timeline.lookup(interval)); + Iterable> lookup = timeline.lookup(interval); + long startMillis = interval.getStartMillis(); + long endMillis = interval.getEndMillis(); + for (TimelineObjectHolder holder : lookup) { + Interval holderInterval = holder.getInterval(); + long intervalStart = holderInterval.getStartMillis(); + if (startMillis != intervalStart) { + uncoveredIntervals.add(new Interval(startMillis, intervalStart)); + } + startMillis = holderInterval.getEndMillis(); + serversLookup.add(holder); + } + + if (startMillis < endMillis) { + uncoveredIntervals.add(new Interval(startMillis, endMillis)); + } + } + + if (!uncoveredIntervals.isEmpty()) { + // This returns intervals for which NO segment is present. + // Which is not necessarily an indication that the data doesn't exist or is + // incomplete. The data could exist and just not be loaded yet. In either + // case, though, this query will not include any data from the identified intervals. + responseContext.put("uncoveredIntervals", uncoveredIntervals); } // Let tool chest filter out unneeded segments diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java new file mode 100644 index 00000000000..7688514f3e5 --- /dev/null +++ b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java @@ -0,0 +1,230 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.client; + +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.MapCache; +import io.druid.client.selector.QueryableDruidServer; +import io.druid.client.selector.ServerSelector; +import io.druid.client.selector.TierSelectorStrategy; +import io.druid.query.DataSource; +import io.druid.query.Druids; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.timeline.DataSegment; +import io.druid.timeline.VersionedIntervalTimeline; +import io.druid.timeline.partition.NoneShardSpec; +import io.druid.timeline.partition.SingleElementPartitionChunk; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Executor; + +/** + */ +public class CachingClusteredClientFunctionalityTest { + + public CachingClusteredClient client; + + protected VersionedIntervalTimeline timeline; + protected TimelineServerView serverView; + protected Cache cache; + + @Before + public void setUp() throws Exception + { + timeline = new VersionedIntervalTimeline<>(Ordering.natural()); + serverView = EasyMock.createNiceMock(TimelineServerView.class); + cache = MapCache.create(100000); + client = makeClient(MoreExecutors.sameThreadExecutor()); + } + + @Test + public void testUncoveredInterval() throws Exception { + addToTimeline(new Interval("2015-01-02/2015-01-03"), "1"); + addToTimeline(new Interval("2015-01-04/2015-01-05"), "1"); + + final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals("2015-01-02/2015-01-03") + .granularity("day") + .aggregators(Arrays.asList(new CountAggregatorFactory("rows"))); + + Map responseContext = new HashMap<>(); + client.run(builder.build(), responseContext); + Assert.assertNull(responseContext.get("uncoveredIntervals")); + + builder.intervals("2015-01-01/2015-01-03"); + responseContext = new HashMap<>(); + client.run(builder.build(), responseContext); + assertUncovered(responseContext, "2015-01-01/2015-01-02"); + + builder.intervals("2015-01-01/2015-01-04"); + responseContext = new HashMap<>(); + client.run(builder.build(), responseContext); + assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04"); + + builder.intervals("2015-01-02/2015-01-04"); + responseContext = new HashMap<>(); + client.run(builder.build(), responseContext); + assertUncovered(responseContext, "2015-01-03/2015-01-04"); + + builder.intervals("2015-01-01/2015-01-30"); + responseContext = new HashMap<>(); + client.run(builder.build(), responseContext); + assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-01-30"); + + builder.intervals("2015-01-02/2015-01-30"); + responseContext = new HashMap<>(); + client.run(builder.build(), responseContext); + assertUncovered(responseContext, "2015-01-03/2015-01-04", "2015-01-05/2015-01-30"); + + builder.intervals("2015-01-04/2015-01-30"); + responseContext = new HashMap<>(); + client.run(builder.build(), responseContext); + assertUncovered(responseContext, "2015-01-05/2015-01-30"); + + builder.intervals("2015-01-10/2015-01-30"); + responseContext = new HashMap<>(); + client.run(builder.build(), responseContext); + assertUncovered(responseContext, "2015-01-10/2015-01-30"); + } + + private void assertUncovered(Map context, String... intervals) { + List expectedList = Lists.newArrayListWithExpectedSize(intervals.length); + for (String interval : intervals) { + expectedList.add(new Interval(interval)); + } + Assert.assertEquals((Object) expectedList, context.get("uncoveredIntervals")); + } + + private void addToTimeline(Interval interval, String version) { + timeline.add(interval, version, new SingleElementPartitionChunk<>( + new ServerSelector( + DataSegment.builder() + .dataSource("test") + .interval(interval) + .version(version) + .shardSpec(new NoneShardSpec()) + .build(), + new TierSelectorStrategy() { + @Override + public Comparator getComparator() { + return Ordering.natural(); + } + + @Override + public QueryableDruidServer pick(TreeMap> prioritizedServers, DataSegment segment) { + return new QueryableDruidServer( + new DruidServer("localhost", "localhost", 100, "historical", "a", 10), + EasyMock.createNiceMock(DirectDruidClient.class) + ); + } + } + ) + )); + } + + protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService) + { + return makeClient(backgroundExecutorService, cache, 10); + } + + protected CachingClusteredClient makeClient( + final ListeningExecutorService backgroundExecutorService, + final Cache cache, + final int mergeLimit + ) + { + return new CachingClusteredClient( + CachingClusteredClientTest.WAREHOUSE, + new TimelineServerView() + { + @Override + public void registerSegmentCallback(Executor exec, SegmentCallback callback) + { + } + + @Override + public VersionedIntervalTimeline getTimeline(DataSource dataSource) + { + return timeline; + } + + @Override + public QueryRunner getQueryRunner(DruidServer server) + { + return serverView.getQueryRunner(server); + } + + @Override + public void registerServerCallback(Executor exec, ServerCallback callback) + { + + } + }, + cache, + CachingClusteredClientTest.jsonMapper, + backgroundExecutorService, + new CacheConfig() + { + @Override + public boolean isPopulateCache() + { + return true; + } + + @Override + public boolean isUseCache() + { + return true; + } + + @Override + public boolean isQueryCacheable(Query query) + { + return true; + } + + @Override + public int getCacheBulkMergeLimit() + { + return mergeLimit; + } + } + ); + } +} diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index 3606e055d25..3ea2ee87ed1 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -154,7 +154,7 @@ public class CachingClusteredClientTest public static final ImmutableMap CONTEXT = ImmutableMap.of("finalize", false); public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.of()); public static final String DATA_SOURCE = "test"; - protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory()); + static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory()); static { jsonMapper.getFactory().setCodec(jsonMapper); @@ -209,7 +209,7 @@ public class CachingClusteredClientTest private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE); private static final String TOP_DIM = "a_dim"; private static final Supplier GROUPBY_QUERY_CONFIG_SUPPLIER = Suppliers.ofInstance(new GroupByQueryConfig()); - private static final QueryToolChestWarehouse WAREHOUSE = new MapQueryToolChestWarehouse( + static final QueryToolChestWarehouse WAREHOUSE = new MapQueryToolChestWarehouse( ImmutableMap., QueryToolChest>builder() .put( TimeseriesQuery.class,