From a36185926e69b011a44221ba861f18c026bd5a06 Mon Sep 17 00:00:00 2001
From: Eric Tschetter
Date: Mon, 7 Dec 2015 10:41:53 -0800
Subject: [PATCH] Add "uncoveredIntervals" to responseContext

This change will cause the CachingClusteredClient to populate the
"uncoveredIntervals" key in the responseContext map. The value will be any
intervals that were requested in the query but are not actually covered by
the segments underlying the data source.

For unit testing, CachingClusteredClientTest tests the caching behavior of
the object, and it is pretty hard to adjust it to test only this new
behavior, so I created a new, parallel
"CachingClusteredClientFunctionalityTest" to simplify testing just the
basic functionality.
---
 .../druid/client/CachingClusteredClient.java  |  26 +-
 ...chingClusteredClientFunctionalityTest.java | 230 ++++++++++++++++++
 .../client/CachingClusteredClientTest.java    |   4 +-
 3 files changed, 257 insertions(+), 3 deletions(-)
 create mode 100644 server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java

diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index 187f35a872f..fa0ee9f727a 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -162,9 +162,33 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
 
     List<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList();
+    List<Interval> uncoveredIntervals = Lists.newLinkedList();
 
     for (Interval interval : query.getIntervals()) {
-      Iterables.addAll(serversLookup, timeline.lookup(interval));
+      Iterable<TimelineObjectHolder<String, ServerSelector>> lookup = timeline.lookup(interval);
+      long startMillis = interval.getStartMillis();
+      long endMillis = interval.getEndMillis();
+      for (TimelineObjectHolder<String, ServerSelector> holder : lookup) {
+        Interval holderInterval = holder.getInterval();
+        long intervalStart = holderInterval.getStartMillis();
+        if (startMillis != intervalStart) {
+          uncoveredIntervals.add(new Interval(startMillis, intervalStart));
+        }
+        startMillis = holderInterval.getEndMillis();
+        serversLookup.add(holder);
+      }
+
+      if (startMillis < endMillis) {
+        uncoveredIntervals.add(new Interval(startMillis, endMillis));
+      }
+    }
+
+    if (!uncoveredIntervals.isEmpty()) {
+      // These are the intervals for which NO segment is present.
+      // That is not necessarily an indication that the data doesn't exist or is
+      // incomplete. The data could exist and just not be loaded yet. In either
+      // case, though, this query will not include any data from the identified intervals.
+      responseContext.put("uncoveredIntervals", uncoveredIntervals);
+    }
 
     // Let tool chest filter out unneeded segments
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
new file mode 100644
index 00000000000..7688514f3e5
--- /dev/null
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.client;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.client.cache.Cache;
+import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.MapCache;
+import io.druid.client.selector.QueryableDruidServer;
+import io.druid.client.selector.ServerSelector;
+import io.druid.client.selector.TierSelectorStrategy;
+import io.druid.query.DataSource;
+import io.druid.query.Druids;
+import io.druid.query.Query;
+import io.druid.query.QueryRunner;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.VersionedIntervalTimeline;
+import io.druid.timeline.partition.NoneShardSpec;
+import io.druid.timeline.partition.SingleElementPartitionChunk;
+import org.easymock.EasyMock;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executor;
+
+/**
+ */
+public class CachingClusteredClientFunctionalityTest
+{
+  public CachingClusteredClient client;
+
+  protected VersionedIntervalTimeline<String, ServerSelector> timeline;
+  protected TimelineServerView serverView;
+  protected Cache cache;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    timeline = new VersionedIntervalTimeline<>(Ordering.natural());
+    serverView = EasyMock.createNiceMock(TimelineServerView.class);
+    cache = MapCache.create(100000);
+    client = makeClient(MoreExecutors.sameThreadExecutor());
+  }
+
+  @Test
+  public void testUncoveredInterval() throws Exception
+  {
+    addToTimeline(new Interval("2015-01-02/2015-01-03"), "1");
+    addToTimeline(new Interval("2015-01-04/2015-01-05"), "1");
+
+    final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
+        .dataSource("test")
+        .intervals("2015-01-02/2015-01-03")
+        .granularity("day")
+        .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")));
+
+    Map<String, Object> responseContext = new HashMap<>();
+    client.run(builder.build(), responseContext);
+    Assert.assertNull(responseContext.get("uncoveredIntervals"));
+
+    builder.intervals("2015-01-01/2015-01-03");
+    responseContext = new HashMap<>();
+    client.run(builder.build(), responseContext);
+    assertUncovered(responseContext, "2015-01-01/2015-01-02");
+
+    builder.intervals("2015-01-01/2015-01-04");
+    responseContext = new HashMap<>();
+    client.run(builder.build(), responseContext);
+    assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04");
+
+    builder.intervals("2015-01-02/2015-01-04");
+    responseContext = new HashMap<>();
+    client.run(builder.build(), responseContext);
+    assertUncovered(responseContext, "2015-01-03/2015-01-04");
+
+    builder.intervals("2015-01-01/2015-01-30");
+    responseContext = new HashMap<>();
+    client.run(builder.build(), responseContext);
+    assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
+
+    builder.intervals("2015-01-02/2015-01-30");
+    responseContext = new HashMap<>();
+    client.run(builder.build(), responseContext);
+    assertUncovered(responseContext, "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
+
+    builder.intervals("2015-01-04/2015-01-30");
+    responseContext = new HashMap<>();
+    client.run(builder.build(), responseContext);
+    assertUncovered(responseContext, "2015-01-05/2015-01-30");
+
+    builder.intervals("2015-01-10/2015-01-30");
+    responseContext = new HashMap<>();
+    client.run(builder.build(), responseContext);
+    assertUncovered(responseContext, "2015-01-10/2015-01-30");
+  }
+
+  private void assertUncovered(Map<String, Object> context, String... intervals)
+  {
+    List<Interval> expectedList = Lists.newArrayListWithExpectedSize(intervals.length);
+    for (String interval : intervals) {
+      expectedList.add(new Interval(interval));
+    }
+    Assert.assertEquals((Object) expectedList, context.get("uncoveredIntervals"));
+  }
+
+  private void addToTimeline(Interval interval, String version)
+  {
+    timeline.add(interval, version, new SingleElementPartitionChunk<>(
+        new ServerSelector(
+            DataSegment.builder()
+                       .dataSource("test")
+                       .interval(interval)
+                       .version(version)
+                       .shardSpec(new NoneShardSpec())
+                       .build(),
+            new TierSelectorStrategy()
+            {
+              @Override
+              public Comparator<Integer> getComparator()
+              {
+                return Ordering.natural();
+              }
+
+              @Override
+              public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment)
+              {
+                return new QueryableDruidServer(
+                    new DruidServer("localhost", "localhost", 100, "historical", "a", 10),
+                    EasyMock.createNiceMock(DirectDruidClient.class)
+                );
+              }
+            }
+        )
+    ));
+  }
+
+  protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService)
+  {
+    return makeClient(backgroundExecutorService, cache, 10);
+  }
+
+  protected CachingClusteredClient makeClient(
+      final ListeningExecutorService backgroundExecutorService,
+      final Cache cache,
+      final int mergeLimit
+  )
+  {
+    return new CachingClusteredClient(
+        CachingClusteredClientTest.WAREHOUSE,
+        new TimelineServerView()
+        {
+          @Override
+          public void registerSegmentCallback(Executor exec, SegmentCallback callback)
+          {
+          }
+
+          @Override
+          public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource)
+          {
+            return timeline;
+          }
+
+          @Override
+          public <T> QueryRunner<T> getQueryRunner(DruidServer server)
+          {
+            return serverView.getQueryRunner(server);
+          }
+
+          @Override
+          public void registerServerCallback(Executor exec, ServerCallback callback)
+          {
+          }
+        },
+        cache,
+        CachingClusteredClientTest.jsonMapper,
+        backgroundExecutorService,
+        new CacheConfig()
+        {
+          @Override
+          public boolean isPopulateCache()
+          {
+            return true;
+          }
+
+          @Override
+          public boolean isUseCache()
+          {
+            return true;
+          }
+
+          @Override
+          public boolean isQueryCacheable(Query query)
+          {
+            return true;
+          }
+
+          @Override
+          public int getCacheBulkMergeLimit()
+          {
+            return mergeLimit;
+          }
+        }
+    );
+  }
+}
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 3606e055d25..3ea2ee87ed1 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -154,7 +154,7 @@ public class CachingClusteredClientTest
   public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.<String, Object>of("finalize", false);
   public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
   public static final String DATA_SOURCE = "test";
-  protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory());
+  static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory());
 
   static {
     jsonMapper.getFactory().setCodec(jsonMapper);
@@ -209,7 +209,7 @@ public class CachingClusteredClientTest
   private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE);
   private static final String TOP_DIM = "a_dim";
   private static final Supplier<GroupByQueryConfig> GROUPBY_QUERY_CONFIG_SUPPLIER = Suppliers.ofInstance(new GroupByQueryConfig());
-  private static final QueryToolChestWarehouse WAREHOUSE = new MapQueryToolChestWarehouse(
+  static final QueryToolChestWarehouse WAREHOUSE = new MapQueryToolChestWarehouse(
       ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
                   .put(
                       TimeseriesQuery.class,
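
Note for reviewers: a sketch of how a caller might consume the new
response-context key. This fragment is illustrative only and is not part of
the patch; "client" and "query" are assumed to be an already-built
CachingClusteredClient and a query whose intervals may extend past the
loaded segments (the run() signature is the same one the functionality test
above exercises).

    Map<String, Object> responseContext = new HashMap<>();
    // Running the query fills responseContext as a side channel next to the results.
    client.run(query, responseContext);

    // The key is only present when at least one requested interval had no segment.
    @SuppressWarnings("unchecked")
    List<Interval> uncovered = (List<Interval>) responseContext.get("uncoveredIntervals");
    if (uncovered != null) {
      // These spans contributed no data: they may be genuinely empty,
      // or the segments may simply not be loaded yet.
      for (Interval gap : uncovered) {
        System.err.println("query did not cover " + gap);
      }
    }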
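
The gap-finding walk added to CachingClusteredClient is simple enough to
illustrate in isolation. Below is a minimal, dependency-free sketch of the
same logic (naming is mine; plain millisecond pairs stand in for Joda-Time
Intervals and timeline holders), assuming the covered spans arrive sorted,
clipped to the request, and non-overlapping, as the timeline lookup provides
them:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class UncoveredIntervalsSketch
    {
      // Given a requested [start, end) span and the covered spans, collect the gaps.
      static List<long[]> uncovered(long start, long end, List<long[]> covered)
      {
        List<long[]> gaps = new ArrayList<>();
        long cursor = start;
        for (long[] c : covered) {
          if (cursor != c[0]) {
            gaps.add(new long[]{cursor, c[0]}); // gap before this covered span
          }
          cursor = c[1]; // advance past the covered span
        }
        if (cursor < end) {
          gaps.add(new long[]{cursor, end}); // trailing gap after the last covered span
        }
        return gaps;
      }

      public static void main(String[] args)
      {
        // Requesting days 1..30 with days 2..3 and 4..5 covered yields gaps
        // 1..2, 3..4 and 5..30, mirroring the "2015-01-01/2015-01-30" test case.
        for (long[] g : uncovered(1, 30, Arrays.asList(new long[]{2, 3}, new long[]{4, 5}))) {
          System.out.println(g[0] + ".." + g[1]);
        }
      }
    }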