Report missing segments when there is no segment for the query datasource in historicals (#10199)

* Report missing segments when there is no segment for the query
datasource in historicals

* test

* missing part for test

* another test
This commit is contained in:
Jihoon Son 2020-07-20 21:02:52 -07:00 committed by GitHub
parent 0deefd6408
commit 41982116f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 338 additions and 20 deletions

View File

@ -19,12 +19,13 @@
package org.apache.druid.query;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.context.ResponseContext;
import java.util.Collections;
import java.util.List;
/**
*/
@ -32,18 +33,23 @@ public class ReportTimelineMissingSegmentQueryRunner<T> implements QueryRunner<T
{
private static final Logger LOG = new Logger(ReportTimelineMissingSegmentQueryRunner.class);
private final SegmentDescriptor descriptor;
private final List<SegmentDescriptor> descriptors;
public ReportTimelineMissingSegmentQueryRunner(SegmentDescriptor descriptor)
{
this.descriptor = descriptor;
this(ImmutableList.of(descriptor));
}
public ReportTimelineMissingSegmentQueryRunner(List<SegmentDescriptor> descriptors)
{
this.descriptors = descriptors;
}
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
LOG.debug("Reporting a missing segment[%s] for query[%s]", descriptor, queryPlus.getQuery().getId());
responseContext.add(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(descriptor));
LOG.debug("Reporting a missing segments[%s] for query[%s]", descriptors, queryPlus.getQuery().getId());
responseContext.add(ResponseContext.Key.MISSING_SEGMENTS, descriptors);
return Sequences.empty();
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ReportTimelineMissingSegmentQueryRunnerTest
{
@Test
public void testRunWithOneSegment()
{
final Interval interval = Intervals.of("2020-01-01/P1D");
final SegmentDescriptor missingSegment = new SegmentDescriptor(interval, "version", 0);
final ReportTimelineMissingSegmentQueryRunner<Object> runner
= new ReportTimelineMissingSegmentQueryRunner<>(missingSegment);
final ResponseContext responseContext = DefaultResponseContext.createEmpty();
runner.run(QueryPlus.wrap(new TestQuery()), responseContext);
Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS));
Assert.assertEquals(Collections.singletonList(missingSegment), responseContext.get(Key.MISSING_SEGMENTS));
}
@Test
public void testRunWithMultipleSegments()
{
final Interval interval = Intervals.of("2020-01-01/P1D");
final List<SegmentDescriptor> missingSegments = ImmutableList.of(
new SegmentDescriptor(interval, "version", 0),
new SegmentDescriptor(interval, "version", 1)
);
final ReportTimelineMissingSegmentQueryRunner<Object> runner
= new ReportTimelineMissingSegmentQueryRunner<>(missingSegments);
final ResponseContext responseContext = DefaultResponseContext.createEmpty();
runner.run(QueryPlus.wrap(new TestQuery()), responseContext);
Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS));
Assert.assertEquals(missingSegments, responseContext.get(Key.MISSING_SEGMENTS));
}
private static class TestQuery extends BaseQuery<Object>
{
private TestQuery()
{
super(
new TableDataSource("datasource"),
new MultipleSpecificSegmentSpec(Collections.emptyList()),
false,
new HashMap<>()
);
}
@Override
public boolean hasFilters()
{
return false;
}
@Override
public DimFilter getFilter()
{
return null;
}
@Override
public String getType()
{
return null;
}
@Override
public Query<Object> withOverriddenContext(Map<String, Object> contextOverride)
{
return null;
}
@Override
public Query<Object> withQuerySegmentSpec(QuerySegmentSpec spec)
{
return null;
}
@Override
public Query<Object> withDataSource(DataSource dataSource)
{
return null;
}
}
}

View File

