Refactor usage of JoinableFactoryWrapper + more test coverage (#12767)

Refactor usage of JoinableFactoryWrapper to add e2e test for createSegmentMapFn with joinToFilter feature enabled
This commit is contained in:
Rohan Garg 2022-07-12 18:55:36 +05:30 committed by GitHub
parent cebf2ba9c7
commit bb953be09b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 214 additions and 120 deletions

View File

@ -25,7 +25,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import org.apache.druid.client.CachingClusteredClient;
@ -104,7 +103,7 @@ import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@ -343,7 +342,7 @@ public class CachingClusteredClientBenchmark
processingConfig,
forkJoinPool,
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
}

View File

@ -69,6 +69,7 @@ import org.apache.druid.query.movingaverage.test.TestConfig;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryStackTests;
@ -372,7 +373,7 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
},
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
new JoinableFactoryWrapper(new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())),
new NoopServiceEmitter()
);

View File

@ -122,6 +122,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentMover;
@ -1359,7 +1360,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = new UnifiedIndexerAppenderatorsManager(
new ForwardingQueryProcessingPool(exec),
NoopJoinableFactory.INSTANCE,
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new WorkerConfig(),
MapCache.create(2048),
new CacheConfig(),

View File

@ -46,7 +46,7 @@ import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@ -96,7 +96,7 @@ public class ServerManagerForQueryErrorTest extends ServerManager
Cache cache,
CacheConfig cacheConfig,
SegmentManager segmentManager,
JoinableFactory joinableFactory,
JoinableFactoryWrapper joinableFactoryWrapper,
ServerConfig serverConfig
)
{
@ -109,7 +109,7 @@ public class ServerManagerForQueryErrorTest extends ServerManager
cache,
cacheConfig,
segmentManager,
joinableFactory,
joinableFactoryWrapper,
serverConfig
);
}

View File

@ -47,6 +47,7 @@ public class HashJoinSegment implements SegmentReference
private static final Logger log = new Logger(HashJoinSegment.class);
private final SegmentReference baseSegment;
@Nullable
private final Filter baseFilter;
private final List<JoinableClause> clauses;
private final JoinFilterPreAnalysis joinFilterPreAnalysis;
@ -147,4 +148,15 @@ public class HashJoinSegment implements SegmentReference
return Optional.empty();
}
}
@Nullable
public Filter getBaseFilter()
{
return baseFilter;
}
public List<JoinableClause> getClauses()
{
return clauses;
}
}

View File

