From 2ef5c17441a450171523653787743db76a3e3bdb Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 11 Mar 2020 11:32:27 -0700 Subject: [PATCH] Link up row-based datasources to serving layer. (#9503) * Link up row-based datasources to serving layer. - Add SegmentWrangler interface that allows linking of DataSources to Segments. - Add LocalQuerySegmentWalker that uses SegmentWranglers to compute queries on data that is available locally. - Modify ClientQuerySegmentWalker to use LocalQuerySegmentWalker when the base datasource is concrete and not a table. - Add SegmentWranglerModule to the Broker so it has them available and can properly instantiate LocalQuerySegmentWalkers. - Set InlineDataSource and LookupDataSource to concrete, since they can be directly queried now. * Fix tests. --- .../movingaverage/MovingAverageQueryTest.java | 9 +- .../queries/wikipedia_editstream_queries.json | 80 ++++++++++++ .../druid/query/FluentQueryRunnerBuilder.java | 7 +- .../apache/druid/query/InlineDataSource.java | 7 +- .../apache/druid/query/LookupDataSource.java | 2 +- .../apache/druid/segment/SegmentWrangler.java | 45 +++++++ .../druid/segment/join/JoinableFactory.java | 2 + .../druid/query/InlineDataSourceTest.java | 2 +- .../druid/query/LookupDataSourceTest.java | 2 +- .../planning/DataSourceAnalysisTest.java | 8 +- .../org/apache/druid/guice/DruidBinders.java | 22 ++-- .../druid/guice/SegmentWranglerModule.java | 67 ++++++++++ .../druid/segment/InlineSegmentWrangler.java | 53 ++++++++ .../druid/segment/LookupSegmentWrangler.java | 67 ++++++++++ .../druid/segment/MapSegmentWrangler.java | 56 +++++++++ .../server/ClientQuerySegmentWalker.java | 80 ++++++------ .../druid/server/LocalQuerySegmentWalker.java | 117 ++++++++++++++++++ .../segment/InlineSegmentWranglerTest.java | 79 ++++++++++++ .../segment/LookupSegmentWranglerTest.java | 111 +++++++++++++++++ .../java/org/apache/druid/cli/CliBroker.java | 6 +- .../SpecificSegmentsQuerySegmentWalker.java | 12 +- 21 files changed, 
774 insertions(+), 60 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/segment/SegmentWrangler.java create mode 100644 server/src/main/java/org/apache/druid/guice/SegmentWranglerModule.java create mode 100644 server/src/main/java/org/apache/druid/segment/InlineSegmentWrangler.java create mode 100644 server/src/main/java/org/apache/druid/segment/LookupSegmentWrangler.java create mode 100644 server/src/main/java/org/apache/druid/segment/MapSegmentWrangler.java create mode 100644 server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java create mode 100644 server/src/test/java/org/apache/druid/segment/InlineSegmentWranglerTest.java create mode 100644 server/src/test/java/org/apache/druid/segment/LookupSegmentWranglerTest.java diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 033ffb47f82..d11fae98baf 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -375,7 +375,14 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest { } }, - baseClient, warehouse, retryConfig, jsonMapper, serverConfig, null, new CacheConfig() + baseClient, + null /* local client; unused in this test, so pass in null */, + warehouse, + retryConfig, + jsonMapper, + serverConfig, + null, + new CacheConfig() ); defineMocks(); diff --git a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json index 3a9c149448b..82c79ee1c69 100644 --- a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json +++ 
b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json @@ -1699,5 +1699,85 @@ } } ] + }, + { + "description": "groupBy on lookup", + "query": { + "queryType": "groupBy", + "dataSource": { + "type": "lookup", + "lookup": "wiki-simple" + }, + "intervals": [ + "0000-01-01T00:00:00.000/3000-01-01T00:00:00.000" + ], + "granularity": "all", + "dimensions": ["k", "v", "nonexistent"], + "aggregations": [ + { + "type": "count", + "name": "rows" + } + ], + "context": { + "useCache": "true", + "populateCache": "true", + "timeout": 360000 + } + }, + "expectedResults": [ + { + "version": "v1", + "timestamp": "0000-01-01T00:00:00.000Z", + "event": { + "k": "Wikipedia:Vandalismusmeldung", + "v": "lookup!", + "nonexistent": null, + "rows": 1 + } + } + ] + }, + { + "description": "groupBy on inline", + "query": { + "queryType": "groupBy", + "dataSource": { + "type": "inline", + "columnNames": ["k", "v"], + "columnTypes": ["string", "string"], + "rows": [ + ["Wikipedia:Vandalismusmeldung", "inline!"] + ] + }, + "intervals": [ + "0000-01-01T00:00:00.000/3000-01-01T00:00:00.000" + ], + "granularity": "all", + "dimensions": ["k", "v", "nonexistent"], + "aggregations": [ + { + "type": "count", + "name": "rows" + } + ], + "context": { + "useCache": "true", + "populateCache": "true", + "timeout": 360000 + } + }, + "expectedResults": [ + { + "version": "v1", + "timestamp": "0000-01-01T00:00:00.000Z", + "event": { + "k": "Wikipedia:Vandalismusmeldung", + "v": "inline!", + "nonexistent": null, + "rows": 1 + } + } + ] } ] diff --git a/processing/src/main/java/org/apache/druid/query/FluentQueryRunnerBuilder.java b/processing/src/main/java/org/apache/druid/query/FluentQueryRunnerBuilder.java index 15358699949..3f28b5907bb 100644 --- a/processing/src/main/java/org/apache/druid/query/FluentQueryRunnerBuilder.java +++ b/processing/src/main/java/org/apache/druid/query/FluentQueryRunnerBuilder.java @@ -70,13 +70,18 @@ public class FluentQueryRunnerBuilder } public 
FluentQueryRunner emitCPUTimeMetric(ServiceEmitter emitter) + { + return emitCPUTimeMetric(emitter, new AtomicLong(0L)); + } + + public FluentQueryRunner emitCPUTimeMetric(ServiceEmitter emitter, AtomicLong accumulator) { return from( CPUTimeMetricQueryRunner.safeBuild( baseRunner, toolChest, emitter, - new AtomicLong(0L), + accumulator, true ) ); diff --git a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java index 890fbc00ba4..f8186667d4b 100644 --- a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java @@ -164,7 +164,7 @@ public class InlineDataSource implements DataSource @Override public boolean isConcrete() { - return false; + return true; } public Map getRowSignature() @@ -172,7 +172,10 @@ public class InlineDataSource implements DataSource final ImmutableMap.Builder retVal = ImmutableMap.builder(); for (int i = 0; i < columnNames.size(); i++) { - retVal.put(columnNames.get(i), columnTypes.get(i)); + final ValueType columnType = columnTypes.get(i); + if (columnType != null) { + retVal.put(columnNames.get(i), columnType); + } } return retVal.build(); diff --git a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java index a2c99f7d1fd..16bf6751744 100644 --- a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java @@ -93,7 +93,7 @@ public class LookupDataSource implements DataSource @Override public boolean isConcrete() { - return false; + return true; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/SegmentWrangler.java b/processing/src/main/java/org/apache/druid/segment/SegmentWrangler.java new file mode 100644 index 00000000000..ec08fd86527 --- /dev/null +++ 
b/processing/src/main/java/org/apache/druid/segment/SegmentWrangler.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.query.DataSource; +import org.joda.time.Interval; + +/** + * Utility for creating {@link Segment} objects for concrete datasources. + * + * @see org.apache.druid.guice.DruidBinders#segmentWranglerBinder to register factories + */ +public interface SegmentWrangler +{ + /** + * Gets Segments for a particular datasource and set of intervals. These are expected to exist for any datasource + * where {@link DataSource#isConcrete} and {@link DataSource#isGlobal} are both true (corresponding to datasources + * where any Druid server could scan its data). + * + * Note: there are no SegmentWranglers for 'table' datasources (Druid's distributed datasources) because those are + * special and handled in their own special way. + * + * @return Segments that, collectively, contain data for dataSource. May be empty if dataSource does not exist or + * has no data in the provided intervals. May contain data outside the provided intervals, so callers should + * filter it down further, e.g. 
through the "interval" parameter of {@link StorageAdapter#makeCursors} + */ + Iterable getSegmentsForIntervals(DataSource dataSource, Iterable intervals); +} diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java index 5016d75a79c..fc63f1cfbee 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java @@ -25,6 +25,8 @@ import java.util.Optional; /** * Utility for creating {@link Joinable} objects. + * + * @see org.apache.druid.guice.DruidBinders#joinableFactoryBinder to register factories */ public interface JoinableFactory { diff --git a/processing/src/test/java/org/apache/druid/query/InlineDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/InlineDataSourceTest.java index c533ec3f9e1..70d1f37eca1 100644 --- a/processing/src/test/java/org/apache/druid/query/InlineDataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/InlineDataSourceTest.java @@ -139,7 +139,7 @@ public class InlineDataSourceTest @Test public void test_isConcrete() { - Assert.assertFalse(listDataSource.isConcrete()); + Assert.assertTrue(listDataSource.isConcrete()); } @Test diff --git a/processing/src/test/java/org/apache/druid/query/LookupDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/LookupDataSourceTest.java index c68579ff60a..f3b4ef252eb 100644 --- a/processing/src/test/java/org/apache/druid/query/LookupDataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/LookupDataSourceTest.java @@ -64,7 +64,7 @@ public class LookupDataSourceTest @Test public void test_isConcrete() { - Assert.assertFalse(lookylooDataSource.isConcrete()); + Assert.assertTrue(lookylooDataSource.isConcrete()); } @Test diff --git a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java 
b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java index 1b24c5e3898..d053c7cca39 100644 --- a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java +++ b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java @@ -136,7 +136,7 @@ public class DataSourceAnalysisTest { final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(LOOKUP_LOOKYLOO); - Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteBased()); Assert.assertFalse(analysis.isConcreteTableBased()); Assert.assertTrue(analysis.isGlobal()); Assert.assertFalse(analysis.isQuery()); @@ -153,7 +153,7 @@ public class DataSourceAnalysisTest final QueryDataSource queryDataSource = subquery(LOOKUP_LOOKYLOO); final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource); - Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteBased()); Assert.assertFalse(analysis.isConcreteTableBased()); Assert.assertTrue(analysis.isGlobal()); Assert.assertTrue(analysis.isQuery()); @@ -172,7 +172,7 @@ public class DataSourceAnalysisTest { final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(INLINE); - Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteBased()); Assert.assertFalse(analysis.isConcreteTableBased()); Assert.assertTrue(analysis.isGlobal()); Assert.assertFalse(analysis.isQuery()); @@ -378,7 +378,7 @@ public class DataSourceAnalysisTest final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource); - Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteBased()); Assert.assertFalse(analysis.isConcreteTableBased()); Assert.assertTrue(analysis.isGlobal()); Assert.assertFalse(analysis.isQuery()); diff --git a/server/src/main/java/org/apache/druid/guice/DruidBinders.java 
b/server/src/main/java/org/apache/druid/guice/DruidBinders.java index da024e908e2..258ee58be57 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidBinders.java +++ b/server/src/main/java/org/apache/druid/guice/DruidBinders.java @@ -28,18 +28,17 @@ import org.apache.druid.query.DataSource; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; +import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.server.DruidNode; -/** - */ public class DruidBinders { public static MapBinder, QueryRunnerFactory> queryRunnerFactoryBinder(Binder binder) { return MapBinder.newMapBinder( binder, - new TypeLiteral>(){}, + new TypeLiteral>() {}, TypeLiteral.get(QueryRunnerFactory.class) ); } @@ -48,19 +47,28 @@ public class DruidBinders { return MapBinder.newMapBinder( binder, - new TypeLiteral>(){}, - new TypeLiteral(){} + new TypeLiteral>() {}, + new TypeLiteral() {} ); } public static Multibinder> discoveryAnnouncementBinder(Binder binder) { - return Multibinder.newSetBinder(binder, new TypeLiteral>(){}); + return Multibinder.newSetBinder(binder, new TypeLiteral>() {}); } public static Multibinder> metricMonitorBinder(Binder binder) { - return Multibinder.newSetBinder(binder, new TypeLiteral>(){}); + return Multibinder.newSetBinder(binder, new TypeLiteral>() {}); + } + + public static MapBinder, SegmentWrangler> segmentWranglerBinder(Binder binder) + { + return MapBinder.newMapBinder( + binder, + new TypeLiteral>() {}, + new TypeLiteral() {} + ); } public static MapBinder, JoinableFactory> joinableFactoryBinder(Binder binder) diff --git a/server/src/main/java/org/apache/druid/guice/SegmentWranglerModule.java b/server/src/main/java/org/apache/druid/guice/SegmentWranglerModule.java new file mode 100644 index 00000000000..93b7090a083 --- /dev/null +++ b/server/src/main/java/org/apache/druid/guice/SegmentWranglerModule.java @@ 
-0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; +import com.google.inject.multibindings.MapBinder; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.LookupDataSource; +import org.apache.druid.segment.InlineSegmentWrangler; +import org.apache.druid.segment.LookupSegmentWrangler; +import org.apache.druid.segment.MapSegmentWrangler; +import org.apache.druid.segment.SegmentWrangler; + +import java.util.Map; + +/** + * Module that installs DataSource-class-specific {@link SegmentWrangler} implementations. + */ +public class SegmentWranglerModule implements Module +{ + /** + * Default mappings of datasources to factories. 
+ */ + @VisibleForTesting + static final Map, Class> WRANGLER_MAPPINGS = + ImmutableMap.of( + InlineDataSource.class, InlineSegmentWrangler.class, + LookupDataSource.class, LookupSegmentWrangler.class + ); + + @Override + public void configure(Binder binder) + { + final MapBinder, SegmentWrangler> segmentWranglers = + DruidBinders.segmentWranglerBinder(binder); + + WRANGLER_MAPPINGS.forEach((ds, wrangler) -> { + segmentWranglers.addBinding(ds).to(wrangler); + binder.bind(wrangler).in(LazySingleton.class); + }); + + binder.bind(SegmentWrangler.class).to(MapSegmentWrangler.class) + .in(Scopes.SINGLETON); + } +} diff --git a/server/src/main/java/org/apache/druid/segment/InlineSegmentWrangler.java b/server/src/main/java/org/apache/druid/segment/InlineSegmentWrangler.java new file mode 100644 index 00000000000..6417df0e6d1 --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/InlineSegmentWrangler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import org.apache.druid.query.DataSource; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.timeline.SegmentId; +import org.joda.time.Interval; + +import java.util.Collections; + +/** + * A {@link JoinableFactory} for {@link InlineDataSource}. + * + * It is not valid to pass any other DataSource type to the "getSegmentsForIntervals" method. + */ +public class InlineSegmentWrangler implements SegmentWrangler +{ + private static final String SEGMENT_ID = "inline"; + + @Override + public Iterable getSegmentsForIntervals(final DataSource dataSource, final Iterable intervals) + { + final InlineDataSource inlineDataSource = (InlineDataSource) dataSource; + + return Collections.singletonList( + new RowBasedSegment<>( + SegmentId.dummy(SEGMENT_ID), + inlineDataSource.getRows(), + inlineDataSource.rowAdapter(), + inlineDataSource.getRowSignature() + ) + ); + } +} diff --git a/server/src/main/java/org/apache/druid/segment/LookupSegmentWrangler.java b/server/src/main/java/org/apache/druid/segment/LookupSegmentWrangler.java new file mode 100644 index 00000000000..38aded4e37b --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/LookupSegmentWrangler.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.inject.Inject; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.LookupDataSource; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainer; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; +import org.apache.druid.query.lookup.LookupSegment; +import org.apache.druid.segment.join.JoinableFactory; +import org.joda.time.Interval; + +import java.util.Collections; +import java.util.Optional; + +/** + * A {@link JoinableFactory} for {@link LookupDataSource}. + * + * It is not valid to pass any other DataSource type to the "getSegmentsForIntervals" method. 
+ */ +public class LookupSegmentWrangler implements SegmentWrangler +{ + private final LookupExtractorFactoryContainerProvider lookupProvider; + + @Inject + public LookupSegmentWrangler(final LookupExtractorFactoryContainerProvider lookupProvider) + { + this.lookupProvider = lookupProvider; + } + + @Override + public Iterable getSegmentsForIntervals(final DataSource dataSource, final Iterable intervals) + { + final LookupDataSource lookupDataSource = (LookupDataSource) dataSource; + + final Optional maybeContainer = + lookupProvider.get(lookupDataSource.getLookupName()); + + return maybeContainer.map( + container -> + Collections.singletonList( + new LookupSegment( + lookupDataSource.getLookupName(), + container.getLookupExtractorFactory() + ) + ) + ).orElse(Collections.emptyList()); + } +} diff --git a/server/src/main/java/org/apache/druid/segment/MapSegmentWrangler.java b/server/src/main/java/org/apache/druid/segment/MapSegmentWrangler.java new file mode 100644 index 00000000000..0f09aab4ab2 --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/MapSegmentWrangler.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.inject.Inject; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.DataSource; +import org.joda.time.Interval; + +import java.util.Map; + +/** + * An implementation of {@link SegmentWrangler} that allows registration of DataSource-specific handlers via Guice. + * + * @see org.apache.druid.guice.DruidBinders#segmentWranglerBinder to register wranglers + */ +public class MapSegmentWrangler implements SegmentWrangler +{ + private final Map, SegmentWrangler> wranglers; + + @Inject + public MapSegmentWrangler(final Map, SegmentWrangler> wranglers) + { + this.wranglers = wranglers; + } + + @Override + public Iterable getSegmentsForIntervals(final DataSource dataSource, final Iterable intervals) + { + final SegmentWrangler wrangler = wranglers.get(dataSource.getClass()); + + if (wrangler != null) { + return wrangler.getSegmentsForIntervals(dataSource, intervals); + } else { + // Reasonable as a user-facing error message. 
+ throw new ISE("Cannot read directly out of dataSource: %s", dataSource); + } + } +} diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 27bccaf1508..eb57705e886 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -48,7 +48,8 @@ import org.joda.time.Interval; public class ClientQuerySegmentWalker implements QuerySegmentWalker { private final ServiceEmitter emitter; - private final QuerySegmentWalker baseClient; + private final QuerySegmentWalker clusterClient; + private final QuerySegmentWalker localClient; private final QueryToolChestWarehouse warehouse; private final RetryQueryRunnerConfig retryConfig; private final ObjectMapper objectMapper; @@ -58,7 +59,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker public ClientQuerySegmentWalker( ServiceEmitter emitter, - QuerySegmentWalker baseClient, + QuerySegmentWalker clusterClient, + QuerySegmentWalker localClient, QueryToolChestWarehouse warehouse, RetryQueryRunnerConfig retryConfig, ObjectMapper objectMapper, @@ -68,7 +70,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker ) { this.emitter = emitter; - this.baseClient = baseClient; + this.clusterClient = clusterClient; + this.localClient = localClient; this.warehouse = warehouse; this.retryConfig = retryConfig; this.objectMapper = objectMapper; @@ -80,7 +83,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker @Inject ClientQuerySegmentWalker( ServiceEmitter emitter, - CachingClusteredClient baseClient, + CachingClusteredClient clusterClient, + LocalQuerySegmentWalker localClient, QueryToolChestWarehouse warehouse, RetryQueryRunnerConfig retryConfig, ObjectMapper objectMapper, @@ -91,7 +95,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker { 
this( emitter, - (QuerySegmentWalker) baseClient, + (QuerySegmentWalker) clusterClient, + (QuerySegmentWalker) localClient, warehouse, retryConfig, objectMapper, @@ -107,7 +112,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); if (analysis.isConcreteTableBased()) { - return makeRunner(query, baseClient.getQueryRunnerForIntervals(query, intervals)); + return decorateClusterRunner(query, clusterClient.getQueryRunnerForIntervals(query, intervals)); + } else if (analysis.isConcreteBased() && analysis.isGlobal()) { + // Concrete, non-table based, can run locally. No need to decorate since LocalQuerySegmentWalker does its own. + return localClient.getQueryRunnerForIntervals(query, intervals); } else { // In the future, we will check here to see if parts of the query are inlinable, and if that inlining would // be able to create a concrete table-based query that we can run through the distributed query stack. 
@@ -121,19 +129,42 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); if (analysis.isConcreteTableBased()) { - return makeRunner(query, baseClient.getQueryRunnerForSegments(query, specs)); + return decorateClusterRunner(query, clusterClient.getQueryRunnerForSegments(query, specs)); } else { throw new ISE("Query dataSource is not table-based, cannot run"); } } - private QueryRunner makeRunner(Query query, QueryRunner baseClientRunner) + private QueryRunner decorateClusterRunner(Query query, QueryRunner baseClusterRunner) { - QueryToolChest> toolChest = warehouse.getToolChest(query); + final QueryToolChest> toolChest = warehouse.getToolChest(query); + + final PostProcessingOperator postProcessing = objectMapper.convertValue( + query.getContextValue("postProcessing"), + new TypeReference>() {} + ); + + final QueryRunner mostlyDecoratedRunner = + new FluentQueryRunnerBuilder<>(toolChest) + .create( + new SetAndVerifyContextQueryRunner<>( + serverConfig, + new RetryQueryRunner<>( + baseClusterRunner, + retryConfig, + objectMapper + ) + ) + ) + .applyPreMergeDecoration() + .mergeResults() + .applyPostMergeDecoration() + .emitCPUTimeMetric(emitter) + .postProcess(postProcessing); // This does not adhere to the fluent workflow. 
See https://github.com/apache/druid/issues/5517 return new ResultLevelCachingQueryRunner<>( - makeRunner(query, baseClientRunner, toolChest), + mostlyDecoratedRunner, toolChest, query, objectMapper, @@ -141,33 +172,4 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker cacheConfig ); } - - private QueryRunner makeRunner( - Query query, - QueryRunner baseClientRunner, - QueryToolChest> toolChest - ) - { - PostProcessingOperator postProcessing = objectMapper.convertValue( - query.getContextValue("postProcessing"), - new TypeReference>() {} - ); - - return new FluentQueryRunnerBuilder<>(toolChest) - .create( - new SetAndVerifyContextQueryRunner<>( - serverConfig, - new RetryQueryRunner<>( - baseClientRunner, - retryConfig, - objectMapper - ) - ) - ) - .applyPreMergeDecoration() - .mergeResults() - .applyPostMergeDecoration() - .emitCPUTimeMetric(emitter) - .postProcess(postProcessing); - } } diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java new file mode 100644 index 00000000000..54ef62d1487 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.google.inject.Inject; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.FluentQueryRunnerBuilder; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QueryRunnerFactory; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.QuerySegmentWalker; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentWrangler; +import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.Joinables; +import org.joda.time.Interval; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.StreamSupport; + +/** + * Processor that computes Druid queries, single-threaded. + * + * The datasource for the query must satisfy {@link DataSourceAnalysis#isConcreteBased()} and + * {@link DataSourceAnalysis#isGlobal()}. Its base datasource must also be handleable by the provided + * {@link SegmentWrangler}. 
+ */ +public class LocalQuerySegmentWalker implements QuerySegmentWalker +{ + private final QueryRunnerFactoryConglomerate conglomerate; + private final SegmentWrangler segmentWrangler; + private final JoinableFactory joinableFactory; + private final ServiceEmitter emitter; + + @Inject + public LocalQuerySegmentWalker( + QueryRunnerFactoryConglomerate conglomerate, + SegmentWrangler segmentWrangler, + JoinableFactory joinableFactory, + ServiceEmitter emitter + ) + { + this.conglomerate = conglomerate; + this.segmentWrangler = segmentWrangler; + this.joinableFactory = joinableFactory; + this.emitter = emitter; + } + + @Override + public QueryRunner getQueryRunnerForIntervals(final Query query, final Iterable intervals) + { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + + if (!analysis.isConcreteBased() || !analysis.isGlobal()) { + throw new IAE("Cannot query dataSource locally: %s", analysis.getDataSource()); + } + + final AtomicLong cpuAccumulator = new AtomicLong(0L); + final QueryRunnerFactory> queryRunnerFactory = conglomerate.findFactory(query); + final Iterable segments = segmentWrangler.getSegmentsForIntervals(analysis.getBaseDataSource(), intervals); + final Function segmentMapFn = Joinables.createSegmentMapFn( + analysis.getPreJoinableClauses(), + joinableFactory, + cpuAccumulator, + QueryContexts.getEnableJoinFilterPushDown(query), + QueryContexts.getEnableJoinFilterRewrite(query) + ); + + final QueryRunner baseRunner = queryRunnerFactory.mergeRunners( + Execs.directExecutor(), + () -> StreamSupport.stream(segments.spliterator(), false) + .map(segmentMapFn) + .map(queryRunnerFactory::createRunner).iterator() + ); + + // Note: Not calling 'postProcess'; it isn't official/documented functionality so we'll only support it where + // it is already supported. 
+ return new FluentQueryRunnerBuilder<>(queryRunnerFactory.getToolchest()) + .create(baseRunner) + .applyPreMergeDecoration() + .mergeResults() + .applyPostMergeDecoration() + .emitCPUTimeMetric(emitter, cpuAccumulator); + } + + @Override + public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable specs) + { + // SegmentWranglers only work based on intervals and cannot run with specific segments. + throw new ISE("Cannot run with specific segments"); + } +} diff --git a/server/src/test/java/org/apache/druid/segment/InlineSegmentWranglerTest.java b/server/src/test/java/org/apache/druid/segment/InlineSegmentWranglerTest.java new file mode 100644 index 00000000000..b61dc37d645 --- /dev/null +++ b/server/src/test/java/org/apache/druid/segment/InlineSegmentWranglerTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.segment.column.ValueType; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.List; + +public class InlineSegmentWranglerTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final InlineSegmentWrangler factory = new InlineSegmentWrangler(); + + private final InlineDataSource inlineDataSource = InlineDataSource.fromIterable( + ImmutableList.of("str", "long"), + ImmutableList.of(ValueType.STRING, ValueType.LONG), + ImmutableList.of( + new Object[]{"foo", 1L}, + new Object[]{"bar", 2L} + ) + ); + + @Test + public void test_getSegmentsForIntervals_nonInline() + { + expectedException.expect(ClassCastException.class); + expectedException.expectMessage("TableDataSource cannot be cast"); + + final Iterable ignored = factory.getSegmentsForIntervals( + new TableDataSource("foo"), + Intervals.ONLY_ETERNITY + ); + } + + @Test + public void test_getSegmentsForIntervals_inline() + { + final List segments = ImmutableList.copyOf( + factory.getSegmentsForIntervals( + inlineDataSource, + Intervals.ONLY_ETERNITY + ) + ); + + Assert.assertEquals(1, segments.size()); + + final Segment segment = Iterables.getOnlyElement(segments); + Assert.assertThat(segment, CoreMatchers.instanceOf(RowBasedSegment.class)); + } +} diff --git a/server/src/test/java/org/apache/druid/segment/LookupSegmentWranglerTest.java b/server/src/test/java/org/apache/druid/segment/LookupSegmentWranglerTest.java new file mode 100644 index 00000000000..228b17c4b67 --- /dev/null +++ 
b/server/src/test/java/org/apache/druid/segment/LookupSegmentWranglerTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.LookupDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainer; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; +import org.apache.druid.query.lookup.LookupSegment; +import org.apache.druid.query.lookup.LookupSegmentTest; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.List; +import java.util.Optional; +import java.util.Set; + +public class LookupSegmentWranglerTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final LookupSegmentWrangler factory = new LookupSegmentWrangler( + new LookupExtractorFactoryContainerProvider() + { + 
@Override + public Set getAllLookupNames() + { + return ImmutableSet.of(LookupSegmentTest.LOOKUP_NAME); + } + + @Override + public Optional get(final String lookupName) + { + if (LookupSegmentTest.LOOKUP_NAME.equals(lookupName)) { + return Optional.of( + new LookupExtractorFactoryContainer( + "v0", + LookupSegmentTest.LOOKUP_EXTRACTOR_FACTORY + ) + ); + } else { + return Optional.empty(); + } + } + } + ); + + @Test + public void test_getSegmentsForIntervals_nonLookup() + { + expectedException.expect(ClassCastException.class); + expectedException.expectMessage("TableDataSource cannot be cast"); + + final Iterable ignored = factory.getSegmentsForIntervals( + new TableDataSource("foo"), + Intervals.ONLY_ETERNITY + ); + } + + @Test + public void test_getSegmentsForIntervals_lookupThatExists() + { + final List segments = ImmutableList.copyOf( + factory.getSegmentsForIntervals( + new LookupDataSource(LookupSegmentTest.LOOKUP_NAME), + Intervals.ONLY_ETERNITY + ) + ); + + Assert.assertEquals(1, segments.size()); + Assert.assertThat(Iterables.getOnlyElement(segments), CoreMatchers.instanceOf(LookupSegment.class)); + } + + @Test + public void test_getSegmentsForIntervals_lookupThatDoesNotExist() + { + final List segments = ImmutableList.copyOf( + factory.getSegmentsForIntervals( + new LookupDataSource("nonexistent"), + Intervals.ONLY_ETERNITY + ) + ); + + Assert.assertEquals(0, segments.size()); + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index aaf46b95d5d..5badcb0e0e3 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -38,11 +38,13 @@ import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.CacheModule; import org.apache.druid.guice.DruidProcessingModule; import org.apache.druid.guice.Jerseys; +import org.apache.druid.guice.JoinableFactoryModule; import 
org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.QueryRunnerFactoryModule; import org.apache.druid.guice.QueryableModule; +import org.apache.druid.guice.SegmentWranglerModule; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.RetryQueryRunnerConfig; @@ -62,8 +64,6 @@ import org.eclipse.jetty.server.Server; import java.util.List; -/** - */ @Command( name = "broker", description = "Runs a broker node, see https://druid.apache.org/docs/latest/Broker.html for a description" @@ -84,6 +84,8 @@ public class CliBroker extends ServerRunnable new DruidProcessingModule(), new QueryableModule(), new QueryRunnerFactoryModule(), + new SegmentWranglerModule(), + new JoinableFactoryModule(), binder -> { binder.bindConstant().annotatedWith(Names.named("serviceName")).to( TieredBrokerConfig.DEFAULT_BROKER_SERVICE_NAME diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index 749d139ff4b..7e03eaff61d 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -54,6 +54,7 @@ import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; +import org.apache.druid.segment.MapSegmentWrangler; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.ReferenceCountingSegment; @@ -65,6 +66,7 @@ import 
org.apache.druid.segment.join.Joinables; import org.apache.druid.segment.join.LookupJoinableFactory; import org.apache.druid.segment.join.MapJoinableFactoryTest; import org.apache.druid.server.ClientQuerySegmentWalker; +import org.apache.druid.server.LocalQuerySegmentWalker; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; @@ -119,6 +121,8 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C @Nullable final QueryScheduler scheduler ) { + final NoopServiceEmitter emitter = new NoopServiceEmitter(); + this.conglomerate = conglomerate; this.joinableFactory = joinableFactory == null ? MapJoinableFactoryTest.fromMap( @@ -130,8 +134,14 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C this.scheduler = scheduler; this.walker = new ClientQuerySegmentWalker( - new NoopServiceEmitter(), + emitter, new DataServerLikeWalker(), + new LocalQuerySegmentWalker( + conglomerate, + new MapSegmentWrangler(ImmutableMap.of()), + this.joinableFactory, + emitter + ), new QueryToolChestWarehouse() { @Override