@ -183,6 +183,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
newQuery
);
} else if (canRunQueryUsingClusterWalker(newQuery)) {
// Note: clusterClient.getQueryRunnerForIntervals() can return an empty sequence if there is no segment
// to query, but this is not correct when there's a right or full outer join going on.
// See https://github.com/apache/druid/issues/9229 for details.
return new QuerySwappingQueryRunner<>(
decorateClusterRunner(newQuery, clusterClient.getQueryRunnerForIntervals(newQuery, intervals)),
query,

View File

@ -20,6 +20,7 @@
package org.apache.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.druid.client.CachingQueryRunner;
import org.apache.druid.client.cache.Cache;
@ -28,6 +29,7 @@ import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@ -46,6 +48,7 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.ReferenceCountingSegmentQueryRunner;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
@ -129,8 +132,9 @@ public class ServerManager implements QuerySegmentWalker
if (maybeTimeline.isPresent()) {
timeline = maybeTimeline.get();
} else {
// Note: this is not correct when there's a right or full outer join going on.
// See https://github.com/apache/druid/issues/9229 for details.
// Even though we didn't find a timeline for the query datasource, we simply returns a noopQueryRunner
// instead of reporting missing intervals because the query intervals are a filter rather than something
// we must find.
return new NoopQueryRunner<>();
}
@ -164,10 +168,13 @@ public class ServerManager implements QuerySegmentWalker
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
log.makeAlert("Unknown query type, [%s]", query.getClass())
final QueryUnsupportedException e = new QueryUnsupportedException(
StringUtils.format("Unknown query type, [%s]", query.getClass())
);
log.makeAlert(e, "Error while executing a query[%s]", query.getId())
.addData("dataSource", query.getDataSource())
.emit();
return new NoopQueryRunner<>();
throw e;
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
@ -186,9 +193,7 @@ public class ServerManager implements QuerySegmentWalker
if (maybeTimeline.isPresent()) {
timeline = maybeTimeline.get();
} else {
// Note: this is not correct when there's a right or full outer join going on.
// See https://github.com/apache/druid/issues/9229 for details.
return new NoopQueryRunner<>();
return new ReportTimelineMissingSegmentQueryRunner<>(Lists.newArrayList(specs));
}
// segmentMapFn maps each base Segment into a joined Segment if necessary.

View File

@ -41,7 +41,9 @@ import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.ConcatQueryRunner;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.Druids;
import org.apache.druid.query.NoopQueryRunner;
@ -52,11 +54,20 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.search.SearchQuery;
import org.apache.druid.query.search.SearchResultValue;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
@ -70,19 +81,27 @@ import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -93,6 +112,9 @@ import java.util.concurrent.TimeUnit;
*/
public class ServerManagerTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private ServerManager serverManager;
private MyQueryRunnerFactory factory;
private CountDownLatch queryWaitLatch;
@ -148,7 +170,11 @@ public class ServerManagerTest
@Override
public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query)
{
return (QueryRunnerFactory) factory;
if (query instanceof SearchQuery) {
return (QueryRunnerFactory) factory;
} else {
return null;
}
}
},
new NoopServiceEmitter(),
@ -406,6 +432,161 @@ public class ServerManagerTest
}
}
@Test
public void testGetQueryRunnerForIntervalsWhenTimelineIsMissingReturningNoopQueryRunner()
{
final Interval interval = Intervals.of("0000-01-01/P1D");
final QueryRunner<Result<SearchResultValue>> queryRunner = serverManager.getQueryRunnerForIntervals(
searchQuery("unknown_datasource", interval, Granularities.ALL),
Collections.singletonList(interval)
);
Assert.assertSame(NoopQueryRunner.class, queryRunner.getClass());
}
@Test
public void testGetQueryRunnerForSegmentsWhenTimelineIsMissingReportingMissingSegments()
{
final Interval interval = Intervals.of("0000-01-01/P1D");
final SearchQuery query = searchQuery("unknown_datasource", interval, Granularities.ALL);
final List<SegmentDescriptor> unknownSegments = Collections.singletonList(
new SegmentDescriptor(interval, "unknown_version", 0)
);
final QueryRunner<Result<SearchResultValue>> queryRunner = serverManager.getQueryRunnerForSegments(
query,
unknownSegments
);
final ResponseContext responseContext = DefaultResponseContext.createEmpty();
final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList();
Assert.assertTrue(results.isEmpty());
Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS));
Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS));
}
@Test
public void testGetQueryRunnerForSegmentsWhenTimelineEntryIsMissingReportingMissingSegments()
{
final Interval interval = Intervals.of("P1d/2011-04-01");
final SearchQuery query = searchQuery("test", interval, Granularities.ALL);
final List<SegmentDescriptor> unknownSegments = Collections.singletonList(
new SegmentDescriptor(interval, "unknown_version", 0)
);
final QueryRunner<Result<SearchResultValue>> queryRunner = serverManager.getQueryRunnerForSegments(
query,
unknownSegments
);
final ResponseContext responseContext = DefaultResponseContext.createEmpty();
final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList();
Assert.assertTrue(results.isEmpty());
Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS));
Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS));
}
@Test
public void testGetQueryRunnerForSegmentsWhenTimelinePartitionChunkIsMissingReportingMissingSegments()
{
final Interval interval = Intervals.of("P1d/2011-04-01");
final int unknownPartitionId = 1000;
final SearchQuery query = searchQuery("test", interval, Granularities.ALL);
final List<SegmentDescriptor> unknownSegments = Collections.singletonList(
new SegmentDescriptor(interval, "1", unknownPartitionId)
);
final QueryRunner<Result<SearchResultValue>> queryRunner = serverManager.getQueryRunnerForSegments(
query,
unknownSegments
);
final ResponseContext responseContext = DefaultResponseContext.createEmpty();
final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList();
Assert.assertTrue(results.isEmpty());
Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS));
Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS));
}
@Test
public void testGetQueryRunnerForSegmentsWhenSegmentIsClosedReportingMissingSegments()
{
final Interval interval = Intervals.of("P1d/2011-04-01");
final SearchQuery query = searchQuery("test", interval, Granularities.ALL);
final Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> maybeTimeline = segmentManager
.getTimeline(DataSourceAnalysis.forDataSource(query.getDataSource()));
Assert.assertTrue(maybeTimeline.isPresent());
final List<TimelineObjectHolder<String, ReferenceCountingSegment>> holders = maybeTimeline.get().lookup(interval);
final List<SegmentDescriptor> closedSegments = new ArrayList<>();
for (TimelineObjectHolder<String, ReferenceCountingSegment> holder : holders) {
for (PartitionChunk<ReferenceCountingSegment> chunk : holder.getObject()) {
final ReferenceCountingSegment segment = chunk.getObject();
Assert.assertNotNull(segment.getId());
closedSegments.add(
new SegmentDescriptor(segment.getDataInterval(), segment.getVersion(), segment.getId().getPartitionNum())
);
segment.close();
}
}
final QueryRunner<Result<SearchResultValue>> queryRunner = serverManager.getQueryRunnerForSegments(
query,
closedSegments
);
final ResponseContext responseContext = DefaultResponseContext.createEmpty();
final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList();
Assert.assertTrue(results.isEmpty());
Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS));
Assert.assertEquals(closedSegments, responseContext.get(Key.MISSING_SEGMENTS));
}
@Test
public void testGetQueryRunnerForSegmentsForUnknownQueryThrowingException()
{
final Interval interval = Intervals.of("P1d/2011-04-01");
final List<SegmentDescriptor> descriptors = Collections.singletonList(new SegmentDescriptor(interval, "1", 0));
expectedException.expect(QueryUnsupportedException.class);
expectedException.expectMessage("Unknown query type");
serverManager.getQueryRunnerForSegments(
new BaseQuery<Object>(
new TableDataSource("test"),
new MultipleSpecificSegmentSpec(descriptors),
false,
new HashMap<>()
)
{
@Override
public boolean hasFilters()
{
return false;
}
@Override
public DimFilter getFilter()
{
return null;
}
@Override
public String getType()
{
return null;
}
@Override
public Query<Object> withOverriddenContext(Map<String, Object> contextOverride)
{
return null;
}
@Override
public Query<Object> withQuerySegmentSpec(QuerySegmentSpec spec)
{
return null;
}
@Override
public Query<Object> withDataSource(DataSource dataSource)
{
return null;
}
},
descriptors
);
}
private void waitForTestVerificationAndCleanup(Future future)
{
try {
@ -420,6 +601,17 @@ public class ServerManagerTest
}
}
private SearchQuery searchQuery(String datasource, Interval interval, Granularity granularity)
{
return Druids.newSearchQueryBuilder()
.dataSource(datasource)
.intervals(Collections.singletonList(interval))
.granularity(granularity)
.limit(10000)
.query("wow")
.build();
}
private Future assertQueryable(
Granularity granularity,
String dataSource,
@ -429,13 +621,7 @@ public class ServerManagerTest
{
final Iterator<Pair<String, Interval>> expectedIter = expected.iterator();
final List<Interval> intervals = Collections.singletonList(interval);
final SearchQuery query = Druids.newSearchQueryBuilder()
.dataSource(dataSource)
.intervals(intervals)
.granularity(granularity)
.limit(10000)
.query("wow")
.build();
final SearchQuery query = searchQuery(dataSource, interval, granularity);
final QueryRunner<Result<SearchResultValue>> runner = serverManager.getQueryRunnerForIntervals(
query,
intervals