@ -22,11 +22,14 @@ package org.apache.druid.segment.join;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
@ -67,11 +70,17 @@ public class JoinableFactoryWrapper
private final JoinableFactory joinableFactory;
@Inject
public JoinableFactoryWrapper(final JoinableFactory joinableFactory)
{
this.joinableFactory = Preconditions.checkNotNull(joinableFactory, "joinableFactory");
}
public JoinableFactory getJoinableFactory()
{
return joinableFactory;
}
/**
* Creates a Function that maps base segments to {@link HashJoinSegment} if needed (i.e. if the number of join
* clauses is > 0). If mapping is not needed, this method will return {@link Function#identity()}.
@ -141,7 +150,12 @@ public class JoinableFactoryWrapper
);
return baseSegment ->
new HashJoinSegment(baseSegment, baseFilterToUse, clausesToUse, joinFilterPreAnalysis);
new HashJoinSegment(
baseSegment,
baseFilterToUse,
GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()),
joinFilterPreAnalysis
);
}
}
);

View File

@ -22,10 +22,14 @@ package org.apache.druid.query;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Set;
public class TestQuery extends BaseQuery
{
@Nullable
private Set<String> requiredColumns;
public TestQuery(DataSource dataSource, QuerySegmentSpec querySegmentSpec, boolean descending, Map context)
{
@ -72,4 +76,16 @@ public class TestQuery extends BaseQuery
BaseQuery.computeOverriddenContext(getContext(), contextOverride)
);
}
@Nullable
@Override
public Set<String> getRequiredColumns()
{
return requiredColumns;
}
public void setRequiredColumns(Set<String> requiredColumns)
{
this.requiredColumns = requiredColumns;
}
}

View File

@ -36,6 +36,7 @@ import org.apache.druid.query.DataSource;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.TestQuery;
import org.apache.druid.query.extraction.MapLookupExtractor;
@ -46,6 +47,8 @@ import org.apache.druid.query.filter.TrueDimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@ -54,12 +57,15 @@ import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.join.table.RowBasedIndexedTable;
import org.apache.druid.timeline.SegmentId;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -73,8 +79,9 @@ import java.util.stream.Collectors;
public class JoinableFactoryWrapperTest extends NullHandlingTest
{
private static final JoinableFactoryWrapper NOOP_JOINABLE_FACTORY_WRAPPER = new JoinableFactoryWrapper(
NoopJoinableFactory.INSTANCE);
public static final JoinableFactoryWrapper NOOP_JOINABLE_FACTORY_WRAPPER = new JoinableFactoryWrapper(
NoopJoinableFactory.INSTANCE
);
private static final Map<String, String> TEST_LOOKUP =
ImmutableMap.<String, String>builder()
@ -124,6 +131,9 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest
DateTimes.nowUtc().toString()
);
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
@ -143,12 +153,11 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest
@Test
public void test_createSegmentMapFn_unusableClause()
{
final LookupDataSource lookupDataSource = new LookupDataSource("lookyloo");
final PreJoinableClause clause = new PreJoinableClause(
final PreJoinableClause clause = makePreJoinableClause(
INDEXED_TABLE_DS,
"country == \"j.country\"",
"j.",
lookupDataSource,
JoinType.LEFT,
JoinConditionAnalysis.forExpression("x == \"j.x\"", "j.", ExprMacroTable.nil())
JoinType.LEFT
);
expectedException.expect(IllegalStateException.class);
@ -165,39 +174,14 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest
@Test
public void test_createSegmentMapFn_usableClause()
{
final LookupDataSource lookupDataSource = new LookupDataSource("lookyloo");
final JoinConditionAnalysis conditionAnalysis = JoinConditionAnalysis.forExpression(
"x == \"j.x\"",
final PreJoinableClause clause = makePreJoinableClause(
INDEXED_TABLE_DS,
"country == \"j.country\"",
"j.",
ExprMacroTable.nil()
);
final PreJoinableClause clause = new PreJoinableClause(
"j.",
lookupDataSource,
JoinType.LEFT,
conditionAnalysis
JoinType.LEFT
);
JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new JoinableFactory()
{
@Override
public boolean isDirectlyJoinable(DataSource dataSource)
{
return dataSource.equals(lookupDataSource);
}
@Override
public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
{
if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) {
return Optional.of(
LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false))
);
} else {
return Optional.empty();
}
}
});
JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new InlineJoinableFactory());
final Function<SegmentReference, SegmentReference> segmentMapFn = joinableFactoryWrapper.createSegmentMapFn(
null,
ImmutableList.of(clause),
@ -206,13 +190,64 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
false,
new HashMap()
new HashMap<>()
)
);
Assert.assertNotSame(Function.identity(), segmentMapFn);
}
@Test
public void test_createSegmentMapFn_usableClause_joinToFilterEnabled() throws IOException
{
final PreJoinableClause clause = makePreJoinableClause(
INDEXED_TABLE_DS,
"country == \"j.country\"",
"j.",
JoinType.INNER
);
// required columns are necessary for the rewrite
final TestQuery queryWithRequiredColumnsAndJoinFilterRewrite = (TestQuery) new TestQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
false,
new HashMap<>()
).withOverriddenContext(ImmutableMap.of(QueryContexts.REWRITE_JOIN_TO_FILTER_ENABLE_KEY, "true"));
queryWithRequiredColumnsAndJoinFilterRewrite.setRequiredColumns(ImmutableSet.of("country"));
final JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new InlineJoinableFactory());
final Function<SegmentReference, SegmentReference> segmentMapFn = joinableFactoryWrapper.createSegmentMapFn(
null,
ImmutableList.of(clause),
new AtomicLong(),
queryWithRequiredColumnsAndJoinFilterRewrite
);
// dummy segment
final SegmentReference baseSegmentReference = ReferenceCountingSegment.wrapRootGenerationSegment(
new QueryableIndexSegment(
JoinTestHelper.createFactIndexBuilder(temporaryFolder.newFolder()).buildMMappedIndex(),
SegmentId.dummy("facts")
)
);
// check the output contains the conversion filter
Assert.assertNotSame(Function.identity(), segmentMapFn);
final SegmentReference joinSegmentReference = segmentMapFn.apply(baseSegmentReference);
Assert.assertTrue(joinSegmentReference instanceof HashJoinSegment);
HashJoinSegment hashJoinSegment = (HashJoinSegment) joinSegmentReference;
Assert.assertEquals(
hashJoinSegment.getBaseFilter(),
new InDimFilter(
"country",
INDEXED_TABLE_DS.getRowsAsList().stream().map(row -> row[0].toString()).collect(Collectors.toSet())
)
);
// the returned clause list is not comparable with an expected clause list since the Joinable
// class member in JoinableClause doesn't implement equals method in its implementations
Assert.assertEquals(hashJoinSegment.getClauses().size(), 1);
}
@Test
public void test_computeJoinDataSourceCacheKey_noClauses()
{

View File

@ -74,7 +74,6 @@ import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.DimFilterUtils;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.QueryResource;
import org.apache.druid.server.QueryScheduler;
@ -142,7 +141,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
DruidProcessingConfig processingConfig,
@Merging ForkJoinPool pool,
QueryScheduler scheduler,
JoinableFactory joinableFactory,
JoinableFactoryWrapper joinableFactoryWrapper,
ServiceEmitter emitter
)
{
@ -156,7 +155,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
this.processingConfig = processingConfig;
this.pool = pool;
this.scheduler = scheduler;
this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
this.joinableFactoryWrapper = joinableFactoryWrapper;
this.emitter = emitter;
if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) {

View File

@ -33,6 +33,7 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
@ -80,7 +81,7 @@ public class Appenderators
emitter,
conglomerate,
queryProcessingPool,
joinableFactory,
new JoinableFactoryWrapper(joinableFactory),
Preconditions.checkNotNull(cache, "cache"),
cacheConfig,
cachePopulatorStats

View File

@ -60,7 +60,6 @@ import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
@ -102,7 +101,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
JoinableFactoryWrapper joinableFactoryWrapper,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
@ -114,7 +113,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
this.queryProcessingPool = Preconditions.checkNotNull(queryProcessingPool, "queryProcessingPool");
this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
this.joinableFactoryWrapper = joinableFactoryWrapper;
this.cache = Preconditions.checkNotNull(cache, "cache");
this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");

View File

@ -60,6 +60,7 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.plumber.Sink;
@ -107,7 +108,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
private final Map<String, DatasourceBundle> datasourceBundles = new HashMap<>();
private final QueryProcessingPool queryProcessingPool;
private final JoinableFactory joinableFactory;
private final JoinableFactoryWrapper joinableFactoryWrapper;
private final WorkerConfig workerConfig;
private final Cache cache;
private final CacheConfig cacheConfig;
@ -121,7 +122,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
@Inject
public UnifiedIndexerAppenderatorsManager(
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
JoinableFactoryWrapper joinableFactoryWrapper,
WorkerConfig workerConfig,
Cache cache,
CacheConfig cacheConfig,
@ -132,7 +133,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
)
{
this.queryProcessingPool = queryProcessingPool;
this.joinableFactory = joinableFactory;
this.joinableFactoryWrapper = joinableFactoryWrapper;
this.workerConfig = workerConfig;
this.cache = cache;
this.cacheConfig = cacheConfig;
@ -427,7 +428,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
serviceEmitter,
queryRunnerFactoryConglomerateProvider.get(),
queryProcessingPool,
joinableFactory,
joinableFactoryWrapper,
Preconditions.checkNotNull(cache, "cache"),
cacheConfig,
cachePopulatorStats

View File

@ -65,6 +65,7 @@ import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireHydrant;
@ -171,7 +172,7 @@ public class RealtimePlumber implements Plumber
emitter,
conglomerate,
queryProcessingPool,
joinableFactory,
new JoinableFactoryWrapper(joinableFactory),
cache,
cacheConfig,
cachePopulatorStats

View File

@ -37,7 +37,6 @@ import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.joda.time.Interval;
@ -66,14 +65,14 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
public LocalQuerySegmentWalker(
QueryRunnerFactoryConglomerate conglomerate,
SegmentWrangler segmentWrangler,
JoinableFactory joinableFactory,
JoinableFactoryWrapper joinableFactoryWrapper,
QueryScheduler scheduler,
ServiceEmitter emitter
)
{
this.conglomerate = conglomerate;
this.segmentWrangler = segmentWrangler;
this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
this.joinableFactoryWrapper = joinableFactoryWrapper;
this.scheduler = scheduler;
this.emitter = emitter;
}

View File

@ -60,7 +60,6 @@ import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
@ -104,7 +103,7 @@ public class ServerManager implements QuerySegmentWalker
Cache cache,
CacheConfig cacheConfig,
SegmentManager segmentManager,
JoinableFactory joinableFactory,
JoinableFactoryWrapper joinableFactoryWrapper,
ServerConfig serverConfig
)
{
@ -118,7 +117,7 @@ public class ServerManager implements QuerySegmentWalker
this.cacheConfig = cacheConfig;
this.segmentManager = segmentManager;
this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
this.joinableFactoryWrapper = joinableFactoryWrapper;
this.serverConfig = serverConfig;
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
@ -48,7 +47,7 @@ import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@ -336,7 +335,7 @@ public class CachingClusteredClientFunctionalityTest
},
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
}

View File

@ -49,7 +49,7 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.coordination.ServerManagerTest;
import org.apache.druid.server.coordination.ServerType;
@ -139,7 +139,7 @@ public class CachingClusteredClientPerfTest
Mockito.mock(DruidProcessingConfig.class),
ForkJoinPool.commonPool(),
queryScheduler,
NoopJoinableFactory.INSTANCE,
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);

View File

@ -26,7 +26,6 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@ -120,7 +119,7 @@ import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.ServerTestHelper;
import org.apache.druid.server.coordination.ServerType;
@ -2850,7 +2849,7 @@ public class CachingClusteredClientTest
NoQueryLaningStrategy.INSTANCE,
new ServerConfig()
),
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
}

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.CachingClusteredClient;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.client.DruidServer;
@ -50,7 +49,7 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
@ -149,7 +148,7 @@ public abstract class QueryRunnerBasedOnClusteredClientTestBase
),
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
servers = new ArrayList<>();

View File

@ -46,7 +46,7 @@ import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.segment.loading.NoopDataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
@ -74,7 +74,7 @@ public class UnifiedIndexerAppenderatorsManagerTest extends InitializedNullHandl
private final UnifiedIndexerAppenderatorsManager manager = new UnifiedIndexerAppenderatorsManager(
DirectQueryProcessingPool.INSTANCE,
NoopJoinableFactory.INSTANCE,
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new WorkerConfig(),
MapCache.create(10),
new CacheConfig(),

View File

@ -77,6 +77,7 @@ import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.Joinable;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
@ -1331,6 +1332,7 @@ public class ClientQuerySegmentWalkerTest
.put(globalFactory.getClass(), GlobalTableDataSource.class)
.build()
);
final JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
class CapturingWalker implements QuerySegmentWalker
{
@ -1379,7 +1381,7 @@ public class ClientQuerySegmentWalkerTest
.put(ARRAY, makeTimeline(ARRAY, ARRAY_INLINE))
.put(ARRAY_UNKNOWN, makeTimeline(ARRAY_UNKNOWN, ARRAY_INLINE_UNKNOWN))
.build(),
joinableFactory,
joinableFactoryWrapper,
conglomerate,
schedulerForTest
),
@ -1389,7 +1391,7 @@ public class ClientQuerySegmentWalkerTest
QueryStackTests.createLocalQuerySegmentWalker(
conglomerate,
segmentWrangler,
joinableFactory,
joinableFactoryWrapper,
schedulerForTest
),
ClusterOrLocal.LOCAL

View File

@ -70,6 +70,7 @@ import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.join.LookupJoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
@ -159,25 +160,25 @@ public class QueryStackTests
public static TestClusterQuerySegmentWalker createClusterQuerySegmentWalker(
Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines,
JoinableFactory joinableFactory,
JoinableFactoryWrapper joinableFactoryWraper,
QueryRunnerFactoryConglomerate conglomerate,
@Nullable QueryScheduler scheduler
)
{
return new TestClusterQuerySegmentWalker(timelines, joinableFactory, conglomerate, scheduler);
return new TestClusterQuerySegmentWalker(timelines, joinableFactoryWraper, conglomerate, scheduler);
}
public static LocalQuerySegmentWalker createLocalQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final SegmentWrangler segmentWrangler,
final JoinableFactory joinableFactory,
final JoinableFactoryWrapper joinableFactoryWrapper,
final QueryScheduler scheduler
)
{
return new LocalQuerySegmentWalker(
conglomerate,
segmentWrangler,
joinableFactory,
joinableFactoryWrapper,
scheduler,
EMITTER
);

View File

@ -46,7 +46,6 @@ import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@ -80,13 +79,13 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
TestClusterQuerySegmentWalker(
Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines,
JoinableFactory joinableFactory,
JoinableFactoryWrapper joinableFactoryWrapper,
QueryRunnerFactoryConglomerate conglomerate,
@Nullable QueryScheduler scheduler
)
{
this.timelines = timelines;
this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
this.joinableFactoryWrapper = joinableFactoryWrapper;
this.conglomerate = conglomerate;
this.scheduler = scheduler;
}

View File

@ -84,7 +84,7 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.TombstoneSegmentizerFactory;
@ -196,7 +196,7 @@ public class ServerManagerTest
new LocalCacheProvider().get(),
new CacheConfig(),
segmentManager,
NoopJoinableFactory.INSTANCE,
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new ServerConfig()
);

View File

@ -34,7 +34,7 @@ public class TestQueryMakerFactory implements QueryMakerFactory
private final QueryLifecycleFactory queryLifecycleFactory;
private final ObjectMapper jsonMapper;
TestQueryMakerFactory(
public TestQueryMakerFactory(
final QueryLifecycleFactory queryLifecycleFactory,
final ObjectMapper jsonMapper
)

View File

@ -95,6 +95,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.Joinable;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.join.table.RowBasedIndexedTable;
import org.apache.druid.segment.loading.SegmentLoader;
@ -883,7 +884,7 @@ public class CalciteTests
final QueryScheduler scheduler
)
{
return createMockWalker(conglomerate, tmpDir, scheduler, null);
return createMockWalker(conglomerate, tmpDir, scheduler, (JoinableFactory) null);
}
public static SpecificSegmentsQuerySegmentWalker createMockWalker(
@ -892,6 +893,29 @@ public class CalciteTests
final QueryScheduler scheduler,
final JoinableFactory joinableFactory
)
{
final JoinableFactory joinableFactoryToUse;
if (joinableFactory == null) {
joinableFactoryToUse = QueryStackTests.makeJoinableFactoryForLookup(
INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class)
);
} else {
joinableFactoryToUse = joinableFactory;
}
return createMockWalker(
conglomerate,
tmpDir,
scheduler,
new JoinableFactoryWrapper(joinableFactoryToUse)
);
}
public static SpecificSegmentsQuerySegmentWalker createMockWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final File tmpDir,
final QueryScheduler scheduler,
final JoinableFactoryWrapper joinableFactoryWrapper
)
{
final QueryableIndex index1 = IndexBuilder
.create()
@ -969,7 +993,7 @@ public class CalciteTests
return new SpecificSegmentsQuerySegmentWalker(
conglomerate,
INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class),
joinableFactory,
joinableFactoryWrapper,
scheduler
).add(
DataSegment.builder()

View File

@ -41,6 +41,7 @@ import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests;
@ -49,7 +50,6 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
@ -72,6 +72,21 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines = new HashMap<>();
private final List<Closeable> closeables = new ArrayList<>();
private final List<DataSegment> segments = new ArrayList<>();
private static final LookupExtractorFactoryContainerProvider LOOKUP_EXTRACTOR_FACTORY_CONTAINER_PROVIDER =
new LookupExtractorFactoryContainerProvider()
{
@Override
public Set<String> getAllLookupNames()
{
return Collections.emptySet();
}
@Override
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
{
return Optional.empty();
}
};
/**
* Create an instance using the provided query runner factory conglomerate and lookup provider.
@ -81,22 +96,14 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
public SpecificSegmentsQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final LookupExtractorFactoryContainerProvider lookupProvider,
@Nullable final JoinableFactory joinableFactory,
final JoinableFactoryWrapper joinableFactoryWrapper,
final QueryScheduler scheduler
)
{
final JoinableFactory joinableFactoryToUse;
if (joinableFactory == null) {
joinableFactoryToUse = QueryStackTests.makeJoinableFactoryForLookup(lookupProvider);
} else {
joinableFactoryToUse = joinableFactory;
}
this.walker = QueryStackTests.createClientQuerySegmentWalker(
QueryStackTests.createClusterQuerySegmentWalker(
timelines,
joinableFactoryToUse,
joinableFactoryWrapper,
conglomerate,
scheduler
),
@ -108,11 +115,11 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
.put(LookupDataSource.class, new LookupSegmentWrangler(lookupProvider))
.build()
),
joinableFactoryToUse,
joinableFactoryWrapper,
scheduler
),
conglomerate,
joinableFactoryToUse,
joinableFactoryWrapper.getJoinableFactory(),
new ServerConfig()
);
}
@ -125,21 +132,8 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
{
this(
conglomerate,
new LookupExtractorFactoryContainerProvider()
{
@Override
public Set<String> getAllLookupNames()
{
return Collections.emptySet();
}
@Override
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
{
return Optional.empty();
}
},
null,
LOOKUP_EXTRACTOR_FACTORY_CONTAINER_PROVIDER,
new JoinableFactoryWrapper(QueryStackTests.makeJoinableFactoryForLookup(LOOKUP_EXTRACTOR_FACTORY_CONTAINER_PROVIDER)),
QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
}