mirror of https://github.com/apache/druid.git
global table only if joinable (#10041)

* global table if only joinable
* oops
* fix style, add more tests
* Update sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
* better information schema columns, distinguish broadcast from joinable
* fix javadoc
* fix mistake

Co-authored-by: Jihoon Son <jihoonson@apache.org>
This commit is contained in:
parent a4bd144ebe
commit b5e6569d2c
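The substance of this commit is one decision rule, applied in two places: when the broker builds its SQL schema (DruidSchema) and when ClientQuerySegmentWalker rewrites incoming native queries. A hedged sketch of that rule, using the real classes from the diff below but hypothetical glue code:

// Promote a plain table to a global table only when a JoinableFactory confirms
// it can be joined directly; otherwise leave it untouched.
static DataSource promoteIfJoinable(JoinableFactory joinableFactory, TableDataSource table)
{
  GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(table.getName());
  return joinableFactory.isDirectlyJoinable(maybeGlobal) ? maybeGlobal : table;
}

This mirrors globalizeIfPossible() and buildDruidTable() further down; being broadcast alone is no longer enough to be advertised as a global table.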
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.inject.Injector;
 import com.google.inject.Module;
@@ -64,6 +65,7 @@ import org.apache.druid.query.movingaverage.test.TestConfig;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.ClientQuerySegmentWalker;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.initialization.ServerConfig;
@@ -377,6 +379,7 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
         baseClient,
         null /* local client; unused in this test, so pass in null */,
         warehouse,
+        new MapJoinableFactory(ImmutableMap.of()),
         retryConfig,
         jsonMapper,
         serverConfig,
@@ -71,6 +71,15 @@ public interface DataSource
   /**
    * Returns true if all servers have a full copy of this datasource. True for things like inline, lookup, etc, or
    * for queries of those.
+   *
+   * Currently this is coupled with joinability - if this returns true then the query engine expects there exists a
+   * {@link org.apache.druid.segment.join.JoinableFactory} which might build a
+   * {@link org.apache.druid.segment.join.Joinable} for this datasource directly. If a subquery 'inline' join is
+   * required to join this datasource on the right hand side, then this value must be false for now.
+   *
+   * In the future, instead of directly using this method, the query planner and engine should consider
+   * {@link org.apache.druid.segment.join.JoinableFactory#isDirectlyJoinable(DataSource)} when determining if the
+   * right hand side is directly joinable, which would allow decoupling this property from joins.
    */
   boolean isGlobal();
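The javadoc above states a temporary invariant: any datasource reporting isGlobal() == true is expected to be directly joinable as well. A minimal sketch of that invariant as a checkable predicate (illustrative interface, not part of Druid):

interface GlobalJoinabilitySketch
{
  boolean isGlobal();           // every server holds a full copy
  boolean isDirectlyJoinable(); // some JoinableFactory can build a Joinable for it

  // For now isGlobal() must imply isDirectlyJoinable(); decoupling the two is
  // the future work the javadoc points at.
  static boolean invariantHolds(GlobalJoinabilitySketch ds)
  {
    return !ds.isGlobal() || ds.isDirectlyJoinable();
  }
}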
@@ -205,7 +205,7 @@ public class DataSourceAnalysis

   /**
    * Returns true if all servers have the ability to compute this datasource. These datasources depend only on
-   * globally broadcast data, like lookups or inline data.
+   * globally broadcast data, like lookups or inline data or broadcast segments.
    */
   public boolean isGlobal()
   {
@@ -30,6 +30,13 @@ import java.util.Optional;
  */
 public interface JoinableFactory
 {
+  /**
+   * Returns true if a {@link Joinable} **may** be created for a given {@link DataSource}, but is not a guarantee that
+   * {@link #build} will return a non-empty result. Successfully building a {@link Joinable} might require specific
+   * criteria of the {@link JoinConditionAnalysis}.
+   */
+  boolean isDirectlyJoinable(DataSource dataSource);
+
   /**
    * Create a Joinable object. This may be an expensive operation involving loading data, creating a hash table, etc.
    *
@@ -43,6 +43,17 @@ public class MapJoinableFactory implements JoinableFactory
     this.joinableFactories = new IdentityHashMap<>(joinableFactories);
   }
 
+  @Override
+  public boolean isDirectlyJoinable(DataSource dataSource)
+  {
+    JoinableFactory factory = joinableFactories.get(dataSource.getClass());
+    if (factory == null) {
+      return false;
+    } else {
+      return factory.isDirectlyJoinable(dataSource);
+    }
+  }
+
   @Override
   public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
   {
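A usage sketch for the registry above, assuming the factories shown elsewhere in this diff are on the classpath (the lookupProvider argument is assumed to be in scope): lookup is a single probe keyed on the concrete DataSource class, and unregistered types report false rather than throwing.

Map<Class<? extends DataSource>, JoinableFactory> registry = ImmutableMap.of(
    InlineDataSource.class, new InlineJoinableFactory(),
    LookupDataSource.class, new LookupJoinableFactory(lookupProvider)
);
JoinableFactory factory = new MapJoinableFactory(registry);

factory.isDirectlyJoinable(new LookupDataSource("lookyloo")); // true
factory.isDirectlyJoinable(new TableDataSource("foo"));       // false: not registered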
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.extraction.MapLookupExtractor;
@@ -155,13 +156,24 @@ public class JoinablesTest

     final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
         ImmutableList.of(clause),
-        (dataSource, condition) -> {
-          if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) {
-            return Optional.of(
-                LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false))
-            );
-          } else {
-            return Optional.empty();
+        new JoinableFactory()
+        {
+          @Override
+          public boolean isDirectlyJoinable(DataSource dataSource)
+          {
+            return dataSource.equals(lookupDataSource);
+          }
+
+          @Override
+          public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
+          {
+            if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) {
+              return Optional.of(
+                  LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false))
+              );
+            } else {
+              return Optional.empty();
+            }
           }
         },
         new AtomicLong(),
@@ -65,6 +65,8 @@ public class MapJoinableFactoryTest
-    target = new MapJoinableFactory(ImmutableMap.of(NoopDataSource.class, noopJoinableFactory));
+    target = new MapJoinableFactory(
+        ImmutableMap.of(NoopDataSource.class, noopJoinableFactory));
   }
 
+
   @Test
   public void testBuildDataSourceNotRegisteredShouldReturnAbsent()
   {
@@ -89,4 +91,18 @@ public class MapJoinableFactoryTest
     Optional<Joinable> joinable = target.build(noopDataSource, condition);
     Assert.assertEquals(mockJoinable, joinable.get());
   }
+
+  @Test
+  public void testIsDirectShouldBeFalseForNotRegistered()
+  {
+    Assert.assertFalse(target.isDirectlyJoinable(inlineDataSource));
+  }
+
+  @Test
+  public void testIsDirectlyJoinableShouldBeTrueForRegisteredThatIsJoinable()
+  {
+    EasyMock.expect(noopJoinableFactory.isDirectlyJoinable(noopDataSource)).andReturn(true).anyTimes();
+    EasyMock.replay(noopJoinableFactory);
+    Assert.assertTrue(target.isDirectlyJoinable(noopDataSource));
+  }
 }
@@ -32,6 +32,12 @@ public class NoopJoinableFactory implements JoinableFactory
     // Singleton.
   }
 
+  @Override
+  public boolean isDirectlyJoinable(DataSource dataSource)
+  {
+    return false;
+  }
+
   @Override
   public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
   {
@@ -36,6 +36,15 @@ import java.util.Set;
  */
 public class InlineJoinableFactory implements JoinableFactory
 {
+  @Override
+  public boolean isDirectlyJoinable(DataSource dataSource)
+  {
+    // this should always be true if this is accessed through MapJoinableFactory, but check just in case...
+    // further, this should not ever be legitimately called, because this method is used to avoid subquery joins
+    // which use the InlineJoinableFactory
+    return dataSource instanceof InlineDataSource;
+  }
+
   @Override
   public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
   {
@@ -42,6 +42,13 @@ public class LookupJoinableFactory implements JoinableFactory
     this.lookupProvider = lookupProvider;
   }
 
+  @Override
+  public boolean isDirectlyJoinable(DataSource dataSource)
+  {
+    // this should always be true if this is accessed through MapJoinableFactory, but check just in case...
+    return dataSource instanceof LookupDataSource;
+  }
+
   @Override
   public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
   {
@@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.FluentQueryRunnerBuilder;
+import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.PostProcessingOperator;
 import org.apache.druid.query.Query;
@@ -47,9 +48,11 @@ import org.apache.druid.query.ResultLevelCachingQueryRunner;
 import org.apache.druid.query.RetryQueryRunner;
 import org.apache.druid.query.RetryQueryRunnerConfig;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.joda.time.Interval;
 
@@ -77,6 +80,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
   private final QuerySegmentWalker clusterClient;
   private final QuerySegmentWalker localClient;
   private final QueryToolChestWarehouse warehouse;
+  private final JoinableFactory joinableFactory;
   private final RetryQueryRunnerConfig retryConfig;
   private final ObjectMapper objectMapper;
   private final ServerConfig serverConfig;
@@ -88,6 +92,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
       QuerySegmentWalker clusterClient,
       QuerySegmentWalker localClient,
       QueryToolChestWarehouse warehouse,
+      JoinableFactory joinableFactory,
       RetryQueryRunnerConfig retryConfig,
       ObjectMapper objectMapper,
       ServerConfig serverConfig,
@@ -99,6 +104,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
     this.clusterClient = clusterClient;
     this.localClient = localClient;
     this.warehouse = warehouse;
+    this.joinableFactory = joinableFactory;
     this.retryConfig = retryConfig;
     this.objectMapper = objectMapper;
     this.serverConfig = serverConfig;
@@ -112,6 +118,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
       CachingClusteredClient clusterClient,
       LocalQuerySegmentWalker localClient,
       QueryToolChestWarehouse warehouse,
+      JoinableFactory joinableFactory,
       RetryQueryRunnerConfig retryConfig,
       ObjectMapper objectMapper,
       ServerConfig serverConfig,
@@ -124,6 +131,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
         (QuerySegmentWalker) clusterClient,
         (QuerySegmentWalker) localClient,
         warehouse,
+        joinableFactory,
         retryConfig,
         objectMapper,
         serverConfig,
@@ -137,10 +145,13 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
   {
     final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
 
-    // First, do an inlining dry run to see if any inlining is necessary, without actually running the queries.
+    // transform TableDataSource to GlobalTableDataSource when eligible
+    // before further transformation to potentially inline
+    final DataSource freeTradeDataSource = globalizeIfPossible(query.getDataSource());
+    // do an inlining dry run to see if any inlining is necessary, without actually running the queries.
     final int maxSubqueryRows = QueryContexts.getMaxSubqueryRows(query, serverConfig.getMaxSubqueryRows());
     final DataSource inlineDryRun = inlineIfNecessary(
-        query.getDataSource(),
+        freeTradeDataSource,
         toolChest,
         new AtomicInteger(),
         maxSubqueryRows,
@@ -156,7 +167,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
     // Now that we know the structure is workable, actually do the inlining (if necessary).
     final Query<T> newQuery = query.withDataSource(
         inlineIfNecessary(
-            query.getDataSource(),
+            freeTradeDataSource,
             toolChest,
             new AtomicInteger(),
             maxSubqueryRows,
@@ -187,10 +198,15 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
   @Override
   public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
   {
-    // Inlining isn't done for segments-based queries.
+    // Inlining isn't done for segments-based queries, but we still globalify the table datasources if possible
+    final Query<T> freeTradeQuery = query.withDataSource(globalizeIfPossible(query.getDataSource()));
+
     if (canRunQueryUsingClusterWalker(query)) {
-      return decorateClusterRunner(query, clusterClient.getQueryRunnerForSegments(query, specs));
+      return new QuerySwappingQueryRunner<>(
+          decorateClusterRunner(freeTradeQuery, clusterClient.getQueryRunnerForSegments(freeTradeQuery, specs)),
+          query,
+          freeTradeQuery
+      );
     } else {
       // We don't expect end-users to see this message, since it only happens when specific segments are requested;
       // this is not typical end-user behavior.
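The QuerySwappingQueryRunner wrapping matters because the caller still holds the original query object while the cluster runner was built for the rewritten one. A simplified sketch of the swapping idea (not the real Druid class, which handles this internally):

class QuerySwappingRunnerSketch<T> implements QueryRunner<T>
{
  private final QueryRunner<T> baseRunner;
  private final Query<T> newQuery; // the rewritten query that should actually run

  QuerySwappingRunnerSketch(QueryRunner<T> baseRunner, Query<T> newQuery)
  {
    this.baseRunner = baseRunner;
    this.newQuery = newQuery;
  }

  @Override
  public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
  {
    // substitute the rewritten query before delegating to the underlying runner
    return baseRunner.run(queryPlus.withQuery(newQuery), responseContext);
  }
}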
@@ -235,6 +251,27 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
                 || toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery()));
   }
 
+  private DataSource globalizeIfPossible(
+      final DataSource dataSource
+  )
+  {
+    if (dataSource instanceof TableDataSource) {
+      GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(((TableDataSource) dataSource).getName());
+      if (joinableFactory.isDirectlyJoinable(maybeGlobal)) {
+        return maybeGlobal;
+      }
+      return dataSource;
+    } else {
+      List<DataSource> currentChildren = dataSource.getChildren();
+      List<DataSource> newChildren = new ArrayList<>(currentChildren.size());
+      for (DataSource child : currentChildren) {
+        newChildren.add(globalizeIfPossible(child));
+      }
+      return dataSource.withChildren(newChildren);
+    }
+  }
+
   /**
    * Replace QueryDataSources with InlineDataSources when necessary and possible. "Necessary" is defined as:
    *
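Given globalizeIfPossible() above, the rewrite is a no-op unless the JoinableFactory recognizes the table. A hedged illustration, assuming a deployment where only the hypothetical table "bcast" is registered as directly joinable:

DataSource plain = globalizeIfPossible(new TableDataSource("foo"));
// still a plain TableDataSource: the factory does not recognize "foo"

DataSource promoted = globalizeIfPossible(new TableDataSource("bcast"));
// now a GlobalTableDataSource: the factory reports it directly joinable

// Any other datasource type is rebuilt with recursively rewritten children via
// withChildren(), so tables nested inside joins and subqueries are promoted too.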
@@ -80,6 +80,13 @@ public class InlineJoinableFactoryTest
     Assert.assertEquals(3, joinable.getCardinality("long"));
   }
 
+  @Test
+  public void testIsDirectlyJoinable()
+  {
+    Assert.assertTrue(factory.isDirectlyJoinable(inlineDataSource));
+    Assert.assertFalse(factory.isDirectlyJoinable(new TableDataSource("foo")));
+  }
+
   private static JoinConditionAnalysis makeCondition(final String condition)
   {
     return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil());
@@ -125,6 +125,13 @@ public class LookupJoinableFactoryTest
     Assert.assertEquals(Joinable.CARDINALITY_UNKNOWN, joinable.getCardinality("v"));
   }
 
+  @Test
+  public void testIsDirectlyJoinable()
+  {
+    Assert.assertTrue(factory.isDirectlyJoinable(lookupDataSource));
+    Assert.assertFalse(factory.isDirectlyJoinable(new TableDataSource("foo")));
+  }
+
   private static JoinConditionAnalysis makeCondition(final String condition)
   {
     return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil());
@@ -33,6 +33,7 @@ import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.JoinDataSource;
 import org.apache.druid.query.Query;
@@ -70,7 +71,9 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.join.InlineJoinableFactory;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
 import org.apache.druid.segment.join.JoinType;
+import org.apache.druid.segment.join.Joinable;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.initialization.ServerConfig;
@@ -96,6 +99,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * Tests ClientQuerySegmentWalker.
@@ -112,6 +116,7 @@ public class ClientQuerySegmentWalkerTest
   private static final String FOO = "foo";
   private static final String BAR = "bar";
   private static final String MULTI = "multi";
+  private static final String GLOBAL = "broadcast";
 
   private static final Interval INTERVAL = Intervals.of("2000/P1Y");
   private static final String VERSION = "A";
@@ -218,6 +223,40 @@ public class ClientQuerySegmentWalkerTest
     Assert.assertEquals(1, scheduler.getTotalReleased().get());
   }
 
+  @Test
+  public void testTimeseriesOnAutomaticGlobalTable()
+  {
+    final TimeseriesQuery query =
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource(GLOBAL)
+              .granularity(Granularities.ALL)
+              .intervals(Collections.singletonList(INTERVAL))
+              .aggregators(new LongSumAggregatorFactory("sum", "n"))
+              .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false))
+              .build();
+
+    // expect global/joinable datasource to be automatically translated into a GlobalTableDataSource
+    final TimeseriesQuery expectedClusterQuery =
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource(new GlobalTableDataSource(GLOBAL))
+              .granularity(Granularities.ALL)
+              .intervals(Collections.singletonList(INTERVAL))
+              .aggregators(new LongSumAggregatorFactory("sum", "n"))
+              .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false))
+              .build();
+
+    testQuery(
+        query,
+        ImmutableList.of(ExpectedQuery.cluster(expectedClusterQuery)),
+        ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L})
+    );
+
+    Assert.assertEquals(1, scheduler.getTotalRun().get());
+    Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
+    Assert.assertEquals(1, scheduler.getTotalAcquired().get());
+    Assert.assertEquals(1, scheduler.getTotalReleased().get());
+  }
+
   @Test
   public void testTimeseriesOnInline()
   {
@@ -606,6 +645,20 @@ public class ClientQuerySegmentWalkerTest
     final JoinableFactory joinableFactory = new MapJoinableFactory(
         ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
                     .put(InlineDataSource.class, new InlineJoinableFactory())
+                    .put(GlobalTableDataSource.class, new JoinableFactory()
+                    {
+                      @Override
+                      public boolean isDirectlyJoinable(DataSource dataSource)
+                      {
+                        return ((GlobalTableDataSource) dataSource).getName().equals(GLOBAL);
+                      }
+
+                      @Override
+                      public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
+                      {
+                        return Optional.empty();
+                      }
+                    })
                     .build()
     );
 
@@ -651,7 +704,8 @@ public class ClientQuerySegmentWalkerTest
         ImmutableMap.of(
             FOO, makeTimeline(FOO, FOO_INLINE),
            BAR, makeTimeline(BAR, BAR_INLINE),
-            MULTI, makeTimeline(MULTI, MULTI_VALUE_INLINE)
+            MULTI, makeTimeline(MULTI, MULTI_VALUE_INLINE),
+            GLOBAL, makeTimeline(GLOBAL, FOO_INLINE)
         ),
         joinableFactory,
         conglomerate,
@@ -669,6 +723,7 @@ public class ClientQuerySegmentWalkerTest
             ClusterOrLocal.LOCAL
         ),
         conglomerate,
+        joinableFactory,
         serverConfig
     );
   }
@@ -95,6 +95,7 @@ public class QueryStackTests
       final QuerySegmentWalker clusterWalker,
       final QuerySegmentWalker localWalker,
       final QueryRunnerFactoryConglomerate conglomerate,
+      final JoinableFactory joinableFactory,
       final ServerConfig serverConfig
   )
   {
@@ -110,6 +111,7 @@ public class QueryStackTests
             return conglomerate.findFactory(query).getToolchest();
           }
         },
+        joinableFactory,
         new RetryQueryRunnerConfig(),
         TestHelper.makeJsonMapper(),
         serverConfig,
@@ -337,6 +337,8 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
   private static boolean computeRightRequiresSubquery(final DruidRel<?> right)
   {
     // Right requires a subquery unless it's a scan or mapping on top of a global datasource.
+    // ideally this would involve JoinableFactory.isDirectlyJoinable to check that the global datasources
+    // are in fact possibly joinable, but for now isGlobal is coupled to joinability
     return !(DruidRels.isScanOrMapping(right, false)
              && DruidRels.dataSourceIfLeafRel(right).filter(DataSource::isGlobal).isPresent());
   }
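The rule in computeRightRequiresSubquery() reduces to a small predicate; a hedged restatement with illustrative names (not Druid code):

// The right-hand side joins directly only when it is a plain scan or mapping
// over a global (and, for now, therefore joinable) datasource.
static boolean rightRequiresSubquery(boolean isScanOrMapping, boolean rightIsGlobal)
{
  return !(isScanOrMapping && rightIsGlobal);
}

// rightRequiresSubquery(true, true)   -> false: join the broadcast copy directly
// rightRequiresSubquery(true, false)  -> true:  inline the right side first
// rightRequiresSubquery(false, true)  -> true:  aggregations etc. still force a subquery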
@@ -56,6 +56,7 @@ import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.coordination.DruidServerMetadata;
@@ -104,6 +105,7 @@ public class DruidSchema extends AbstractSchema
   private final PlannerConfig config;
   private final SegmentManager segmentManager;
   private final ViewManager viewManager;
+  private final JoinableFactory joinableFactory;
   private final ExecutorService cacheExec;
   private final ConcurrentMap<String, DruidTable> tables;
 
@@ -148,6 +150,7 @@ public class DruidSchema extends AbstractSchema
       final QueryLifecycleFactory queryLifecycleFactory,
       final TimelineServerView serverView,
       final SegmentManager segmentManager,
+      final JoinableFactory joinableFactory,
       final PlannerConfig config,
       final ViewManager viewManager,
       final Escalator escalator
@@ -156,6 +159,7 @@ public class DruidSchema extends AbstractSchema
     this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory");
     Preconditions.checkNotNull(serverView, "serverView");
     this.segmentManager = segmentManager;
+    this.joinableFactory = joinableFactory;
     this.config = Preconditions.checkNotNull(config, "config");
     this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager");
     this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d");
@@ -278,10 +282,11 @@ public class DruidSchema extends AbstractSchema
       for (String dataSource : dataSourcesToRebuild) {
         final DruidTable druidTable = buildDruidTable(dataSource);
         final DruidTable oldTable = tables.put(dataSource, druidTable);
+        final String description = druidTable.getDataSource().isGlobal() ? "global dataSource" : "dataSource";
         if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
-          log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature());
+          log.info("%s [%s] has new signature: %s.", description, dataSource, druidTable.getRowSignature());
         } else {
-          log.debug("dataSource [%s] signature is unchanged.", dataSource);
+          log.debug("%s [%s] signature is unchanged.", description, dataSource);
         }
       }
 
@@ -627,12 +632,21 @@ public class DruidSchema extends AbstractSchema
     columnTypes.forEach(builder::add);
 
     final TableDataSource tableDataSource;
-    if (segmentManager.getDataSourceNames().contains(dataSource)) {
-      tableDataSource = new GlobalTableDataSource(dataSource);
+
+    // to be a GlobalTableDataSource instead of a TableDataSource, it must appear on all servers (inferred by existing
+    // in the segment cache, which in this case belongs to the broker meaning only broadcast segments live here)
+    // to be joinable, it must be possibly joinable according to the factory. we only consider broadcast datasources
+    // at this time, and isGlobal is currently strongly coupled with joinable, so only make a global table datasource
+    // if also joinable
+    final GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(dataSource);
+    final boolean isJoinable = joinableFactory.isDirectlyJoinable(maybeGlobal);
+    final boolean isBroadcast = segmentManager.getDataSourceNames().contains(dataSource);
+    if (isBroadcast && isJoinable) {
+      tableDataSource = maybeGlobal;
     } else {
       tableDataSource = new TableDataSource(dataSource);
     }
-    return new DruidTable(tableDataSource, builder.build());
+    return new DruidTable(tableDataSource, builder.build(), isJoinable, isBroadcast);
   }
 }
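The choice buildDruidTable() makes above can be condensed into a small factory method; a sketch with a hypothetical helper name, using the real TableDataSource and GlobalTableDataSource classes (the latter extends the former, which is why the narrowing is safe):

static TableDataSource chooseTableType(String name, boolean isBroadcast, boolean isJoinable)
{
  // a broadcast-but-not-joinable table deliberately stays a plain TableDataSource,
  // because isGlobal is still coupled to joinability
  return isBroadcast && isJoinable ? new GlobalTableDataSource(name) : new TableDataSource(name);
}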
@@ -53,6 +53,7 @@ import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.table.DruidTable;
 import org.apache.druid.sql.calcite.table.RowSignatures;
 
 import javax.annotation.Nullable;
@@ -83,6 +84,8 @@ public class InformationSchema extends AbstractSchema
       .add("TABLE_SCHEMA", ValueType.STRING)
       .add("TABLE_NAME", ValueType.STRING)
       .add("TABLE_TYPE", ValueType.STRING)
+      .add("IS_JOINABLE", ValueType.STRING)
+      .add("IS_BROADCAST", ValueType.STRING)
       .build();
   private static final RowSignature COLUMNS_SIGNATURE = RowSignature
       .builder()
@@ -109,6 +112,9 @@ public class InformationSchema extends AbstractSchema
     return Collections.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(datasourceName));
   };
 
+  private static final String INFO_TRUE = "YES";
+  private static final String INFO_FALSE = "NO";
+
   private final SchemaPlus rootSchema;
   private final Map<String, Table> tableMap;
   private final AuthorizerMapper authorizerMapper;
@@ -217,18 +223,27 @@ public class InformationSchema extends AbstractSchema
       return Iterables.filter(
           Iterables.concat(
               FluentIterable.from(authorizedTableNames).transform(
-                  new Function<String, Object[]>()
-                  {
-                    @Override
-                    public Object[] apply(final String tableName)
-                    {
-                      return new Object[]{
-                          CATALOG_NAME, // TABLE_CATALOG
-                          schemaName, // TABLE_SCHEMA
-                          tableName, // TABLE_NAME
-                          subSchema.getTable(tableName).getJdbcTableType().toString() // TABLE_TYPE
-                      };
+                  tableName -> {
+                    final Table table = subSchema.getTable(tableName);
+                    final boolean isJoinable;
+                    final boolean isBroadcast;
+                    if (table instanceof DruidTable) {
+                      DruidTable druidTable = (DruidTable) table;
+                      isJoinable = druidTable.isJoinable();
+                      isBroadcast = druidTable.isBroadcast();
+                    } else {
+                      isJoinable = false;
+                      isBroadcast = false;
                     }
+
+                    return new Object[]{
+                        CATALOG_NAME, // TABLE_CATALOG
+                        schemaName, // TABLE_SCHEMA
+                        tableName, // TABLE_NAME
+                        table.getJdbcTableType().toString(), // TABLE_TYPE
+                        isJoinable ? INFO_TRUE : INFO_FALSE, // IS_JOINABLE
+                        isBroadcast ? INFO_TRUE : INFO_FALSE // IS_BROADCAST
+                    };
+                  }
               ),
               FluentIterable.from(authorizedFunctionNames).transform(
@@ -242,7 +257,9 @@ public class InformationSchema extends AbstractSchema
                           CATALOG_NAME, // TABLE_CATALOG
                           schemaName, // TABLE_SCHEMA
                           functionName, // TABLE_NAME
-                          "VIEW" // TABLE_TYPE
+                          "VIEW", // TABLE_TYPE
+                          INFO_FALSE, // IS_JOINABLE
+                          INFO_FALSE // IS_BROADCAST
                       };
                     } else {
                       return null;
@@ -406,7 +423,7 @@ public class InformationSchema extends AbstractSchema
                 field.getName(), // COLUMN_NAME
                 String.valueOf(field.getIndex()), // ORDINAL_POSITION
                 "", // COLUMN_DEFAULT
-                type.isNullable() ? "YES" : "NO", // IS_NULLABLE
+                type.isNullable() ? INFO_TRUE : INFO_FALSE, // IS_NULLABLE
                 type.getSqlTypeName().toString(), // DATA_TYPE
                 null, // CHARACTER_MAXIMUM_LENGTH
                 null, // CHARACTER_OCTET_LENGTH
@@ -57,7 +57,9 @@ public class LookupSchema extends AbstractSchema
     final ImmutableMap.Builder<String, Table> tableMapBuilder = ImmutableMap.builder();
 
     for (final String lookupName : lookupProvider.getAllLookupNames()) {
-      tableMapBuilder.put(lookupName, new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE));
+      // all lookups should also be joinable through the lookup joinable factory, and lookups are effectively
+      // broadcast (if we ignore lookup tiers...)
+      tableMapBuilder.put(lookupName, new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE, true, true));
     }
 
     return tableMapBuilder.build();
@@ -41,14 +41,20 @@ public class DruidTable implements TranslatableTable
 {
   private final DataSource dataSource;
   private final RowSignature rowSignature;
+  private final boolean joinable;
+  private final boolean broadcast;
 
   public DruidTable(
       final DataSource dataSource,
-      final RowSignature rowSignature
+      final RowSignature rowSignature,
+      final boolean isJoinable,
+      final boolean isBroadcast
   )
   {
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature");
+    this.joinable = isJoinable;
+    this.broadcast = isBroadcast;
   }
 
   public DataSource getDataSource()
@@ -61,6 +67,16 @@ public class DruidTable implements TranslatableTable
     return rowSignature;
   }
 
+  public boolean isJoinable()
+  {
+    return joinable;
+  }
+
+  public boolean isBroadcast()
+  {
+    return broadcast;
+  }
+
   @Override
   public Schema.TableType getJdbcTableType()
   {
@@ -708,59 +708,59 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   public void testInformationSchemaTables() throws Exception
   {
     testQuery(
-        "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE\n"
+        "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, IS_JOINABLE, IS_BROADCAST\n"
         + "FROM INFORMATION_SCHEMA.TABLES\n"
         + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
         ImmutableList.of(),
         ImmutableList.<Object[]>builder()
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE"})
-            .add(new Object[]{"druid", "aview", "VIEW"})
-            .add(new Object[]{"druid", "bview", "VIEW"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
-            .add(new Object[]{"lookup", "lookyloo", "TABLE"})
-            .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", "aview", "VIEW", "NO", "NO"})
+            .add(new Object[]{"druid", "bview", "VIEW", "NO", "NO"})
+            .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"lookup", "lookyloo", "TABLE", "YES", "YES"})
+            .add(new Object[]{"sys", "segments", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "NO", "NO"})
             .build()
     );
 
     testQuery(
         PLANNER_CONFIG_DEFAULT,
-        "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE\n"
+        "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, IS_JOINABLE, IS_BROADCAST\n"
         + "FROM INFORMATION_SCHEMA.TABLES\n"
         + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
         CalciteTests.SUPER_USER_AUTH_RESULT,
         ImmutableList.of(),
         ImmutableList.<Object[]>builder()
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE"})
-            .add(new Object[]{"druid", "aview", "VIEW"})
-            .add(new Object[]{"druid", "bview", "VIEW"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
-            .add(new Object[]{"lookup", "lookyloo", "TABLE"})
-            .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", "aview", "VIEW", "NO", "NO"})
+            .add(new Object[]{"druid", "bview", "VIEW", "NO", "NO"})
+            .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"lookup", "lookyloo", "TABLE", "YES", "YES"})
+            .add(new Object[]{"sys", "segments", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "NO", "NO"})
            .build()
     );
   }
@@ -20,6 +20,7 @@
 package org.apache.druid.sql.calcite.schema;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -38,6 +39,8 @@ import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.query.lookup.LookupReferencesManager;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -102,6 +105,7 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase
         binder -> {
           binder.bind(QueryLifecycleFactory.class).toInstance(queryLifecycleFactory);
           binder.bind(TimelineServerView.class).toInstance(serverView);
+          binder.bind(JoinableFactory.class).toInstance(new MapJoinableFactory(ImmutableMap.of()));
           binder.bind(PlannerConfig.class).toInstance(plannerConfig);
           binder.bind(ViewManager.class).toInstance(viewManager);
           binder.bind(Escalator.class).toInstance(escalator);
@@ -22,6 +22,7 @@ package org.apache.druid.sql.calcite.schema;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.SegmentManager;
@@ -54,6 +55,7 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase
         ),
         new TestServerInventoryView(Collections.emptyList()),
         new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
+        new MapJoinableFactory(ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
         new NoopViewManager(),
         new NoopEscalator()
@@ -33,6 +33,7 @@ import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.DataSource;
+import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.TableDataSource;
@@ -43,6 +44,10 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
+import org.apache.druid.segment.join.Joinable;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.QueryStackTests;
@@ -77,6 +82,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -132,12 +138,15 @@ public class DruidSchemaTest extends CalciteTestBase
   private SpecificSegmentsQuerySegmentWalker walker = null;
   private DruidSchema schema = null;
   private SegmentManager segmentManager;
-  private Set<String> dataSourceNames;
+  private Set<String> segmentDataSourceNames;
+  private Set<String> joinableDataSourceNames;
 
   @Before
   public void setUp() throws Exception
   {
-    dataSourceNames = Sets.newConcurrentHashSet();
+    segmentDataSourceNames = Sets.newConcurrentHashSet();
+    joinableDataSourceNames = Sets.newConcurrentHashSet();
 
     final File tmpDir = temporaryFolder.newFolder();
     final QueryableIndex index1 = IndexBuilder.create()
                                               .tmpDir(new File(tmpDir, "1"))
@@ -173,7 +182,7 @@ public class DruidSchemaTest extends CalciteTestBase
       public Set<String> getDataSourceNames()
       {
         getDatasourcesLatch.countDown();
-        return dataSourceNames;
+        return segmentDataSourceNames;
       }
     };
 
@@ -222,10 +231,30 @@ public class DruidSchemaTest extends CalciteTestBase
     serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments);
     druidServers = serverView.getDruidServers();
 
+    final JoinableFactory globalTableJoinable = new JoinableFactory()
+    {
+      @Override
+      public boolean isDirectlyJoinable(DataSource dataSource)
+      {
+        return dataSource instanceof GlobalTableDataSource &&
+               joinableDataSourceNames.contains(((GlobalTableDataSource) dataSource).getName());
+      }
+
+      @Override
+      public Optional<Joinable> build(
+          DataSource dataSource,
+          JoinConditionAnalysis condition
+      )
+      {
+        return Optional.empty();
+      }
+    };
+
     schema = new DruidSchema(
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         serverView,
         segmentManager,
+        new MapJoinableFactory(ImmutableMap.of(GlobalTableDataSource.class, globalTableJoinable)),
         PLANNER_CONFIG_DEFAULT,
         new NoopViewManager(),
         new NoopEscalator()
@@ -461,12 +490,16 @@ public class DruidSchemaTest extends CalciteTestBase
   }
 
   @Test
-  public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedException
+  public void testLocalSegmentCacheSetsDataSourceAsGlobalAndJoinable() throws InterruptedException
   {
     DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo");
     Assert.assertNotNull(fooTable);
     Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
     Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertFalse(fooTable.isJoinable());
+    Assert.assertFalse(fooTable.isBroadcast());
 
     buildTableLatch.await(1, TimeUnit.SECONDS);
 
     final DataSegment someNewBrokerSegment = new DataSegment(
         "foo",
@@ -481,12 +514,12 @@ public class DruidSchemaTest extends CalciteTestBase
         100L,
         PruneSpecsHolder.DEFAULT
     );
-    dataSourceNames.add("foo");
+    segmentDataSourceNames.add("foo");
+    joinableDataSourceNames.add("foo");
     serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
 
-    // wait for build
-    buildTableLatch.await(1, TimeUnit.SECONDS);
-    buildTableLatch = new CountDownLatch(1);
+    // wait for build twice
+    buildTableLatch = new CountDownLatch(2);
+    buildTableLatch.await(1, TimeUnit.SECONDS);
 
     // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
@@ -497,9 +530,12 @@ public class DruidSchemaTest extends CalciteTestBase
     Assert.assertNotNull(fooTable);
     Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
     Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertTrue(fooTable.isJoinable());
+    Assert.assertTrue(fooTable.isBroadcast());
 
     // now remove it
-    dataSourceNames.remove("foo");
+    joinableDataSourceNames.remove("foo");
+    segmentDataSourceNames.remove("foo");
     serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER);
 
     // wait for build
@@ -515,6 +551,74 @@ public class DruidSchemaTest extends CalciteTestBase
     Assert.assertNotNull(fooTable);
     Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
     Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertFalse(fooTable.isJoinable());
+    Assert.assertFalse(fooTable.isBroadcast());
+  }
+
+  @Test
+  public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throws InterruptedException
+  {
+    DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo");
+    Assert.assertNotNull(fooTable);
+    Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
+    Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertFalse(fooTable.isJoinable());
+    Assert.assertFalse(fooTable.isBroadcast());
+
+    // wait for build twice
+    buildTableLatch.await(1, TimeUnit.SECONDS);
+
+    final DataSegment someNewBrokerSegment = new DataSegment(
+        "foo",
+        Intervals.of("2012/2013"),
+        "version1",
+        null,
+        ImmutableList.of("dim1", "dim2"),
+        ImmutableList.of("met1", "met2"),
+        new NumberedShardSpec(2, 3),
+        null,
+        1,
+        100L,
+        PruneSpecsHolder.DEFAULT
+    );
+    segmentDataSourceNames.add("foo");
+    serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
+
+    buildTableLatch = new CountDownLatch(2);
+    buildTableLatch.await(1, TimeUnit.SECONDS);
+
+    // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
+    getDatasourcesLatch = new CountDownLatch(1);
+    getDatasourcesLatch.await(1, TimeUnit.SECONDS);
+
+    fooTable = (DruidTable) schema.getTableMap().get("foo");
+    Assert.assertNotNull(fooTable);
+    Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
+    // should not be a GlobalTableDataSource for now, because isGlobal is coupled with joinability; ideally this will
+    // be changed in the future so that we can expect a GlobalTableDataSource here
+    Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertTrue(fooTable.isBroadcast());
+    Assert.assertFalse(fooTable.isJoinable());
+
+    // now remove it
+    segmentDataSourceNames.remove("foo");
+    serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER);
+
+    // wait for build
+    buildTableLatch.await(1, TimeUnit.SECONDS);
+    buildTableLatch = new CountDownLatch(1);
+    buildTableLatch.await(1, TimeUnit.SECONDS);
+
+    // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
+    getDatasourcesLatch = new CountDownLatch(1);
+    getDatasourcesLatch.await(1, TimeUnit.SECONDS);
+
+    fooTable = (DruidTable) schema.getTableMap().get("foo");
+    Assert.assertNotNull(fooTable);
+    Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
+    Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertFalse(fooTable.isBroadcast());
+    Assert.assertFalse(fooTable.isJoinable());
   }
 }
@@ -67,6 +67,7 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.DruidNode;
@@ -242,6 +243,7 @@ public class SystemSchemaTest extends CalciteTestBase
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         new TestServerInventoryView(walker.getSegments(), realtimeSegments),
         new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
+        new MapJoinableFactory(ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
         new NoopViewManager(),
         new NoopEscalator()
@@ -73,6 +73,7 @@ import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.DruidNode;
@@ -979,6 +980,7 @@ public class CalciteTests
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         new TestServerInventoryView(walker.getSegments()),
         new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
+        new MapJoinableFactory(ImmutableMap.of()),
         plannerConfig,
         viewManager,
         TEST_AUTHENTICATOR_ESCALATOR
@@ -120,6 +120,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
             scheduler
         ),
         conglomerate,
+        joinableFactoryToUse,
         new ServerConfig()
     );
   }