From 41982116f4ff7a05dbf292db71b1bcab2cbe7c66 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 20 Jul 2020 21:02:52 -0700 Subject: [PATCH] Report missing segments when there is no segment for the query datasource in historicals (#10199) * Report missing segments when there is no segment for the query datasource in historicals * test * missing part for test * another test --- ...portTimelineMissingSegmentQueryRunner.java | 16 +- ...TimelineMissingSegmentQueryRunnerTest.java | 118 ++++++++++ .../server/ClientQuerySegmentWalker.java | 3 + .../server/coordination/ServerManager.java | 19 +- .../coordination/ServerManagerTest.java | 202 +++++++++++++++++- 5 files changed, 338 insertions(+), 20 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunnerTest.java diff --git a/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java index ad27b16abcb..89c593256be 100644 --- a/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java @@ -19,12 +19,13 @@ package org.apache.druid.query; +import com.google.common.collect.ImmutableList; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.context.ResponseContext; -import java.util.Collections; +import java.util.List; /** */ @@ -32,18 +33,23 @@ public class ReportTimelineMissingSegmentQueryRunner implements QueryRunner descriptors; public ReportTimelineMissingSegmentQueryRunner(SegmentDescriptor descriptor) { - this.descriptor = descriptor; + this(ImmutableList.of(descriptor)); + } + + public ReportTimelineMissingSegmentQueryRunner(List descriptors) + { + 
this.descriptors = descriptors; } @Override public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { - LOG.debug("Reporting a missing segment[%s] for query[%s]", descriptor, queryPlus.getQuery().getId()); - responseContext.add(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(descriptor)); + LOG.debug("Reporting a missing segments[%s] for query[%s]", descriptors, queryPlus.getQuery().getId()); + responseContext.add(ResponseContext.Key.MISSING_SEGMENTS, descriptors); return Sequences.empty(); } } diff --git a/processing/src/test/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunnerTest.java new file mode 100644 index 00000000000..ab054d0f94c --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunnerTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.context.DefaultResponseContext; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.context.ResponseContext.Key; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ReportTimelineMissingSegmentQueryRunnerTest +{ + @Test + public void testRunWithOneSegment() + { + final Interval interval = Intervals.of("2020-01-01/P1D"); + final SegmentDescriptor missingSegment = new SegmentDescriptor(interval, "version", 0); + final ReportTimelineMissingSegmentQueryRunner runner + = new ReportTimelineMissingSegmentQueryRunner<>(missingSegment); + final ResponseContext responseContext = DefaultResponseContext.createEmpty(); + runner.run(QueryPlus.wrap(new TestQuery()), responseContext); + Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); + Assert.assertEquals(Collections.singletonList(missingSegment), responseContext.get(Key.MISSING_SEGMENTS)); + } + + @Test + public void testRunWithMultipleSegments() + { + final Interval interval = Intervals.of("2020-01-01/P1D"); + final List missingSegments = ImmutableList.of( + new SegmentDescriptor(interval, "version", 0), + new SegmentDescriptor(interval, "version", 1) + ); + final ReportTimelineMissingSegmentQueryRunner runner + = new ReportTimelineMissingSegmentQueryRunner<>(missingSegments); + final ResponseContext responseContext = DefaultResponseContext.createEmpty(); + runner.run(QueryPlus.wrap(new TestQuery()), responseContext); + Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); + 
Assert.assertEquals(missingSegments, responseContext.get(Key.MISSING_SEGMENTS)); + } + + private static class TestQuery extends BaseQuery + { + private TestQuery() + { + super( + new TableDataSource("datasource"), + new MultipleSpecificSegmentSpec(Collections.emptyList()), + false, + new HashMap<>() + ); + } + + @Override + public boolean hasFilters() + { + return false; + } + + @Override + public DimFilter getFilter() + { + return null; + } + + @Override + public String getType() + { + return null; + } + + @Override + public Query withOverriddenContext(Map contextOverride) + { + return null; + } + + @Override + public Query withQuerySegmentSpec(QuerySegmentSpec spec) + { + return null; + } + + @Override + public Query withDataSource(DataSource dataSource) + { + return null; + } + } +} diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 85da1ff41c7..be4acb263b3 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -183,6 +183,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker newQuery ); } else if (canRunQueryUsingClusterWalker(newQuery)) { + // Note: clusterClient.getQueryRunnerForIntervals() can return an empty sequence if there is no segment + // to query, but this is not correct when there's a right or full outer join going on. + // See https://github.com/apache/druid/issues/9229 for details. 
return new QuerySwappingQueryRunner<>( decorateClusterRunner(newQuery, clusterClient.getQueryRunnerForIntervals(newQuery, intervals)), query, diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java index 7b86dd49ee0..79c7d946251 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import com.google.inject.Inject; import org.apache.druid.client.CachingQueryRunner; import org.apache.druid.client.cache.Cache; @@ -28,6 +29,7 @@ import org.apache.druid.client.cache.CachePopulator; import org.apache.druid.guice.annotations.Processing; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -46,6 +48,7 @@ import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.QueryUnsupportedException; import org.apache.druid.query.ReferenceCountingSegmentQueryRunner; import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; import org.apache.druid.query.SegmentDescriptor; @@ -129,8 +132,9 @@ public class ServerManager implements QuerySegmentWalker if (maybeTimeline.isPresent()) { timeline = maybeTimeline.get(); } else { - // Note: this is not correct when there's a right or full outer join going on. 
- // See https://github.com/apache/druid/issues/9229 for details. + // Even though we didn't find a timeline for the query datasource, we simply return a NoopQueryRunner + // instead of reporting missing intervals because the query intervals are a filter rather than something + // we must find. return new NoopQueryRunner<>(); } @@ -164,10 +168,13 @@ public class ServerManager implements QuerySegmentWalker { final QueryRunnerFactory> factory = conglomerate.findFactory(query); if (factory == null) { - log.makeAlert("Unknown query type, [%s]", query.getClass()) + final QueryUnsupportedException e = new QueryUnsupportedException( + StringUtils.format("Unknown query type, [%s]", query.getClass()) + ); + log.makeAlert(e, "Error while executing a query[%s]", query.getId()) .addData("dataSource", query.getDataSource()) .emit(); - return new NoopQueryRunner<>(); + throw e; } final QueryToolChest> toolChest = factory.getToolchest(); @@ -186,9 +193,7 @@ public class ServerManager implements QuerySegmentWalker if (maybeTimeline.isPresent()) { timeline = maybeTimeline.get(); } else { - // Note: this is not correct when there's a right or full outer join going on. - // See https://github.com/apache/druid/issues/9229 for details. - return new NoopQueryRunner<>(); + return new ReportTimelineMissingSegmentQueryRunner<>(Lists.newArrayList(specs)); } // segmentMapFn maps each base Segment into a joined Segment if necessary. 
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index 9ca113e2515..bf74ba66d7e 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -41,7 +41,9 @@ import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.YieldingAccumulator; import org.apache.druid.java.util.common.guava.YieldingSequenceBase; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.query.BaseQuery; import org.apache.druid.query.ConcatQueryRunner; +import org.apache.druid.query.DataSource; import org.apache.druid.query.DefaultQueryMetrics; import org.apache.druid.query.Druids; import org.apache.druid.query.NoopQueryRunner; @@ -52,11 +54,20 @@ import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.QueryUnsupportedException; import org.apache.druid.query.Result; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.MetricManipulationFn; +import org.apache.druid.query.context.DefaultResponseContext; import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.context.ResponseContext.Key; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.search.SearchQuery; import org.apache.druid.query.search.SearchResultValue; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.segment.IndexIO; import 
org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.ReferenceCountingSegment; @@ -70,19 +81,27 @@ import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.TimelineObjectHolder; +import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.NoneShardSpec; +import org.apache.druid.timeline.partition.PartitionChunk; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -93,6 +112,9 @@ import java.util.concurrent.TimeUnit; */ public class ServerManagerTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private ServerManager serverManager; private MyQueryRunnerFactory factory; private CountDownLatch queryWaitLatch; @@ -148,7 +170,11 @@ public class ServerManagerTest @Override public > QueryRunnerFactory findFactory(QueryType query) { - return (QueryRunnerFactory) factory; + if (query instanceof SearchQuery) { + return (QueryRunnerFactory) factory; + } else { + return null; + } } }, new NoopServiceEmitter(), @@ -406,6 +432,161 @@ public class ServerManagerTest } } + @Test + public void testGetQueryRunnerForIntervalsWhenTimelineIsMissingReturningNoopQueryRunner() + { + final Interval interval = Intervals.of("0000-01-01/P1D"); + final QueryRunner> queryRunner = 
serverManager.getQueryRunnerForIntervals( + searchQuery("unknown_datasource", interval, Granularities.ALL), + Collections.singletonList(interval) + ); + Assert.assertSame(NoopQueryRunner.class, queryRunner.getClass()); + } + + @Test + public void testGetQueryRunnerForSegmentsWhenTimelineIsMissingReportingMissingSegments() + { + final Interval interval = Intervals.of("0000-01-01/P1D"); + final SearchQuery query = searchQuery("unknown_datasource", interval, Granularities.ALL); + final List unknownSegments = Collections.singletonList( + new SegmentDescriptor(interval, "unknown_version", 0) + ); + final QueryRunner> queryRunner = serverManager.getQueryRunnerForSegments( + query, + unknownSegments + ); + final ResponseContext responseContext = DefaultResponseContext.createEmpty(); + final List> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList(); + Assert.assertTrue(results.isEmpty()); + Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); + Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS)); + } + + @Test + public void testGetQueryRunnerForSegmentsWhenTimelineEntryIsMissingReportingMissingSegments() + { + final Interval interval = Intervals.of("P1d/2011-04-01"); + final SearchQuery query = searchQuery("test", interval, Granularities.ALL); + final List unknownSegments = Collections.singletonList( + new SegmentDescriptor(interval, "unknown_version", 0) + ); + final QueryRunner> queryRunner = serverManager.getQueryRunnerForSegments( + query, + unknownSegments + ); + final ResponseContext responseContext = DefaultResponseContext.createEmpty(); + final List> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList(); + Assert.assertTrue(results.isEmpty()); + Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); + Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS)); + } + + @Test + public void 
testGetQueryRunnerForSegmentsWhenTimelinePartitionChunkIsMissingReportingMissingSegments() + { + final Interval interval = Intervals.of("P1d/2011-04-01"); + final int unknownPartitionId = 1000; + final SearchQuery query = searchQuery("test", interval, Granularities.ALL); + final List unknownSegments = Collections.singletonList( + new SegmentDescriptor(interval, "1", unknownPartitionId) + ); + final QueryRunner> queryRunner = serverManager.getQueryRunnerForSegments( + query, + unknownSegments + ); + final ResponseContext responseContext = DefaultResponseContext.createEmpty(); + final List> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList(); + Assert.assertTrue(results.isEmpty()); + Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); + Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS)); + } + + @Test + public void testGetQueryRunnerForSegmentsWhenSegmentIsClosedReportingMissingSegments() + { + final Interval interval = Intervals.of("P1d/2011-04-01"); + final SearchQuery query = searchQuery("test", interval, Granularities.ALL); + final Optional> maybeTimeline = segmentManager + .getTimeline(DataSourceAnalysis.forDataSource(query.getDataSource())); + Assert.assertTrue(maybeTimeline.isPresent()); + final List> holders = maybeTimeline.get().lookup(interval); + final List closedSegments = new ArrayList<>(); + for (TimelineObjectHolder holder : holders) { + for (PartitionChunk chunk : holder.getObject()) { + final ReferenceCountingSegment segment = chunk.getObject(); + Assert.assertNotNull(segment.getId()); + closedSegments.add( + new SegmentDescriptor(segment.getDataInterval(), segment.getVersion(), segment.getId().getPartitionNum()) + ); + segment.close(); + } + } + final QueryRunner> queryRunner = serverManager.getQueryRunnerForSegments( + query, + closedSegments + ); + final ResponseContext responseContext = DefaultResponseContext.createEmpty(); + final List> results = 
queryRunner.run(QueryPlus.wrap(query), responseContext).toList(); + Assert.assertTrue(results.isEmpty()); + Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); + Assert.assertEquals(closedSegments, responseContext.get(Key.MISSING_SEGMENTS)); + } + + @Test + public void testGetQueryRunnerForSegmentsForUnknownQueryThrowingException() + { + final Interval interval = Intervals.of("P1d/2011-04-01"); + final List descriptors = Collections.singletonList(new SegmentDescriptor(interval, "1", 0)); + expectedException.expect(QueryUnsupportedException.class); + expectedException.expectMessage("Unknown query type"); + serverManager.getQueryRunnerForSegments( + new BaseQuery( + new TableDataSource("test"), + new MultipleSpecificSegmentSpec(descriptors), + false, + new HashMap<>() + ) + { + @Override + public boolean hasFilters() + { + return false; + } + + @Override + public DimFilter getFilter() + { + return null; + } + + @Override + public String getType() + { + return null; + } + + @Override + public Query withOverriddenContext(Map contextOverride) + { + return null; + } + + @Override + public Query withQuerySegmentSpec(QuerySegmentSpec spec) + { + return null; + } + + @Override + public Query withDataSource(DataSource dataSource) + { + return null; + } + }, + descriptors + ); + } + private void waitForTestVerificationAndCleanup(Future future) { try { @@ -420,6 +601,17 @@ public class ServerManagerTest } } + private SearchQuery searchQuery(String datasource, Interval interval, Granularity granularity) + { + return Druids.newSearchQueryBuilder() + .dataSource(datasource) + .intervals(Collections.singletonList(interval)) + .granularity(granularity) + .limit(10000) + .query("wow") + .build(); + } + private Future assertQueryable( Granularity granularity, String dataSource, @@ -429,13 +621,7 @@ public class ServerManagerTest { final Iterator> expectedIter = expected.iterator(); final List intervals = Collections.singletonList(interval); - final SearchQuery query = 
Druids.newSearchQueryBuilder() - .dataSource(dataSource) - .intervals(intervals) - .granularity(granularity) - .limit(10000) - .query("wow") - .build(); + final SearchQuery query = searchQuery(dataSource, interval, granularity); final QueryRunner> runner = serverManager.getQueryRunnerForIntervals( query, intervals