mirror of https://github.com/apache/druid.git
Merge pull request #2058 from cheddar/report-uncovered-segments
Add "uncoveredIntervals" to responseContext
This commit is contained in:
commit
ba980dcadb
|
@ -164,9 +164,33 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
||||||
Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
|
Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
|
||||||
|
|
||||||
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList();
|
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList();
|
||||||
|
List<Interval> uncoveredIntervals = Lists.newLinkedList();
|
||||||
|
|
||||||
for (Interval interval : query.getIntervals()) {
|
for (Interval interval : query.getIntervals()) {
|
||||||
Iterables.addAll(serversLookup, timeline.lookup(interval));
|
Iterable<TimelineObjectHolder<String, ServerSelector>> lookup = timeline.lookup(interval);
|
||||||
|
long startMillis = interval.getStartMillis();
|
||||||
|
long endMillis = interval.getEndMillis();
|
||||||
|
for (TimelineObjectHolder<String, ServerSelector> holder : lookup) {
|
||||||
|
Interval holderInterval = holder.getInterval();
|
||||||
|
long intervalStart = holderInterval.getStartMillis();
|
||||||
|
if (startMillis != intervalStart) {
|
||||||
|
uncoveredIntervals.add(new Interval(startMillis, intervalStart));
|
||||||
|
}
|
||||||
|
startMillis = holderInterval.getEndMillis();
|
||||||
|
serversLookup.add(holder);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (startMillis < endMillis) {
|
||||||
|
uncoveredIntervals.add(new Interval(startMillis, endMillis));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!uncoveredIntervals.isEmpty()) {
|
||||||
|
// This returns intervals for which NO segment is present.
|
||||||
|
// Which is not necessarily an indication that the data doesn't exist or is
|
||||||
|
// incomplete. The data could exist and just not be loaded yet. In either
|
||||||
|
// case, though, this query will not include any data from the identified intervals.
|
||||||
|
responseContext.put("uncoveredIntervals", uncoveredIntervals);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let tool chest filter out unneeded segments
|
// Let tool chest filter out unneeded segments
|
||||||
|
|
|
@ -0,0 +1,230 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.druid.client;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Ordering;
|
||||||
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
import io.druid.client.cache.Cache;
|
||||||
|
import io.druid.client.cache.CacheConfig;
|
||||||
|
import io.druid.client.cache.MapCache;
|
||||||
|
import io.druid.client.selector.QueryableDruidServer;
|
||||||
|
import io.druid.client.selector.ServerSelector;
|
||||||
|
import io.druid.client.selector.TierSelectorStrategy;
|
||||||
|
import io.druid.query.DataSource;
|
||||||
|
import io.druid.query.Druids;
|
||||||
|
import io.druid.query.Query;
|
||||||
|
import io.druid.query.QueryRunner;
|
||||||
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||||
|
import io.druid.timeline.DataSegment;
|
||||||
|
import io.druid.timeline.VersionedIntervalTimeline;
|
||||||
|
import io.druid.timeline.partition.NoneShardSpec;
|
||||||
|
import io.druid.timeline.partition.SingleElementPartitionChunk;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class CachingClusteredClientFunctionalityTest {
|
||||||
|
|
||||||
|
public CachingClusteredClient client;
|
||||||
|
|
||||||
|
protected VersionedIntervalTimeline<String, ServerSelector> timeline;
|
||||||
|
protected TimelineServerView serverView;
|
||||||
|
protected Cache cache;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception
|
||||||
|
{
|
||||||
|
timeline = new VersionedIntervalTimeline<>(Ordering.<String>natural());
|
||||||
|
serverView = EasyMock.createNiceMock(TimelineServerView.class);
|
||||||
|
cache = MapCache.create(100000);
|
||||||
|
client = makeClient(MoreExecutors.sameThreadExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUncoveredInterval() throws Exception {
|
||||||
|
addToTimeline(new Interval("2015-01-02/2015-01-03"), "1");
|
||||||
|
addToTimeline(new Interval("2015-01-04/2015-01-05"), "1");
|
||||||
|
|
||||||
|
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
|
||||||
|
.dataSource("test")
|
||||||
|
.intervals("2015-01-02/2015-01-03")
|
||||||
|
.granularity("day")
|
||||||
|
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")));
|
||||||
|
|
||||||
|
Map<String, Object> responseContext = new HashMap<>();
|
||||||
|
client.run(builder.build(), responseContext);
|
||||||
|
Assert.assertNull(responseContext.get("uncoveredIntervals"));
|
||||||
|
|
||||||
|
builder.intervals("2015-01-01/2015-01-03");
|
||||||
|
responseContext = new HashMap<>();
|
||||||
|
client.run(builder.build(), responseContext);
|
||||||
|
assertUncovered(responseContext, "2015-01-01/2015-01-02");
|
||||||
|
|
||||||
|
builder.intervals("2015-01-01/2015-01-04");
|
||||||
|
responseContext = new HashMap<>();
|
||||||
|
client.run(builder.build(), responseContext);
|
||||||
|
assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04");
|
||||||
|
|
||||||
|
builder.intervals("2015-01-02/2015-01-04");
|
||||||
|
responseContext = new HashMap<>();
|
||||||
|
client.run(builder.build(), responseContext);
|
||||||
|
assertUncovered(responseContext, "2015-01-03/2015-01-04");
|
||||||
|
|
||||||
|
builder.intervals("2015-01-01/2015-01-30");
|
||||||
|
responseContext = new HashMap<>();
|
||||||
|
client.run(builder.build(), responseContext);
|
||||||
|
assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
|
||||||
|
|
||||||
|
builder.intervals("2015-01-02/2015-01-30");
|
||||||
|
responseContext = new HashMap<>();
|
||||||
|
client.run(builder.build(), responseContext);
|
||||||
|
assertUncovered(responseContext, "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
|
||||||
|
|
||||||
|
builder.intervals("2015-01-04/2015-01-30");
|
||||||
|
responseContext = new HashMap<>();
|
||||||
|
client.run(builder.build(), responseContext);
|
||||||
|
assertUncovered(responseContext, "2015-01-05/2015-01-30");
|
||||||
|
|
||||||
|
builder.intervals("2015-01-10/2015-01-30");
|
||||||
|
responseContext = new HashMap<>();
|
||||||
|
client.run(builder.build(), responseContext);
|
||||||
|
assertUncovered(responseContext, "2015-01-10/2015-01-30");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertUncovered(Map<String, Object> context, String... intervals) {
|
||||||
|
List<Interval> expectedList = Lists.newArrayListWithExpectedSize(intervals.length);
|
||||||
|
for (String interval : intervals) {
|
||||||
|
expectedList.add(new Interval(interval));
|
||||||
|
}
|
||||||
|
Assert.assertEquals((Object) expectedList, context.get("uncoveredIntervals"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addToTimeline(Interval interval, String version) {
|
||||||
|
timeline.add(interval, version, new SingleElementPartitionChunk<>(
|
||||||
|
new ServerSelector(
|
||||||
|
DataSegment.builder()
|
||||||
|
.dataSource("test")
|
||||||
|
.interval(interval)
|
||||||
|
.version(version)
|
||||||
|
.shardSpec(new NoneShardSpec())
|
||||||
|
.build(),
|
||||||
|
new TierSelectorStrategy() {
|
||||||
|
@Override
|
||||||
|
public Comparator<Integer> getComparator() {
|
||||||
|
return Ordering.natural();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment) {
|
||||||
|
return new QueryableDruidServer(
|
||||||
|
new DruidServer("localhost", "localhost", 100, "historical", "a", 10),
|
||||||
|
EasyMock.createNiceMock(DirectDruidClient.class)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService)
|
||||||
|
{
|
||||||
|
return makeClient(backgroundExecutorService, cache, 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected CachingClusteredClient makeClient(
|
||||||
|
final ListeningExecutorService backgroundExecutorService,
|
||||||
|
final Cache cache,
|
||||||
|
final int mergeLimit
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return new CachingClusteredClient(
|
||||||
|
CachingClusteredClientTest.WAREHOUSE,
|
||||||
|
new TimelineServerView()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void registerSegmentCallback(Executor exec, SegmentCallback callback)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource)
|
||||||
|
{
|
||||||
|
return timeline;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> QueryRunner<T> getQueryRunner(DruidServer server)
|
||||||
|
{
|
||||||
|
return serverView.getQueryRunner(server);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerServerCallback(Executor exec, ServerCallback callback)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
},
|
||||||
|
cache,
|
||||||
|
CachingClusteredClientTest.jsonMapper,
|
||||||
|
backgroundExecutorService,
|
||||||
|
new CacheConfig()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public boolean isPopulateCache()
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isUseCache()
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isQueryCacheable(Query query)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getCacheBulkMergeLimit()
|
||||||
|
{
|
||||||
|
return mergeLimit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -156,7 +156,7 @@ public class CachingClusteredClientTest
|
||||||
public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.<String, Object>of("finalize", false);
|
public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.<String, Object>of("finalize", false);
|
||||||
public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
|
public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
|
||||||
public static final String DATA_SOURCE = "test";
|
public static final String DATA_SOURCE = "test";
|
||||||
protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory());
|
static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory());
|
||||||
|
|
||||||
static {
|
static {
|
||||||
jsonMapper.getFactory().setCodec(jsonMapper);
|
jsonMapper.getFactory().setCodec(jsonMapper);
|
||||||
|
@ -211,7 +211,7 @@ public class CachingClusteredClientTest
|
||||||
private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE);
|
private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE);
|
||||||
private static final String TOP_DIM = "a_dim";
|
private static final String TOP_DIM = "a_dim";
|
||||||
private static final Supplier<GroupByQueryConfig> GROUPBY_QUERY_CONFIG_SUPPLIER = Suppliers.ofInstance(new GroupByQueryConfig());
|
private static final Supplier<GroupByQueryConfig> GROUPBY_QUERY_CONFIG_SUPPLIER = Suppliers.ofInstance(new GroupByQueryConfig());
|
||||||
private static final QueryToolChestWarehouse WAREHOUSE = new MapQueryToolChestWarehouse(
|
static final QueryToolChestWarehouse WAREHOUSE = new MapQueryToolChestWarehouse(
|
||||||
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
|
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
|
||||||
.put(
|
.put(
|
||||||
TimeseriesQuery.class,
|
TimeseriesQuery.class,
|
||||||
|
|
Loading…
Reference in New Issue