Link up row-based datasources to serving layer. (#9503)

* Link up row-based datasources to serving layer.

- Add SegmentWrangler interface that allows linking of DataSources to Segments.
- Add LocalQuerySegmentWalker that uses SegmentWranglers to compute queries on
  data that is available locally.
- Modify ClientQuerySegmentWalker to use LocalQuerySegmentWalker when the base
  datasource is concrete and not a table.
- Add SegmentWranglerModule to the Broker so it has SegmentWranglers available and can
  properly instantiate LocalQuerySegmentWalkers.
- Set InlineDataSource and LookupDataSource to concrete, since they can be
  directly queried now.

* Fix tests.
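
To make the new routing tangible, here is a minimal, illustrative sketch (not part of the patch) of what DataSourceAnalysis reports once lookup and inline datasources count as concrete. The classes and methods are the ones touched in the diffs that follow; the RoutingSketch class and the datasource names are assumptions for illustration only.

import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;

public class RoutingSketch
{
  public static void main(String[] args)
  {
    // After this patch, LookupDataSource.isConcrete() returns true, so the analysis is
    // concrete-based and global: ClientQuerySegmentWalker hands such queries to
    // LocalQuerySegmentWalker on the Broker instead of fanning out to data servers.
    final DataSourceAnalysis lookup = DataSourceAnalysis.forDataSource(new LookupDataSource("wiki-simple"));
    System.out.println(lookup.isConcreteBased() && lookup.isGlobal()); // true  -> run locally on the Broker
    System.out.println(lookup.isConcreteTableBased());                 // false -> not the distributed path

    // Regular table datasources keep the existing cluster path.
    final DataSourceAnalysis table = DataSourceAnalysis.forDataSource(new TableDataSource("wikipedia"));
    System.out.println(table.isConcreteTableBased());                  // true  -> fan out to data servers
  }
}
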
Gian Merlino authored on 2020-03-11 11:32:27 -07:00; committed by GitHub
parent e9888f41cb
commit 2ef5c17441
21 changed files with 774 additions and 60 deletions


@@ -375,7 +375,14 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
{
}
},
baseClient, warehouse, retryConfig, jsonMapper, serverConfig, null, new CacheConfig()
baseClient,
null /* local client; unused in this test, so pass in null */,
warehouse,
retryConfig,
jsonMapper,
serverConfig,
null,
new CacheConfig()
);
defineMocks();


@@ -1699,5 +1699,85 @@
}
}
]
},
{
"description": "groupBy on lookup",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "lookup",
"lookup": "wiki-simple"
},
"intervals": [
"0000-01-01T00:00:00.000/3000-01-01T00:00:00.000"
],
"granularity": "all",
"dimensions": ["k", "v", "nonexistent"],
"aggregations": [
{
"type": "count",
"name": "rows"
}
],
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 360000
}
},
"expectedResults": [
{
"version": "v1",
"timestamp": "0000-01-01T00:00:00.000Z",
"event": {
"k": "Wikipedia:Vandalismusmeldung",
"v": "lookup!",
"nonexistent": null,
"rows": 1
}
}
]
},
{
"description": "groupBy on inline",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "inline",
"columnNames": ["k", "v"],
"columnTypes": ["string", "string"],
"rows": [
["Wikipedia:Vandalismusmeldung", "inline!"]
]
},
"intervals": [
"0000-01-01T00:00:00.000/3000-01-01T00:00:00.000"
],
"granularity": "all",
"dimensions": ["k", "v", "nonexistent"],
"aggregations": [
{
"type": "count",
"name": "rows"
}
],
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 360000
}
},
"expectedResults": [
{
"version": "v1",
"timestamp": "0000-01-01T00:00:00.000Z",
"event": {
"k": "Wikipedia:Vandalismusmeldung",
"v": "inline!",
"nonexistent": null,
"rows": 1
}
}
]
}
]


@@ -70,13 +70,18 @@ public class FluentQueryRunnerBuilder<T>
}
public FluentQueryRunner emitCPUTimeMetric(ServiceEmitter emitter)
{
return emitCPUTimeMetric(emitter, new AtomicLong(0L));
}
public FluentQueryRunner emitCPUTimeMetric(ServiceEmitter emitter, AtomicLong accumulator)
{
return from(
CPUTimeMetricQueryRunner.safeBuild(
baseRunner,
toolChest,
emitter,
new AtomicLong(0L),
accumulator,
true
)
);


@@ -164,7 +164,7 @@ public class InlineDataSource implements DataSource
@Override
public boolean isConcrete()
{
return false;
return true;
}
public Map<String, ValueType> getRowSignature()
@@ -172,7 +172,10 @@ public class InlineDataSource implements DataSource
final ImmutableMap.Builder<String, ValueType> retVal = ImmutableMap.builder();
for (int i = 0; i < columnNames.size(); i++) {
retVal.put(columnNames.get(i), columnTypes.get(i));
final ValueType columnType = columnTypes.get(i);
if (columnType != null) {
retVal.put(columnNames.get(i), columnType);
}
}
return retVal.build();


@@ -93,7 +93,7 @@ public class LookupDataSource implements DataSource
@Override
public boolean isConcrete()
{
return false;
return true;
}
@Override


@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import org.apache.druid.query.DataSource;
import org.joda.time.Interval;
/**
* Utility for creating {@link Segment} objects for concrete datasources.
*
* @see org.apache.druid.guice.DruidBinders#segmentWranglerBinder to register factories
*/
public interface SegmentWrangler
{
/**
* Gets Segments for a particular datasource and set of intervals. These are expected to exist for any datasource
* where {@link DataSource#isConcrete} and {@link DataSource#isGlobal} are both true (corresponding to datasources
* where any Druid server could scan its data).
*
 * Note: there are no SegmentWranglers for 'table' datasources (Druid's distributed datasources) because those are
 * handled separately, through Druid's distributed query stack.
*
* @return Segments that, collectively, contain data for dataSource. May be empty if dataSource does not exist or
* has no data in the provided intervals. May contain data outside the provided intervals, so callers should
* filter it down further, e.g. through the "interval" parameter of {@link StorageAdapter#makeCursors}
*/
Iterable<Segment> getSegmentsForIntervals(DataSource dataSource, Iterable<Interval> intervals);
}
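
As a quick illustration of this contract, here is a minimal sketch (not part of the patch; the SegmentWranglerSketch class is illustrative only) that exercises the InlineSegmentWrangler added later in this commit. InlineDataSource.fromIterable and Intervals.ONLY_ETERNITY are used the same way in the new tests below.

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.InlineSegmentWrangler;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.column.ValueType;

public class SegmentWranglerSketch
{
  public static void main(String[] args)
  {
    // Inline datasource with the same shape as the "groupBy on inline" test query above.
    final InlineDataSource dataSource = InlineDataSource.fromIterable(
        ImmutableList.of("k", "v"),
        ImmutableList.of(ValueType.STRING, ValueType.STRING),
        ImmutableList.of(new Object[]{"Wikipedia:Vandalismusmeldung", "inline!"})
    );

    // The wrangler maps the datasource to Segments; LocalQuerySegmentWalker then feeds
    // those Segments into the ordinary per-segment query runners.
    final SegmentWrangler wrangler = new InlineSegmentWrangler();
    for (Segment segment : wrangler.getSegmentsForIntervals(dataSource, Intervals.ONLY_ETERNITY)) {
      System.out.println(segment.getId());
    }
  }
}
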


@@ -25,6 +25,8 @@ import java.util.Optional;
/**
* Utility for creating {@link Joinable} objects.
*
* @see org.apache.druid.guice.DruidBinders#joinableFactoryBinder to register factories
*/
public interface JoinableFactory
{


@@ -139,7 +139,7 @@ public class InlineDataSourceTest
@Test
public void test_isConcrete()
{
Assert.assertFalse(listDataSource.isConcrete());
Assert.assertTrue(listDataSource.isConcrete());
}
@Test


@@ -64,7 +64,7 @@ public class LookupDataSourceTest
@Test
public void test_isConcrete()
{
Assert.assertFalse(lookylooDataSource.isConcrete());
Assert.assertTrue(lookylooDataSource.isConcrete());
}
@Test


@@ -136,7 +136,7 @@ public class DataSourceAnalysisTest
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(LOOKUP_LOOKYLOO);
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
@@ -153,7 +153,7 @@ public class DataSourceAnalysisTest
final QueryDataSource queryDataSource = subquery(LOOKUP_LOOKYLOO);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource);
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertTrue(analysis.isQuery());
@@ -172,7 +172,7 @@ public class DataSourceAnalysisTest
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(INLINE);
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
@@ -378,7 +378,7 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());


@@ -28,18 +28,17 @@ import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.DruidNode;
/**
*/
public class DruidBinders
{
public static MapBinder<Class<? extends Query>, QueryRunnerFactory> queryRunnerFactoryBinder(Binder binder)
{
return MapBinder.newMapBinder(
binder,
new TypeLiteral<Class<? extends Query>>(){},
new TypeLiteral<Class<? extends Query>>() {},
TypeLiteral.get(QueryRunnerFactory.class)
);
}
@@ -48,19 +47,28 @@ public class DruidBinders
{
return MapBinder.newMapBinder(
binder,
new TypeLiteral<Class<? extends Query>>(){},
new TypeLiteral<QueryToolChest>(){}
new TypeLiteral<Class<? extends Query>>() {},
new TypeLiteral<QueryToolChest>() {}
);
}
public static Multibinder<KeyHolder<DruidNode>> discoveryAnnouncementBinder(Binder binder)
{
return Multibinder.newSetBinder(binder, new TypeLiteral<KeyHolder<DruidNode>>(){});
return Multibinder.newSetBinder(binder, new TypeLiteral<KeyHolder<DruidNode>>() {});
}
public static Multibinder<Class<? extends Monitor>> metricMonitorBinder(Binder binder)
{
return Multibinder.newSetBinder(binder, new TypeLiteral<Class<? extends Monitor>>(){});
return Multibinder.newSetBinder(binder, new TypeLiteral<Class<? extends Monitor>>() {});
}
public static MapBinder<Class<? extends DataSource>, SegmentWrangler> segmentWranglerBinder(Binder binder)
{
return MapBinder.newMapBinder(
binder,
new TypeLiteral<Class<? extends DataSource>>() {},
new TypeLiteral<SegmentWrangler>() {}
);
}
public static MapBinder<Class<? extends DataSource>, JoinableFactory> joinableFactoryBinder(Binder binder)


@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.segment.InlineSegmentWrangler;
import org.apache.druid.segment.LookupSegmentWrangler;
import org.apache.druid.segment.MapSegmentWrangler;
import org.apache.druid.segment.SegmentWrangler;
import java.util.Map;
/**
* Module that installs DataSource-class-specific {@link SegmentWrangler} implementations.
*/
public class SegmentWranglerModule implements Module
{
/**
* Default mappings of datasources to factories.
*/
@VisibleForTesting
static final Map<Class<? extends DataSource>, Class<? extends SegmentWrangler>> WRANGLER_MAPPINGS =
ImmutableMap.of(
InlineDataSource.class, InlineSegmentWrangler.class,
LookupDataSource.class, LookupSegmentWrangler.class
);
@Override
public void configure(Binder binder)
{
final MapBinder<Class<? extends DataSource>, SegmentWrangler> segmentWranglers =
DruidBinders.segmentWranglerBinder(binder);
WRANGLER_MAPPINGS.forEach((ds, wrangler) -> {
segmentWranglers.addBinding(ds).to(wrangler);
binder.bind(wrangler).in(LazySingleton.class);
});
binder.bind(SegmentWrangler.class).to(MapSegmentWrangler.class)
.in(Scopes.SINGLETON);
}
}


@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import java.util.Collections;
/**
 * A {@link SegmentWrangler} for {@link InlineDataSource}.
*
* It is not valid to pass any other DataSource type to the "getSegmentsForIntervals" method.
*/
public class InlineSegmentWrangler implements SegmentWrangler
{
private static final String SEGMENT_ID = "inline";
@Override
public Iterable<Segment> getSegmentsForIntervals(final DataSource dataSource, final Iterable<Interval> intervals)
{
final InlineDataSource inlineDataSource = (InlineDataSource) dataSource;
return Collections.singletonList(
new RowBasedSegment<>(
SegmentId.dummy(SEGMENT_ID),
inlineDataSource.getRows(),
inlineDataSource.rowAdapter(),
inlineDataSource.getRowSignature()
)
);
}
}


@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.inject.Inject;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.lookup.LookupSegment;
import org.apache.druid.segment.join.JoinableFactory;
import org.joda.time.Interval;
import java.util.Collections;
import java.util.Optional;
/**
 * A {@link SegmentWrangler} for {@link LookupDataSource}.
*
* It is not valid to pass any other DataSource type to the "getSegmentsForIntervals" method.
*/
public class LookupSegmentWrangler implements SegmentWrangler
{
private final LookupExtractorFactoryContainerProvider lookupProvider;
@Inject
public LookupSegmentWrangler(final LookupExtractorFactoryContainerProvider lookupProvider)
{
this.lookupProvider = lookupProvider;
}
@Override
public Iterable<Segment> getSegmentsForIntervals(final DataSource dataSource, final Iterable<Interval> intervals)
{
final LookupDataSource lookupDataSource = (LookupDataSource) dataSource;
final Optional<LookupExtractorFactoryContainer> maybeContainer =
lookupProvider.get(lookupDataSource.getLookupName());
return maybeContainer.map(
container ->
Collections.<Segment>singletonList(
new LookupSegment(
lookupDataSource.getLookupName(),
container.getLookupExtractorFactory()
)
)
).orElse(Collections.emptyList());
}
}


@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.DataSource;
import org.joda.time.Interval;
import java.util.Map;
/**
* An implementation of {@link SegmentWrangler} that allows registration of DataSource-specific handlers via Guice.
*
* @see org.apache.druid.guice.DruidBinders#segmentWranglerBinder to register wranglers
*/
public class MapSegmentWrangler implements SegmentWrangler
{
private final Map<Class<? extends DataSource>, SegmentWrangler> wranglers;
@Inject
public MapSegmentWrangler(final Map<Class<? extends DataSource>, SegmentWrangler> wranglers)
{
this.wranglers = wranglers;
}
@Override
public Iterable<Segment> getSegmentsForIntervals(final DataSource dataSource, final Iterable<Interval> intervals)
{
final SegmentWrangler wrangler = wranglers.get(dataSource.getClass());
if (wrangler != null) {
return wrangler.getSegmentsForIntervals(dataSource, intervals);
} else {
// Reasonable as a user-facing error message.
throw new ISE("Cannot read directly out of dataSource: %s", dataSource);
}
}
}


@@ -48,7 +48,8 @@ import org.joda.time.Interval;
public class ClientQuerySegmentWalker implements QuerySegmentWalker
{
private final ServiceEmitter emitter;
private final QuerySegmentWalker baseClient;
private final QuerySegmentWalker clusterClient;
private final QuerySegmentWalker localClient;
private final QueryToolChestWarehouse warehouse;
private final RetryQueryRunnerConfig retryConfig;
private final ObjectMapper objectMapper;
@@ -58,7 +59,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
public ClientQuerySegmentWalker(
ServiceEmitter emitter,
QuerySegmentWalker baseClient,
QuerySegmentWalker clusterClient,
QuerySegmentWalker localClient,
QueryToolChestWarehouse warehouse,
RetryQueryRunnerConfig retryConfig,
ObjectMapper objectMapper,
@@ -68,7 +70,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
)
{
this.emitter = emitter;
this.baseClient = baseClient;
this.clusterClient = clusterClient;
this.localClient = localClient;
this.warehouse = warehouse;
this.retryConfig = retryConfig;
this.objectMapper = objectMapper;
@@ -80,7 +83,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
@Inject
ClientQuerySegmentWalker(
ServiceEmitter emitter,
CachingClusteredClient baseClient,
CachingClusteredClient clusterClient,
LocalQuerySegmentWalker localClient,
QueryToolChestWarehouse warehouse,
RetryQueryRunnerConfig retryConfig,
ObjectMapper objectMapper,
@@ -91,7 +95,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
{
this(
emitter,
(QuerySegmentWalker) baseClient,
(QuerySegmentWalker) clusterClient,
(QuerySegmentWalker) localClient,
warehouse,
retryConfig,
objectMapper,
@@ -107,7 +112,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (analysis.isConcreteTableBased()) {
return makeRunner(query, baseClient.getQueryRunnerForIntervals(query, intervals));
return decorateClusterRunner(query, clusterClient.getQueryRunnerForIntervals(query, intervals));
} else if (analysis.isConcreteBased() && analysis.isGlobal()) {
// Concrete, non-table based, can run locally. No need to decorate since LocalQuerySegmentWalker does its own decoration.
return localClient.getQueryRunnerForIntervals(query, intervals);
} else {
// In the future, we will check here to see if parts of the query are inlinable, and if that inlining would
// be able to create a concrete table-based query that we can run through the distributed query stack.
@@ -121,19 +129,42 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (analysis.isConcreteTableBased()) {
return makeRunner(query, baseClient.getQueryRunnerForSegments(query, specs));
return decorateClusterRunner(query, clusterClient.getQueryRunnerForSegments(query, specs));
} else {
throw new ISE("Query dataSource is not table-based, cannot run");
}
}
private <T> QueryRunner<T> makeRunner(Query<T> query, QueryRunner<T> baseClientRunner)
private <T> QueryRunner<T> decorateClusterRunner(Query<T> query, QueryRunner<T> baseClusterRunner)
{
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
final PostProcessingOperator<T> postProcessing = objectMapper.convertValue(
query.<String>getContextValue("postProcessing"),
new TypeReference<PostProcessingOperator<T>>() {}
);
final QueryRunner<T> mostlyDecoratedRunner =
new FluentQueryRunnerBuilder<>(toolChest)
.create(
new SetAndVerifyContextQueryRunner<>(
serverConfig,
new RetryQueryRunner<>(
baseClusterRunner,
retryConfig,
objectMapper
)
)
)
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration()
.emitCPUTimeMetric(emitter)
.postProcess(postProcessing);
// This does not adhere to the fluent workflow. See https://github.com/apache/druid/issues/5517
return new ResultLevelCachingQueryRunner<>(
makeRunner(query, baseClientRunner, toolChest),
mostlyDecoratedRunner,
toolChest,
query,
objectMapper,
@@ -141,33 +172,4 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
cacheConfig
);
}
private <T> QueryRunner<T> makeRunner(
Query<T> query,
QueryRunner<T> baseClientRunner,
QueryToolChest<T, Query<T>> toolChest
)
{
PostProcessingOperator<T> postProcessing = objectMapper.convertValue(
query.<String>getContextValue("postProcessing"),
new TypeReference<PostProcessingOperator<T>>() {}
);
return new FluentQueryRunnerBuilder<>(toolChest)
.create(
new SetAndVerifyContextQueryRunner<>(
serverConfig,
new RetryQueryRunner<>(
baseClientRunner,
retryConfig,
objectMapper
)
)
)
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration()
.emitCPUTimeMetric(emitter)
.postProcess(postProcessing);
}
}


@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables;
import org.joda.time.Interval;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.StreamSupport;
/**
* Processor that computes Druid queries, single-threaded.
*
* The datasource for the query must satisfy {@link DataSourceAnalysis#isConcreteBased()} and
* {@link DataSourceAnalysis#isGlobal()}. Its base datasource must also be handleable by the provided
* {@link SegmentWrangler}.
*/
public class LocalQuerySegmentWalker implements QuerySegmentWalker
{
private final QueryRunnerFactoryConglomerate conglomerate;
private final SegmentWrangler segmentWrangler;
private final JoinableFactory joinableFactory;
private final ServiceEmitter emitter;
@Inject
public LocalQuerySegmentWalker(
QueryRunnerFactoryConglomerate conglomerate,
SegmentWrangler segmentWrangler,
JoinableFactory joinableFactory,
ServiceEmitter emitter
)
{
this.conglomerate = conglomerate;
this.segmentWrangler = segmentWrangler;
this.joinableFactory = joinableFactory;
this.emitter = emitter;
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (!analysis.isConcreteBased() || !analysis.isGlobal()) {
throw new IAE("Cannot query dataSource locally: %s", analysis.getDataSource());
}
final AtomicLong cpuAccumulator = new AtomicLong(0L);
final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(query);
final Iterable<Segment> segments = segmentWrangler.getSegmentsForIntervals(analysis.getBaseDataSource(), intervals);
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
cpuAccumulator,
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query)
);
final QueryRunner<T> baseRunner = queryRunnerFactory.mergeRunners(
Execs.directExecutor(),
() -> StreamSupport.stream(segments.spliterator(), false)
.map(segmentMapFn)
.map(queryRunnerFactory::createRunner).iterator()
);
// Note: Not calling 'postProcess'; it isn't official/documented functionality so we'll only support it where
// it is already supported.
return new FluentQueryRunnerBuilder<>(queryRunnerFactory.getToolchest())
.create(baseRunner)
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration()
.emitCPUTimeMetric(emitter, cpuAccumulator);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
// SegmentWranglers only work based on intervals and cannot run with specific segments.
throw new ISE("Cannot run with specific segments");
}
}


@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.column.ValueType;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.List;
public class InlineSegmentWranglerTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final InlineSegmentWrangler factory = new InlineSegmentWrangler();
private final InlineDataSource inlineDataSource = InlineDataSource.fromIterable(
ImmutableList.of("str", "long"),
ImmutableList.of(ValueType.STRING, ValueType.LONG),
ImmutableList.of(
new Object[]{"foo", 1L},
new Object[]{"bar", 2L}
)
);
@Test
public void test_getSegmentsForIntervals_nonInline()
{
expectedException.expect(ClassCastException.class);
expectedException.expectMessage("TableDataSource cannot be cast");
final Iterable<Segment> ignored = factory.getSegmentsForIntervals(
new TableDataSource("foo"),
Intervals.ONLY_ETERNITY
);
}
@Test
public void test_getSegmentsForIntervals_inline()
{
final List<Segment> segments = ImmutableList.copyOf(
factory.getSegmentsForIntervals(
inlineDataSource,
Intervals.ONLY_ETERNITY
)
);
Assert.assertEquals(1, segments.size());
final Segment segment = Iterables.getOnlyElement(segments);
Assert.assertThat(segment, CoreMatchers.instanceOf(RowBasedSegment.class));
}
}


@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.lookup.LookupSegment;
import org.apache.druid.query.lookup.LookupSegmentTest;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
public class LookupSegmentWranglerTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final LookupSegmentWrangler factory = new LookupSegmentWrangler(
new LookupExtractorFactoryContainerProvider()
{
@Override
public Set<String> getAllLookupNames()
{
return ImmutableSet.of(LookupSegmentTest.LOOKUP_NAME);
}
@Override
public Optional<LookupExtractorFactoryContainer> get(final String lookupName)
{
if (LookupSegmentTest.LOOKUP_NAME.equals(lookupName)) {
return Optional.of(
new LookupExtractorFactoryContainer(
"v0",
LookupSegmentTest.LOOKUP_EXTRACTOR_FACTORY
)
);
} else {
return Optional.empty();
}
}
}
);
@Test
public void test_getSegmentsForIntervals_nonLookup()
{
expectedException.expect(ClassCastException.class);
expectedException.expectMessage("TableDataSource cannot be cast");
final Iterable<Segment> ignored = factory.getSegmentsForIntervals(
new TableDataSource("foo"),
Intervals.ONLY_ETERNITY
);
}
@Test
public void test_getSegmentsForIntervals_lookupThatExists()
{
final List<Segment> segments = ImmutableList.copyOf(
factory.getSegmentsForIntervals(
new LookupDataSource(LookupSegmentTest.LOOKUP_NAME),
Intervals.ONLY_ETERNITY
)
);
Assert.assertEquals(1, segments.size());
Assert.assertThat(Iterables.getOnlyElement(segments), CoreMatchers.instanceOf(LookupSegment.class));
}
@Test
public void test_getSegmentsForIntervals_lookupThatDoesNotExist()
{
final List<Segment> segments = ImmutableList.copyOf(
factory.getSegmentsForIntervals(
new LookupDataSource("nonexistent"),
Intervals.ONLY_ETERNITY
)
);
Assert.assertEquals(0, segments.size());
}
}


@@ -38,11 +38,13 @@ import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.CacheModule;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.SegmentWranglerModule;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.RetryQueryRunnerConfig;
@@ -62,8 +64,6 @@ import org.eclipse.jetty.server.Server;
import java.util.List;
/**
*/
@Command(
name = "broker",
description = "Runs a broker node, see https://druid.apache.org/docs/latest/Broker.html for a description"
@@ -84,6 +84,8 @@ public class CliBroker extends ServerRunnable
new DruidProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
new SegmentWranglerModule(),
new JoinableFactoryModule(),
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to(
TieredBrokerConfig.DEFAULT_BROKER_SERVICE_NAME

View File

@@ -54,6 +54,7 @@ import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.MapSegmentWrangler;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
@@ -65,6 +66,7 @@ import org.apache.druid.segment.join.Joinables;
import org.apache.druid.segment.join.LookupJoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactoryTest;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.LocalQuerySegmentWalker;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -119,6 +121,8 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
@Nullable final QueryScheduler scheduler
)
{
final NoopServiceEmitter emitter = new NoopServiceEmitter();
this.conglomerate = conglomerate;
this.joinableFactory = joinableFactory == null ?
MapJoinableFactoryTest.fromMap(
@@ -130,8 +134,14 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
this.scheduler = scheduler;
this.walker = new ClientQuerySegmentWalker(
new NoopServiceEmitter(),
emitter,
new DataServerLikeWalker(),
new LocalQuerySegmentWalker(
conglomerate,
new MapSegmentWrangler(ImmutableMap.of()),
this.joinableFactory,
emitter
),
new QueryToolChestWarehouse()
{
@Override