Add JoinableFactory interface and use it in the query stack. (#9247)

* Add JoinableFactory interface and use it in the query stack.

Also includes InlineJoinableFactory, which enables joining against
inline datasources. This is the first patch where a basic join query
actually works. It includes integration tests.

* Fix test issues.

* Adjustments from code review.
Gian Merlino 2020-01-24 13:10:01 -08:00 committed by GitHub
parent 3daf0f8e12
commit 19b427e8f3
63 changed files with 1121 additions and 97 deletions


@ -29,6 +29,8 @@ import java.net.URL;
import java.nio.file.Paths;
import java.util.List;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -80,13 +82,29 @@ public class JvmUtils
* @return total CPU time for the current thread in nanoseconds.
*
* @throws UnsupportedOperationException if the Java virtual machine does not support CPU time measurement for
* the current thread.
*/
public static long getCurrentThreadCpuTime()
{
return THREAD_MX_BEAN.getCurrentThreadCpuTime();
}
/**
* Executes and returns the value of {@code function}. Also accumulates the CPU time taken for the function (as
* reported by {@link #getCurrentThreadCpuTime()}) into {@code accumulator}.
*/
public static <T> T safeAccumulateThreadCpuTime(final AtomicLong accumulator, final Supplier<T> function)
{
final long start = safeGetThreadCpuTime();
try {
return function.get();
}
finally {
accumulator.addAndGet(safeGetThreadCpuTime() - start);
}
}
public static List<URL> systemClassPath()
{
List<URL> jobURLs;
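
For context, a minimal usage sketch of the new helper (illustrative only; the example class and strings are not part of this patch):

import java.util.concurrent.atomic.AtomicLong;
import org.apache.druid.utils.JvmUtils;

class CpuTimeAccumulationExample
{
  public static void main(String[] args)
  {
    final AtomicLong cpuNanos = new AtomicLong();

    // The supplier runs immediately; the CPU time it consumes is added to cpuNanos.
    final String value = JvmUtils.safeAccumulateThreadCpuTime(cpuNanos, () -> "result");

    System.out.println(value + " computed in " + cpuNanos.get() + " CPU ns");
  }
}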


@ -105,6 +105,7 @@ import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
@ -2652,6 +2653,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
handoffNotifierFactory,
this::makeTimeseriesAndScanConglomerate,
Execs.directExecutor(), // queryExecutorService
NoopJoinableFactory.INSTANCE,
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),


@ -97,6 +97,7 @@ import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
@ -2888,6 +2889,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
handoffNotifierFactory,
this::makeTimeseriesOnlyConglomerate,
Execs.directExecutor(), // queryExecutorService
NoopJoinableFactory.INSTANCE,
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),


@ -42,6 +42,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
@ -86,6 +87,7 @@ public class TaskToolbox
private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
private final MonitorScheduler monitorScheduler;
private final ExecutorService queryExecutorService;
private final JoinableFactory joinableFactory;
private final SegmentLoader segmentLoader;
private final ObjectMapper jsonMapper;
private final File taskWorkDir;
@ -116,6 +118,7 @@ public class TaskToolbox
SegmentHandoffNotifierFactory handoffNotifierFactory,
Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
MonitorScheduler monitorScheduler,
SegmentLoader segmentLoader,
ObjectMapper jsonMapper,
@ -146,6 +149,7 @@ public class TaskToolbox
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
this.queryExecutorService = queryExecutorService;
this.joinableFactory = joinableFactory;
this.monitorScheduler = monitorScheduler;
this.segmentLoader = segmentLoader;
this.jsonMapper = jsonMapper;
@ -229,6 +233,11 @@ public class TaskToolbox
return queryExecutorService;
}
public JoinableFactory getJoinableFactory()
{
return joinableFactory;
}
public MonitorScheduler getMonitorScheduler()
{
return monitorScheduler;


@ -42,6 +42,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
@ -72,6 +73,7 @@ public class TaskToolboxFactory
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
private final ExecutorService queryExecutorService;
private final JoinableFactory joinableFactory;
private final MonitorScheduler monitorScheduler;
private final SegmentLoaderFactory segmentLoaderFactory;
private final ObjectMapper jsonMapper;
@ -102,6 +104,7 @@ public class TaskToolboxFactory
SegmentHandoffNotifierFactory handoffNotifierFactory,
Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
@Processing ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
MonitorScheduler monitorScheduler,
SegmentLoaderFactory segmentLoaderFactory,
@Json ObjectMapper jsonMapper,
@ -131,6 +134,7 @@ public class TaskToolboxFactory
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
this.queryExecutorService = queryExecutorService;
this.joinableFactory = joinableFactory;
this.monitorScheduler = monitorScheduler;
this.segmentLoaderFactory = segmentLoaderFactory;
this.jsonMapper = jsonMapper;
@ -164,6 +168,7 @@ public class TaskToolboxFactory
handoffNotifierFactory,
queryRunnerFactoryConglomerateProvider,
queryExecutorService,
joinableFactory,
monitorScheduler,
segmentLoaderFactory.manufacturate(taskWorkDir),
jsonMapper,


@ -772,6 +772,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
toolbox.getSegmentAnnouncer(),
toolbox.getEmitter(),
toolbox.getQueryExecutorService(),
toolbox.getJoinableFactory(),
toolbox.getCache(),
toolbox.getCacheConfig(),
toolbox.getCachePopulatorStats()


@ -341,6 +341,7 @@ public class RealtimeIndexTask extends AbstractTask
segmentPublisher,
toolbox.getSegmentHandoffNotifierFactory(),
toolbox.getQueryExecutorService(),
toolbox.getJoinableFactory(),
toolbox.getIndexMergerV9(),
toolbox.getIndexIO(),
toolbox.getCache(),


@ -206,6 +206,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
toolbox.getSegmentAnnouncer(),
toolbox.getEmitter(),
toolbox.getQueryExecutorService(),
toolbox.getJoinableFactory(),
toolbox.getCache(),
toolbox.getCacheConfig(),
toolbox.getCachePopulatorStats()


@ -34,6 +34,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
@ -109,6 +110,7 @@ public class TaskToolboxTest
mockHandoffNotifierFactory,
() -> mockQueryRunnerFactoryConglomerate,
mockQueryExecutorService,
NoopJoinableFactory.INSTANCE,
mockMonitorScheduler,
mockSegmentLoaderFactory,
ObjectMapper,


@ -113,6 +113,7 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
@ -1591,6 +1592,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
handoffNotifierFactory,
() -> conglomerate,
Execs.directExecutor(), // queryExecutorService
NoopJoinableFactory.INSTANCE,
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),


@ -61,6 +61,7 @@ import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
@ -877,6 +878,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
loader,
objectMapper,


@ -108,6 +108,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@ -1274,6 +1275,7 @@ public class CompactionTaskTest
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
null,
null,


@ -56,6 +56,7 @@ import org.apache.druid.metadata.SQLMetadataSegmentManager;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
@ -306,6 +307,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
null,
objectMapper,


@ -100,6 +100,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.FireDepartment;
@ -973,6 +974,7 @@ public class RealtimeIndexTaskTest
handoffNotifierFactory,
() -> conglomerate,
Execs.directExecutor(), // queryExecutorService
NoopJoinableFactory.INSTANCE,
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),


@ -31,6 +31,7 @@ import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
@ -60,6 +61,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
@ -78,6 +80,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager
segmentAnnouncer,
emitter,
queryExecutorService,
joinableFactory,
cache,
cacheConfig,
cachePopulatorStats


@ -54,6 +54,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
@ -301,6 +302,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
newSegmentLoader(temporaryFolder.newFolder()),
getObjectMapper(),


@ -33,6 +33,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.NoopDataSegmentArchiver;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.NoopDataSegmentMover;
@ -94,6 +95,7 @@ public class SingleTaskBackgroundRunnerTest
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
new SegmentLoaderFactory(null, utils.getTestObjectMapper()),
utils.getTestObjectMapper(),


@ -117,6 +117,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
@ -663,6 +664,7 @@ public class TaskLifecycleTest
handoffNotifierFactory,
() -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
Execs.directExecutor(), // query executor service
NoopJoinableFactory.INSTANCE,
monitorScheduler, // monitor scheduler
new SegmentLoaderFactory(null, new DefaultObjectMapper()),
MAPPER,
@ -1329,6 +1331,7 @@ public class TaskLifecycleTest
UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = new UnifiedIndexerAppenderatorsManager(
exec,
NoopJoinableFactory.INSTANCE,
new WorkerConfig(),
MapCache.create(2048),
new CacheConfig(),


@ -40,6 +40,7 @@ import org.apache.druid.indexing.overlord.TestTaskRunner;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
@ -120,6 +121,7 @@ public class WorkerTaskManagerTest
notifierFactory,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
new SegmentLoaderFactory(null, jsonMapper),
jsonMapper,


@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.IndexerZkConfig;
@ -170,7 +171,18 @@ public class WorkerTaskMonitorTest
taskConfig,
null,
taskActionClientFactory,
null, null, null, null, null, null, null, notifierFactory, null, null, null,
null,
null,
null,
null,
null,
null,
null,
notifierFactory,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
new SegmentLoaderFactory(null, jsonMapper),
jsonMapper,
indexIO,


@ -11,6 +11,7 @@ command=java
-Ddruid.host=%(ENV_HOST_IP)s
-Ddruid.zk.service.host=druid-zookeeper-kafka
-Ddruid.processing.buffer.sizeBytes=25000000
-Ddruid.query.groupBy.maxOnDiskStorage=300000000
-Ddruid.server.http.numThreads=40
-Ddruid.processing.numThreads=1
-Ddruid.broker.http.numConnections=20


@ -14,6 +14,7 @@ command=java
-Ddruid.s3.secretKey=OBaLISDFjKLajSTrJ53JoTtzTZLjPlRePcwa+Pjv
-Ddruid.processing.buffer.sizeBytes=25000000
-Ddruid.processing.numThreads=2
-Ddruid.query.groupBy.maxOnDiskStorage=300000000
-Ddruid.server.http.numThreads=20
-Ddruid.segmentCache.locations="[{\"path\":\"/shared/druid/indexCache\",\"maxSize\":5000000000}]"
-Ddruid.server.maxSize=5000000000


@ -36,5 +36,57 @@
}
}
]
},
{
"description": "topN, 1 agg, join to inline",
"query": {
"queryType": "topN",
"dataSource": {
"type": "join",
"left": "%%DATASOURCE%%",
"right": {
"type": "inline",
"columnNames": ["language", "lookupLanguage"],
"columnTypes": ["string", "string"],
"rows": [
["en", "inline join!"]
]
},
"rightPrefix": "j.",
"condition": "language == \"j.language\"",
"joinType": "LEFT"
},
"intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ],
"granularity": "all",
"virtualColumns": [
{
"type": "expression",
"name": "lookupLanguage",
"expression": "nvl(\"j.lookupLanguage\", \"language\")",
"outputType": "string"
}
],
"aggregations": [
{
"type": "longSum",
"name": "count",
"fieldName": "count"
}
],
"dimension": "lookupLanguage",
"metric": "count",
"threshold": 3
},
"expectedResults": [
{
"timestamp": "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
"result": [
{
"lookupLanguage": "inline join!",
"count": %%TIMESERIES_NUMEVENTS%%
}
]
}
]
}
]


@ -83,5 +83,65 @@
"rows" : 1
}
} ]
},
{
"description": "topN, 1 agg, join to inline",
"query": {
"queryType": "topN",
"dataSource": {
"type": "join",
"left": "%%DATASOURCE%%",
"right": {
"type": "inline",
"columnNames": ["language", "lookupLanguage"],
"columnTypes": ["string", "string"],
"rows": [
["en", "inline join!"]
]
},
"rightPrefix": "j.",
"condition": "language == \"j.language\"",
"joinType": "LEFT"
},
"intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ],
"granularity": "all",
"virtualColumns": [
{
"type": "expression",
"name": "lookupLanguage",
"expression": "nvl(\"j.lookupLanguage\", \"language\")",
"outputType": "string"
}
],
"aggregations": [
{
"type": "longSum",
"name": "count",
"fieldName": "count"
}
],
"dimension": "lookupLanguage",
"metric": "count",
"threshold": 3
},
"expectedResults": [
{
"timestamp": "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
"result": [
{
"lookupLanguage": "inline join!",
"count": 14
},
{
"lookupLanguage": "ja",
"count": 3
},
{
"lookupLanguage": "ru",
"count": 3
}
]
}
]
}
]


@ -1299,5 +1299,165 @@
]
}
]
},
{
"description": "topN, 1 agg, join to inline",
"query": {
"queryType": "topN",
"dataSource": {
"type": "join",
"left": "wikipedia_editstream",
"right": {
"type": "inline",
"columnNames": ["page", "lookupPage"],
"columnTypes": ["string", "string"],
"rows": [
["Wikipedia:Vandalismusmeldung", "inline join!"]
]
},
"rightPrefix": "j.",
"condition": "page == \"j.page\"",
"joinType": "LEFT"
},
"intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
"granularity": "all",
"virtualColumns": [
{
"type": "expression",
"name": "lookupPage",
"expression": "nvl(\"j.lookupPage\", \"page\")",
"outputType": "string"
}
],
"aggregations": [
{
"type": "count",
"name": "rows"
}
],
"dimension": "lookupPage",
"metric": "rows",
"threshold": 3,
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 360000
}
},
"expectedResults": [
{
"timestamp": "2013-01-01T00:00:00.000Z",
"result": [
{
"lookupPage": "inline join!",
"rows": 991
},
{
"lookupPage": "Wikipedia:Administrators'_noticeboard/Incidents",
"rows": 990
},
{
"lookupPage": "Wikipedia:Administrator_intervention_against_vandalism",
"rows": 800
}
]
}
]
},
{
"description": "groupBy, 1 agg, subquery over join to inline",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "query",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "join",
"left": "wikipedia_editstream",
"right": {
"type": "inline",
"columnNames": ["page", "lookupPage"],
"columnTypes": ["string", "string"],
"rows": [
["Wikipedia:Vandalismusmeldung", "inline join!"]
]
},
"rightPrefix": "j.",
"condition": "page == \"j.page\"",
"joinType": "LEFT"
},
"intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
"granularity": "all",
"virtualColumns": [
{
"type": "expression",
"name": "lookupPage",
"expression": "nvl(\"j.lookupPage\", \"page\")",
"outputType": "string"
}
],
"aggregations": [
{
"type": "count",
"name": "rows"
}
],
"dimensions": ["lookupPage"]
}
},
"intervals": ["2013-01-01T00:00:00.000/2013-01-02T00:00:00.000"],
"granularity": "all",
"aggregations": [
{
"type": "longSum",
"name": "rows_outer",
"fieldName": "rows"
}
],
"dimensions": ["lookupPage"],
"limitSpec": {
"type": "default",
"columns": [
{
"dimension": "rows_outer",
"dimensionOrder": "numeric",
"direction": "descending"
}
],
"limit": 3
},
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 360000
}
},
"expectedResults": [
{
"version": "v1",
"timestamp": "2013-01-01T00:00:00.000Z",
"event": {
"lookupPage": "inline join!",
"rows_outer": 991
}
},
{
"version": "v1",
"timestamp": "2013-01-01T00:00:00.000Z",
"event": {
"lookupPage": "Wikipedia:Administrators'_noticeboard/Incidents",
"rows_outer": 990
}
},
{
"version": "v1",
"timestamp": "2013-01-01T00:00:00.000Z",
"event": {
"lookupPage": "Wikipedia:Administrator_intervention_against_vandalism",
"rows_outer": 800
}
}
]
}
]


@ -22,39 +22,41 @@ package org.apache.druid.query;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.ReferenceCounter;
import org.apache.druid.segment.Segment;
/**
*/
public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunnerFactory<T, Query<T>> factory;
private final ReferenceCountingSegment adapter;
private final Segment segment;
private final ReferenceCounter segmentReferenceCounter;
private final SegmentDescriptor descriptor;
public ReferenceCountingSegmentQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
ReferenceCountingSegment adapter,
Segment segment,
ReferenceCounter segmentReferenceCounter,
SegmentDescriptor descriptor
)
{
this.factory = factory;
this.adapter = adapter;
this.segment = segment;
this.segmentReferenceCounter = segmentReferenceCounter;
this.descriptor = descriptor;
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, ResponseContext responseContext)
{
if (adapter.increment()) {
if (segmentReferenceCounter.increment()) {
try {
final Sequence<T> baseSequence = factory.createRunner(adapter).run(queryPlus, responseContext);
final Sequence<T> baseSequence = factory.createRunner(segment).run(queryPlus, responseContext);
return Sequences.withBaggage(baseSequence, adapter.decrementOnceCloseable());
return Sequences.withBaggage(baseSequence, segmentReferenceCounter.decrementOnceCloseable());
}
catch (Throwable t) {
try {
adapter.decrement();
segmentReferenceCounter.decrement();
}
catch (Exception e) {
t.addSuppressed(e);


@ -39,7 +39,7 @@ public class PreJoinableClause
private final JoinType joinType;
private final JoinConditionAnalysis condition;
PreJoinableClause(
public PreJoinableClause(
final String prefix,
final DataSource dataSource,
final JoinType joinType,


@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import java.io.Closeable;
/**
* An interface to reference-counted objects. Used by {@link ReferenceCountingSegment}. Thread-safe.
*/
public interface ReferenceCounter
{
/**
* Increment the reference count by one.
*/
boolean increment();
/**
* Returns a {@link Closeable} whose action is to call {@link #decrement()} only once. If close() is called on the
* returned Closeable object a second time, it won't call {@link #decrement()} again.
*/
Closeable decrementOnceCloseable();
/**
* Decrement the reference count by one.
*/
void decrement();
}
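
The query runner above already follows the intended acquire/release protocol; as a standalone sketch (illustrative, not part of this patch):

import java.io.Closeable;
import java.io.IOException;
import org.apache.druid.segment.ReferenceCounter;

class ReferenceCounterUsageExample
{
  static void withReference(final ReferenceCounter counter) throws IOException
  {
    if (counter.increment()) {  // false means the referent has already been closed
      try (Closeable releaser = counter.decrementOnceCloseable()) {
        // ... safe to use the referenced object here ...
      }  // decrement() runs exactly once, even if the Closeable were closed twice
    }
  }
}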


@ -27,7 +27,6 @@ import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
@ -38,7 +37,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
* until that. So ReferenceCountingSegment implements something like automatic reference count-based resource
* management.
*/
public class ReferenceCountingSegment extends AbstractSegment implements Overshadowable<ReferenceCountingSegment>
public class ReferenceCountingSegment extends AbstractSegment
implements Overshadowable<ReferenceCountingSegment>, ReferenceCounter
{
private static final EmittingLogger log = new EmittingLogger(ReferenceCountingSegment.class);
@ -167,6 +167,12 @@ public class ReferenceCountingSegment extends AbstractSegment implements Oversha
}
}
public ReferenceCounter referenceCounter()
{
return this;
}
@Override
public boolean increment()
{
// Negative return from referents.register() means the Phaser is terminated.
@ -177,6 +183,7 @@ public class ReferenceCountingSegment extends AbstractSegment implements Oversha
* Returns a {@link Closeable} whose action is to call {@link #decrement()} only once. If close() is called on the
* returned Closeable object a second time, it won't call {@link #decrement()} again.
*/
@Override
public Closeable decrementOnceCloseable()
{
AtomicBoolean decremented = new AtomicBoolean(false);
@ -189,6 +196,7 @@ public class ReferenceCountingSegment extends AbstractSegment implements Oversha
};
}
@Override
public void decrement()
{
referents.arriveAndDeregister();


@ -24,7 +24,6 @@ import org.apache.druid.segment.AbstractSegment;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
@ -50,15 +49,9 @@ public class HashJoinSegment extends AbstractSegment
this.baseSegment = baseSegment;
this.clauses = clauses;
// Verify no clauses would shadow the special __time field.
for (JoinableClause clause : clauses) {
if (clause.includesColumn(ColumnHolder.TIME_COLUMN_NAME)) {
throw new IAE(
"Clause cannot have prefix[%s], since it would shadow %s",
clause.getPrefix(),
ColumnHolder.TIME_COLUMN_NAME
);
}
// Verify 'clauses' is nonempty (otherwise it's a waste to create this object, and the caller should know)
if (clauses.isEmpty()) {
throw new IAE("'clauses' is empty, no need to create HashJoinSegment");
}
}


@ -49,6 +49,7 @@ import java.util.Optional;
public class JoinConditionAnalysis
{
private final String originalExpression;
private final String rightPrefix;
private final List<Equality> equiConditions;
private final List<Expr> nonEquiConditions;
private final boolean isAlwaysFalse;
@ -57,19 +58,23 @@ public class JoinConditionAnalysis
private JoinConditionAnalysis(
final String originalExpression,
final String rightPrefix,
final List<Equality> equiConditions,
final List<Expr> nonEquiConditions
)
{
this.originalExpression = Preconditions.checkNotNull(originalExpression, "originalExpression");
this.rightPrefix = Preconditions.checkNotNull(rightPrefix, "rightPrefix");
this.equiConditions = Collections.unmodifiableList(equiConditions);
this.nonEquiConditions = Collections.unmodifiableList(nonEquiConditions);
// if any nonEquiCondition is an expression and it evaluates to false
isAlwaysFalse = nonEquiConditions.stream()
.anyMatch(expr -> expr.isLiteral() && !expr.eval(ExprUtils.nilBindings()).asBoolean());
.anyMatch(expr -> expr.isLiteral() && !expr.eval(ExprUtils.nilBindings())
.asBoolean());
// if there are no equiConditions and all nonEquiConditions are literals and they evaluate to true
isAlwaysTrue = equiConditions.isEmpty() && nonEquiConditions.stream()
.allMatch(expr -> expr.isLiteral() && expr.eval(ExprUtils.nilBindings()).asBoolean());
.allMatch(expr -> expr.isLiteral() && expr.eval(
ExprUtils.nilBindings()).asBoolean());
canHashJoin = nonEquiConditions.stream().allMatch(Expr::isLiteral);
}
@ -113,14 +118,14 @@ public class JoinConditionAnalysis
}
}
return new JoinConditionAnalysis(condition, equiConditions, nonEquiConditions);
return new JoinConditionAnalysis(condition, rightPrefix, equiConditions, nonEquiConditions);
}
private static boolean isLeftExprAndRightColumn(final Expr a, final Expr b, final String rightPrefix)
{
return a.analyzeInputs().getRequiredBindings().stream().noneMatch(c -> c.startsWith(rightPrefix))
return a.analyzeInputs().getRequiredBindings().stream().noneMatch(c -> Joinables.isPrefixedBy(c, rightPrefix))
&& b.getIdentifierIfIdentifier() != null
&& b.getIdentifierIfIdentifier().startsWith(rightPrefix);
&& Joinables.isPrefixedBy(b.getIdentifierIfIdentifier(), rightPrefix);
}
/**
@ -181,13 +186,14 @@ public class JoinConditionAnalysis
return false;
}
JoinConditionAnalysis that = (JoinConditionAnalysis) o;
return Objects.equals(originalExpression, that.originalExpression);
return Objects.equals(originalExpression, that.originalExpression) &&
Objects.equals(rightPrefix, that.rightPrefix);
}
@Override
public int hashCode()
{
return Objects.hash(originalExpression);
return Objects.hash(originalExpression, rightPrefix);
}
@Override


@ -43,6 +43,10 @@ public interface Joinable
/**
* Returns the cardinality of "columnName", or {@link #CARDINALITY_UNKNOWN} if not known. May be used at query
* time to trigger optimizations.
*
* If not {@link #CARDINALITY_UNKNOWN}, this must match the cardinality of selectors returned by the
* {@link ColumnSelectorFactory#makeDimensionSelector} method of this joinable's
* {@link JoinMatcher#getColumnSelectorFactory()}.
*/
int getCardinality(String columnName);
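
A concrete instance of this contract appears in IndexedTableJoinableTest below: a table keyed on two values reports cardinality 3 (the two keys plus the null id), and the DimensionSelector obtained from its matcher reports the same count. A hedged consistency check, assuming a 'joinable' and a 'selector' built from its matcher are in scope:

// Illustrative only: the cardinality contract ties these two values together.
Assert.assertEquals(joinable.getCardinality("str"), selector.getValueCardinality());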


@ -30,6 +30,8 @@ import java.util.stream.Collectors;
* Represents everything about a join clause except for the left-hand datasource. In other words, if the full join
* clause is "t1 JOIN t2 ON t1.x = t2.x" then this class represents "JOIN t2 ON x = t2.x" -- it does not include
* references to the left-hand "t1".
*
* Created from {@link org.apache.druid.query.planning.PreJoinableClause} by {@link Joinables#createSegmentMapFn}.
*/
public class JoinableClause
{
@ -101,7 +103,7 @@ public class JoinableClause
*/
public boolean includesColumn(final String columnName)
{
return columnName.startsWith(prefix) && columnName.length() > prefix.length();
return Joinables.isPrefixedBy(columnName, prefix);
}
/**


@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import org.apache.druid.query.DataSource;
import java.util.Optional;
/**
* Utility for creating {@link Joinable} objects.
*/
public interface JoinableFactory
{
/**
* Create a Joinable object. This may be an expensive operation involving loading data, creating a hash table, etc.
*
* @param dataSource the datasource to join on
* @param condition the condition to join on
*
* @return a Joinable if this datasource + condition combo is joinable; empty if not
*/
Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition);
}
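
A caller-side sketch of this contract (illustrative; it mirrors how Joinables.createJoinableClauses below consumes the returned Optional):

// Illustrative helper, not part of this patch.
static Joinable requireJoinable(
    final JoinableFactory factory,
    final DataSource dataSource,
    final JoinConditionAnalysis condition
)
{
  // An empty Optional means the factory cannot join this datasource/condition pair.
  return factory.build(dataSource, condition)
                .orElseThrow(() -> new IllegalStateException("dataSource is not joinable: " + dataSource));
}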


@ -20,9 +20,18 @@
package org.apache.druid.segment.join;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Utility methods for working with {@link Joinable} related classes.
@ -52,4 +61,56 @@ public class Joinables
{
return columnName.startsWith(prefix) && columnName.length() > prefix.length();
}
/**
* Creates a Function that maps base segments to {@link HashJoinSegment} if needed (i.e. if the number of join
* clauses is > 0). If mapping is not needed, this method will return {@link Function#identity()}.
*
* @param clauses pre-joinable clauses
* @param joinableFactory factory for joinables
* @param cpuTimeAccumulator an accumulator that we will add CPU nanos to; this is part of the function to encourage
* callers to remember to track metrics on CPU time required for creation of Joinables
*/
public static Function<Segment, Segment> createSegmentMapFn(
final List<PreJoinableClause> clauses,
final JoinableFactory joinableFactory,
final AtomicLong cpuTimeAccumulator
)
{
return JvmUtils.safeAccumulateThreadCpuTime(
cpuTimeAccumulator,
() -> {
if (clauses.isEmpty()) {
return Function.identity();
} else {
final List<JoinableClause> joinableClauses = createJoinableClauses(clauses, joinableFactory);
return baseSegment -> new HashJoinSegment(baseSegment, joinableClauses);
}
}
);
}
/**
* Returns a list of {@link JoinableClause} corresponding to a list of {@link PreJoinableClause}. This will call
* {@link JoinableFactory#build} on each one and therefore may be an expensive operation.
*/
private static List<JoinableClause> createJoinableClauses(
final List<PreJoinableClause> clauses,
final JoinableFactory joinableFactory
)
{
return clauses.stream().map(preJoinableClause -> {
final Optional<Joinable> joinable = joinableFactory.build(
preJoinableClause.getDataSource(),
preJoinableClause.getCondition()
);
return new JoinableClause(
preJoinableClause.getPrefix(),
joinable.orElseThrow(() -> new ISE("dataSource is not joinable: %s", preJoinableClause.getDataSource())),
preJoinableClause.getJoinType(),
preJoinableClause.getCondition()
);
}).collect(Collectors.toList());
}
}
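
A sketch of how a query walker might apply the mapping (illustrative; 'clauses', 'joinableFactory', and 'baseSegment' are assumed to be in scope):

// Returns Function.identity() when there are no clauses; otherwise wraps the
// base segment in a HashJoinSegment.
final AtomicLong cpuAccumulator = new AtomicLong();
final Function<Segment, Segment> segmentMapFn =
    Joinables.createSegmentMapFn(clauses, joinableFactory, cpuAccumulator);
final Segment possiblyJoined = segmentMapFn.apply(baseSegment);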


@ -56,7 +56,7 @@ public class IndexedTableColumnSelectorFactory implements ColumnSelectorFactory
capabilities.setDictionaryEncoded(true);
}
return capabilities;
return capabilities.setIsComplete(true);
} else {
return null;
}


@ -30,6 +30,7 @@ import org.apache.druid.segment.column.ValueType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@ -63,6 +64,10 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
this.columnFunctions = columns.stream().map(rowAdapter::columnFunction).collect(Collectors.toList());
this.keyColumns = keyColumns;
if (new HashSet<>(keyColumns).size() != keyColumns.size()) {
throw new ISE("keyColumns[%s] must not contain duplicates", keyColumns);
}
if (!rowSignature.keySet().containsAll(keyColumns)) {
throw new ISE(
"keyColumns[%s] must all be contained in rowSignature[%s]",


@ -31,6 +31,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
@ -40,9 +41,11 @@ public class HashJoinSegmentTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
private QueryableIndexSegment baseSegment;
private HashJoinSegment hashJoinSegmentNoClauses;
private HashJoinSegment hashJoinSegmentManyClauses;
private HashJoinSegment hashJoinSegment;
@BeforeClass
public static void setUpStatic()
@ -58,12 +61,7 @@ public class HashJoinSegmentTest
SegmentId.dummy("facts")
);
hashJoinSegmentNoClauses = new HashJoinSegment(
baseSegment,
ImmutableList.of()
);
hashJoinSegmentManyClauses = new HashJoinSegment(
hashJoinSegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
@ -83,55 +81,37 @@ public class HashJoinSegmentTest
}
@Test
public void test_getId_noClauses()
public void test_constructor_noClauses()
{
Assert.assertEquals(baseSegment.getId(), hashJoinSegmentNoClauses.getId());
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("'clauses' is empty, no need to create HashJoinSegment");
final HashJoinSegment ignored = new HashJoinSegment(baseSegment, ImmutableList.of());
}
@Test
public void test_getId_manyClauses()
public void test_getId()
{
Assert.assertEquals(baseSegment.getId(), hashJoinSegmentManyClauses.getId());
Assert.assertEquals(baseSegment.getId(), hashJoinSegment.getId());
}
@Test
public void test_getDataInterval_noClauses()
public void test_getDataInterval()
{
Assert.assertEquals(baseSegment.getDataInterval(), hashJoinSegmentNoClauses.getDataInterval());
Assert.assertEquals(baseSegment.getDataInterval(), hashJoinSegment.getDataInterval());
}
@Test
public void test_getDataInterval_manyClauses()
public void test_asQueryableIndex()
{
Assert.assertEquals(baseSegment.getDataInterval(), hashJoinSegmentManyClauses.getDataInterval());
Assert.assertNull(hashJoinSegment.asQueryableIndex());
}
@Test
public void test_asQueryableIndex_noClauses()
{
Assert.assertNull(hashJoinSegmentNoClauses.asQueryableIndex());
}
@Test
public void test_asQueryableIndex_manyClauses()
{
Assert.assertNull(hashJoinSegmentManyClauses.asQueryableIndex());
}
@Test
public void test_asStorageAdapter_noClauses()
public void test_asStorageAdapter()
{
Assert.assertThat(
hashJoinSegmentNoClauses.asStorageAdapter(),
CoreMatchers.instanceOf(HashJoinSegmentStorageAdapter.class)
);
}
@Test
public void test_asStorageAdapter_manyClauses()
{
Assert.assertThat(
hashJoinSegmentManyClauses.asStorageAdapter(),
hashJoinSegment.asStorageAdapter(),
CoreMatchers.instanceOf(HashJoinSegmentStorageAdapter.class)
);
}


@ -19,12 +19,24 @@
package org.apache.druid.segment.join;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
public class JoinablesTest
{
@Rule
@ -74,4 +86,70 @@ public class JoinablesTest
Assert.assertTrue(Joinables.isPrefixedBy("foo", "fo"));
Assert.assertFalse(Joinables.isPrefixedBy("foo", "foo"));
}
@Test
public void test_createSegmentMapFn_noClauses()
{
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
ImmutableList.of(),
NoopJoinableFactory.INSTANCE,
new AtomicLong()
);
Assert.assertSame(Function.identity(), segmentMapFn);
}
@Test
public void test_createSegmentMapFn_unusableClause()
{
final LookupDataSource lookupDataSource = new LookupDataSource("lookyloo");
final PreJoinableClause clause = new PreJoinableClause(
"j.",
lookupDataSource,
JoinType.LEFT,
JoinConditionAnalysis.forExpression("x == \"j.x\"", "j.", ExprMacroTable.nil())
);
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("dataSource is not joinable");
final Function<Segment, Segment> ignored = Joinables.createSegmentMapFn(
ImmutableList.of(clause),
NoopJoinableFactory.INSTANCE,
new AtomicLong()
);
}
@Test
public void test_createSegmentMapFn_usableClause()
{
final LookupDataSource lookupDataSource = new LookupDataSource("lookyloo");
final JoinConditionAnalysis conditionAnalysis = JoinConditionAnalysis.forExpression(
"x == \"j.x\"",
"j.",
ExprMacroTable.nil()
);
final PreJoinableClause clause = new PreJoinableClause(
"j.",
lookupDataSource,
JoinType.LEFT,
conditionAnalysis
);
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
ImmutableList.of(clause),
(dataSource, condition) -> {
if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) {
return Optional.of(
LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false))
);
} else {
return Optional.empty();
}
},
new AtomicLong()
);
Assert.assertNotSame(Function.identity(), segmentMapFn);
}
}


@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import org.apache.druid.query.DataSource;
import java.util.Optional;
public class NoopJoinableFactory implements JoinableFactory
{
public static final NoopJoinableFactory INSTANCE = new NoopJoinableFactory();
private NoopJoinableFactory()
{
// Singleton.
}
@Override
public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
{
return Optional.empty();
}
}


@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join.table;
import com.google.common.collect.ImmutableList;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinMatcher;
import org.junit.Assert;
import org.junit.Test;
public class IndexedTableJoinableTest
{
private static final String PREFIX = "j.";
private final ColumnSelectorFactory dummyColumnSelectorFactory = new ColumnSelectorFactory()
{
@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
{
return null;
}
@Override
public ColumnValueSelector<?> makeColumnValueSelector(String columnName)
{
return null;
}
@Override
public ColumnCapabilities getColumnCapabilities(String columnName)
{
return null;
}
};
private final InlineDataSource inlineDataSource = InlineDataSource.fromIterable(
ImmutableList.of("str", "long"),
ImmutableList.of(ValueType.STRING, ValueType.LONG),
ImmutableList.of(
new Object[]{"foo", 1L},
new Object[]{"bar", 2L}
)
);
private final RowBasedIndexedTable<Object[]> indexedTable = new RowBasedIndexedTable<>(
inlineDataSource.getRowsAsList(),
inlineDataSource.rowAdapter(),
inlineDataSource.getRowSignature(),
ImmutableList.of("str")
);
@Test
public void test_getAvailableColumns()
{
final IndexedTableJoinable joinable = new IndexedTableJoinable(indexedTable);
Assert.assertEquals(ImmutableList.of("long", "str"), joinable.getAvailableColumns());
}
@Test
public void test_getColumnCapabilities_string()
{
final IndexedTableJoinable joinable = new IndexedTableJoinable(indexedTable);
final ColumnCapabilities capabilities = joinable.getColumnCapabilities("str");
Assert.assertEquals(ValueType.STRING, capabilities.getType());
Assert.assertTrue(capabilities.isDictionaryEncoded());
Assert.assertFalse(capabilities.hasBitmapIndexes());
Assert.assertFalse(capabilities.hasMultipleValues());
Assert.assertFalse(capabilities.hasSpatialIndexes());
Assert.assertTrue(capabilities.isComplete());
}
@Test
public void test_getColumnCapabilities_long()
{
final IndexedTableJoinable joinable = new IndexedTableJoinable(indexedTable);
final ColumnCapabilities capabilities = joinable.getColumnCapabilities("long");
Assert.assertEquals(ValueType.LONG, capabilities.getType());
Assert.assertFalse(capabilities.isDictionaryEncoded());
Assert.assertFalse(capabilities.hasBitmapIndexes());
Assert.assertFalse(capabilities.hasMultipleValues());
Assert.assertFalse(capabilities.hasSpatialIndexes());
Assert.assertTrue(capabilities.isComplete());
}
@Test
public void test_getColumnCapabilities_nonexistent()
{
final IndexedTableJoinable joinable = new IndexedTableJoinable(indexedTable);
final ColumnCapabilities capabilities = joinable.getColumnCapabilities("nonexistent");
Assert.assertNull(capabilities);
}
@Test
public void test_makeJoinMatcher_dimensionSelectorOnString()
{
final IndexedTableJoinable joinable = new IndexedTableJoinable(indexedTable);
final JoinConditionAnalysis condition = JoinConditionAnalysis.forExpression(
"x == \"j.str\"",
PREFIX,
ExprMacroTable.nil()
);
final JoinMatcher joinMatcher = joinable.makeJoinMatcher(dummyColumnSelectorFactory, condition, false);
final DimensionSelector selector = joinMatcher.getColumnSelectorFactory()
.makeDimensionSelector(DefaultDimensionSpec.of("str"));
// getValueCardinality
Assert.assertEquals(3, selector.getValueCardinality());
// nameLookupPossibleInAdvance
Assert.assertTrue(selector.nameLookupPossibleInAdvance());
// lookupName
Assert.assertEquals("foo", selector.lookupName(0));
Assert.assertEquals("bar", selector.lookupName(1));
Assert.assertNull(selector.lookupName(2));
// lookupId
Assert.assertNull(selector.idLookup());
}
}


@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import com.google.inject.Inject;
import org.apache.druid.query.DataSource;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
public class DefaultJoinableFactory implements JoinableFactory
{
private final List<JoinableFactory> factories;
@Inject
public DefaultJoinableFactory(final InlineJoinableFactory inlineJoinableFactory)
{
// Just one right now, but we expect there to be more in the future, and maybe even an extension mechanism.
this.factories = Collections.singletonList(inlineJoinableFactory);
}
@Override
public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
{
for (JoinableFactory factory : factories) {
final Optional<Joinable> maybeJoinable = factory.build(dataSource, condition);
if (maybeJoinable.isPresent()) {
return maybeJoinable;
}
}
return Optional.empty();
}
}


@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.join.table.RowBasedIndexedTable;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* A {@link JoinableFactory} for {@link InlineDataSource}. It works by building an {@link IndexedTable}.
*/
public class InlineJoinableFactory implements JoinableFactory
{
@Override
public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
{
if (condition.canHashJoin() && dataSource instanceof InlineDataSource) {
final InlineDataSource inlineDataSource = (InlineDataSource) dataSource;
final List<String> rightKeyColumns =
condition.getEquiConditions().stream().map(Equality::getRightColumn).distinct().collect(Collectors.toList());
return Optional.of(
new IndexedTableJoinable(
new RowBasedIndexedTable<>(
inlineDataSource.getRowsAsList(),
inlineDataSource.rowAdapter(),
inlineDataSource.getRowSignature(),
rightKeyColumns
)
)
);
} else {
return Optional.empty();
}
}
}
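
An illustrative end-to-end use of the factory, reusing the inline-datasource shape from IndexedTableJoinableTest above (not part of this patch):

final InlineDataSource inlineDataSource = InlineDataSource.fromIterable(
    ImmutableList.of("str", "long"),
    ImmutableList.of(ValueType.STRING, ValueType.LONG),
    ImmutableList.of(new Object[]{"foo", 1L}, new Object[]{"bar", 2L})
);
final JoinConditionAnalysis condition =
    JoinConditionAnalysis.forExpression("x == \"j.str\"", "j.", ExprMacroTable.nil());
// The Optional is present: the datasource is inline and the condition can hash-join.
final Optional<Joinable> joinable = new InlineJoinableFactory().build(inlineDataSource, condition);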


@ -29,6 +29,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
@ -52,6 +53,7 @@ public class Appenderators
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
@ -75,6 +77,7 @@ public class Appenderators
emitter,
conglomerate,
queryExecutorService,
joinableFactory,
Preconditions.checkNotNull(cache, "cache"),
cacheConfig,
cachePopulatorStats


@ -31,6 +31,7 @@ import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
@ -74,6 +75,7 @@ public interface AppenderatorsManager
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats


@ -32,6 +32,7 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
@ -46,6 +47,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ExecutorService queryExecutorService;
private final JoinableFactory joinableFactory;
private final DataSegmentPusher dataSegmentPusher;
private final ObjectMapper jsonMapper;
private final IndexIO indexIO;
@ -59,6 +61,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
@JacksonInject QueryRunnerFactoryConglomerate conglomerate,
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject @Processing ExecutorService queryExecutorService,
@JacksonInject JoinableFactory joinableFactory,
@JacksonInject DataSegmentPusher dataSegmentPusher,
@JacksonInject @Json ObjectMapper jsonMapper,
@JacksonInject IndexIO indexIO,
@ -72,6 +75,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
this.conglomerate = conglomerate;
this.segmentAnnouncer = segmentAnnouncer;
this.queryExecutorService = queryExecutorService;
this.joinableFactory = joinableFactory;
this.dataSegmentPusher = dataSegmentPusher;
this.jsonMapper = jsonMapper;
this.indexIO = indexIO;
@ -107,6 +111,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
segmentAnnouncer,
emitter,
queryExecutorService,
joinableFactory,
cache,
cacheConfig,
cachePopulatorStats

View File

@ -32,6 +32,7 @@ import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
@ -65,6 +66,7 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManager
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats

View File

@ -32,6 +32,7 @@ import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
@ -71,6 +72,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
@ -94,6 +96,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
segmentAnnouncer,
emitter,
queryExecutorService,
joinableFactory,
cache,
cacheConfig,
cachePopulatorStats

View File

@ -57,6 +57,8 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.timeline.SegmentId;
@ -69,6 +71,7 @@ import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
* Query handler for indexing tasks.
@ -85,6 +88,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
private final ServiceEmitter emitter;
private final QueryRunnerFactoryConglomerate conglomerate;
private final ExecutorService queryExecutorService;
private final JoinableFactory joinableFactory;
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
@ -96,6 +100,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
@ -107,6 +112,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
this.joinableFactory = Preconditions.checkNotNull(joinableFactory, "joinableFactory");
this.cache = Preconditions.checkNotNull(cache, "cache");
this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
@ -149,11 +155,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
throw new ISE("Cannot handle datasource: %s", analysis.getDataSource());
}
// Sanity check: we cannot actually handle joins yet, so detect them and throw an error.
if (!analysis.getPreJoinableClauses().isEmpty()) {
throw new ISE("Cannot handle join dataSource");
}
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
throw new ISE("Unknown query type[%s].", query.getClass());
@ -168,6 +169,13 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
}
// segmentMapFn maps each base Segment into a joined Segment if necessary.
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
cpuTimeAccumulator
);
Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
specs,
descriptor -> {
@ -202,7 +210,9 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
// Prevent the underlying segment from swapping when it's being iterated
final Pair<Segment, Closeable> segmentAndCloseable = hydrant.getAndIncrementSegment();
try {
QueryRunner<T> runner = factory.createRunner(segmentAndCloseable.lhs);
final Segment mappedSegment = segmentMapFn.apply(segmentAndCloseable.lhs);
QueryRunner<T> runner = factory.createRunner(mappedSegment);
// 1) Only use caching if data is immutable
// 2) Hydrants are not the same between replicas, make sure cache is local
@ -228,7 +238,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
runner,
segmentAndCloseable.rhs
);
return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner);
return new Pair<>(mappedSegment.getDataInterval(), runner);
}
catch (RuntimeException e) {
CloseQuietly.close(segmentAndCloseable.rhs);
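
The substantive change in this walker (mirrored in ServerManager further down) is segmentMapFn: it is built once per query and applied to every base Segment before a per-segment runner is created. A conceptual sketch of that contract, assuming a hypothetical makeJoinSegment stand-in for the wrapping logic behind Joinables.createSegmentMapFn:

import java.util.List;
import java.util.function.Function;

import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.join.JoinableFactory;

class SegmentMapFnSketch
{
  // Identity when there is nothing to join; otherwise wrap each base
  // segment so queries see joined rows. Not the actual implementation.
  static Function<Segment, Segment> segmentMapFn(
      final List<PreJoinableClause> clauses,
      final JoinableFactory joinableFactory
  )
  {
    if (clauses.isEmpty()) {
      return Function.identity();
    }
    return baseSegment -> makeJoinSegment(baseSegment, clauses, joinableFactory);
  }

  private static Segment makeJoinSegment(Segment base, List<PreJoinableClause> clauses, JoinableFactory factory)
  {
    // Hypothetical stand-in; the real wrapping lives behind
    // Joinables.createSegmentMapFn in this patch.
    throw new UnsupportedOperationException("illustrative sketch");
  }
}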

View File

@ -51,6 +51,7 @@ import org.apache.druid.segment.ProgressIndicator;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.plumber.Sink;
@ -101,6 +102,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
private final Map<String, DatasourceBundle> datasourceBundles = new HashMap<>();
private final ExecutorService queryExecutorService;
private final JoinableFactory joinableFactory;
private final WorkerConfig workerConfig;
private final Cache cache;
private final CacheConfig cacheConfig;
@ -114,6 +116,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
@Inject
public UnifiedIndexerAppenderatorsManager(
@Processing ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
WorkerConfig workerConfig,
Cache cache,
CacheConfig cacheConfig,
@ -124,6 +127,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
)
{
this.queryExecutorService = queryExecutorService;
this.joinableFactory = joinableFactory;
this.workerConfig = workerConfig;
this.cache = cache;
this.cacheConfig = cacheConfig;
@ -151,6 +155,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
@ -320,6 +325,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
serviceEmitter,
queryRunnerFactoryConglomerateProvider.get(),
queryExecutorService,
joinableFactory,
Preconditions.checkNotNull(cache, "cache"),
cacheConfig,
cachePopulatorStats

View File

@ -36,6 +36,7 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.DateTime;
@ -70,6 +71,7 @@ public class FlushingPlumber extends RealtimePlumber
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
IndexMerger indexMerger,
IndexIO indexIO,
Cache cache,
@ -87,6 +89,7 @@ public class FlushingPlumber extends RealtimePlumber
conglomerate,
segmentAnnouncer,
queryExecutorService,
joinableFactory,
null,
null,
null,

View File

@ -34,6 +34,7 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Duration;
@ -54,6 +55,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ExecutorService queryExecutorService;
private final JoinableFactory joinableFactory;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
private final Cache cache;
@ -68,6 +70,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
@JacksonInject QueryRunnerFactoryConglomerate conglomerate,
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject @Processing ExecutorService queryExecutorService,
@JacksonInject JoinableFactory joinableFactory,
@JacksonInject IndexMergerV9 indexMergerV9,
@JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@ -84,6 +87,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
null,
null,
queryExecutorService,
joinableFactory,
indexMergerV9,
indexIO,
cache,
@ -97,6 +101,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
this.conglomerate = conglomerate;
this.segmentAnnouncer = segmentAnnouncer;
this.queryExecutorService = queryExecutorService;
this.joinableFactory = joinableFactory;
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
@ -123,6 +128,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
conglomerate,
segmentAnnouncer,
queryExecutorService,
joinableFactory,
indexMergerV9,
indexIO,
cache,

View File

@ -62,6 +62,7 @@ import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireHydrant;
@ -138,6 +139,7 @@ public class RealtimePlumber implements Plumber
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
DataSegmentPusher dataSegmentPusher,
SegmentPublisher segmentPublisher,
SegmentHandoffNotifier handoffNotifier,
@ -167,6 +169,7 @@ public class RealtimePlumber implements Plumber
emitter,
conglomerate,
queryExecutorService,
joinableFactory,
cache,
cacheConfig,
cachePopulatorStats

View File

@ -33,6 +33,7 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.SegmentPublisher;
@ -41,6 +42,7 @@ import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import java.util.concurrent.ExecutorService;
/**
*
*/
public class RealtimePlumberSchool implements PlumberSchool
{
@ -51,6 +53,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private final SegmentPublisher segmentPublisher;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final ExecutorService queryExecutorService;
private final JoinableFactory joinableFactory;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
private final Cache cache;
@ -67,6 +70,7 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject SegmentPublisher segmentPublisher,
@JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory,
@JacksonInject @Processing ExecutorService executorService,
@JacksonInject JoinableFactory joinableFactory,
@JacksonInject IndexMergerV9 indexMergerV9,
@JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@ -82,6 +86,7 @@ public class RealtimePlumberSchool implements PlumberSchool
this.segmentPublisher = segmentPublisher;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryExecutorService = executorService;
this.joinableFactory = joinableFactory;
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
@ -108,6 +113,7 @@ public class RealtimePlumberSchool implements PlumberSchool
conglomerate,
segmentAnnouncer,
queryExecutorService,
joinableFactory,
dataSegmentPusher,
segmentPublisher,
handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()),

View File

@ -52,7 +52,11 @@ import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ReferenceCounter;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
import org.apache.druid.server.initialization.ServerConfig;
@ -66,6 +70,7 @@ import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
* Query handler for Historical processes (see CliHistorical).
@ -81,6 +86,7 @@ public class ServerManager implements QuerySegmentWalker
private final ObjectMapper objectMapper;
private final CacheConfig cacheConfig;
private final SegmentManager segmentManager;
private final JoinableFactory joinableFactory;
private final ServerConfig serverConfig;
@Inject
@ -93,6 +99,7 @@ public class ServerManager implements QuerySegmentWalker
Cache cache,
CacheConfig cacheConfig,
SegmentManager segmentManager,
JoinableFactory joinableFactory,
ServerConfig serverConfig
)
{
@ -106,6 +113,7 @@ public class ServerManager implements QuerySegmentWalker
this.cacheConfig = cacheConfig;
this.segmentManager = segmentManager;
this.joinableFactory = joinableFactory;
this.serverConfig = serverConfig;
}
@ -113,10 +121,6 @@ public class ServerManager implements QuerySegmentWalker
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (!analysis.getPreJoinableClauses().isEmpty()) {
throw new ISE("Cannot handle join dataSource");
}
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline;
final Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> maybeTimeline =
segmentManager.getTimeline(analysis);
@ -165,13 +169,8 @@ public class ServerManager implements QuerySegmentWalker
return new NoopQueryRunner<>();
}
// Sanity check: we cannot actually handle joins yet, so detect them and throw an error.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (!analysis.getPreJoinableClauses().isEmpty()) {
throw new ISE("Cannot handle join dataSource");
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline;
@ -191,6 +190,13 @@ public class ServerManager implements QuerySegmentWalker
return new NoopQueryRunner<>();
}
// segmentMapFn maps each base Segment into a joined Segment if necessary.
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
cpuTimeAccumulator
);
FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
.create(specs)
.transformCat(
@ -214,7 +220,8 @@ public class ServerManager implements QuerySegmentWalker
buildAndDecorateQueryRunner(
factory,
toolChest,
segment,
segmentMapFn.apply(segment),
segment.referenceCounter(),
descriptor,
cpuTimeAccumulator
)
@ -237,19 +244,20 @@ public class ServerManager implements QuerySegmentWalker
private <T> QueryRunner<T> buildAndDecorateQueryRunner(
final QueryRunnerFactory<T, Query<T>> factory,
final QueryToolChest<T, Query<T>> toolChest,
final ReferenceCountingSegment adapter,
final Segment segment,
final ReferenceCounter segmentReferenceCounter,
final SegmentDescriptor segmentDescriptor,
final AtomicLong cpuTimeAccumulator
)
{
SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
SegmentId segmentId = adapter.getId();
SegmentId segmentId = segment.getId();
String segmentIdString = segmentId.toString();
MetricsEmittingQueryRunner<T> metricsEmittingQueryRunnerInner = new MetricsEmittingQueryRunner<>(
emitter,
toolChest,
new ReferenceCountingSegmentQueryRunner<>(factory, adapter, segmentDescriptor),
new ReferenceCountingSegmentQueryRunner<>(factory, segment, segmentReferenceCounter, segmentDescriptor),
QueryMetrics::reportSegmentTime,
queryMetrics -> queryMetrics.segment(segmentIdString)
);
@ -267,7 +275,7 @@ public class ServerManager implements QuerySegmentWalker
BySegmentQueryRunner<T> bySegmentQueryRunner = new BySegmentQueryRunner<>(
segmentId,
adapter.getDataInterval().getStart(),
segment.getDataInterval().getStart(),
cachingQueryRunner
);
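
One subtlety in the ServerManager changes above: the query runner now operates on the mapped (possibly join-wrapped) Segment, while lifecycle tracking still follows the underlying ReferenceCountingSegment; that is why buildAndDecorateQueryRunner takes the segment and its reference counter as separate arguments. The call shape from the transform, annotated:

// What the query runs against and what keeps the base segment alive are
// now two different arguments.
buildAndDecorateQueryRunner(
    factory,
    toolChest,
    segmentMapFn.apply(segment),   // mapped Segment handed to the runner
    segment.referenceCounter(),    // ReferenceCounter of the base segment
    descriptor,
    cpuTimeAccumulator
)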

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import com.google.common.collect.ImmutableList;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import java.util.Optional;
public class InlineJoinableFactoryTest
{
private static final String PREFIX = "j.";
private final InlineJoinableFactory factory = new InlineJoinableFactory();
private final InlineDataSource inlineDataSource = InlineDataSource.fromIterable(
ImmutableList.of("str", "long"),
ImmutableList.of(ValueType.STRING, ValueType.LONG),
ImmutableList.of(
new Object[]{"foo", 1L},
new Object[]{"bar", 2L}
)
);
@Test
public void testBuildNonInline()
{
Assert.assertEquals(
Optional.empty(),
factory.build(new TableDataSource("foo"), makeCondition("x == \"j.y\""))
);
}
@Test
public void testBuildNonHashJoin()
{
Assert.assertEquals(
Optional.empty(),
factory.build(inlineDataSource, makeCondition("x > \"j.y\""))
);
}
@Test
public void testBuild()
{
final Joinable joinable = factory.build(inlineDataSource, makeCondition("x == \"j.long\"")).get();
Assert.assertThat(joinable, CoreMatchers.instanceOf(IndexedTableJoinable.class));
Assert.assertEquals(ImmutableList.of("long", "str"), joinable.getAvailableColumns());
Assert.assertEquals(2, joinable.getCardinality("str"));
Assert.assertEquals(2, joinable.getCardinality("long"));
}
private static JoinConditionAnalysis makeCondition(final String condition)
{
return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil());
}
}

View File

@ -37,6 +37,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.realtime.plumber.RealtimePlumberSchool;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.junit.Assert;
@ -112,6 +113,7 @@ public class FireDepartmentTest
null,
null,
null,
NoopJoinableFactory.INSTANCE,
TestHelper.getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance()),
TestHelper.getTestIndexIO(),
MapCache.create(0),

View File

@ -57,6 +57,7 @@ import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@ -281,6 +282,7 @@ public class AppenderatorTester implements AutoCloseable
},
emitter,
queryExecutor,
NoopJoinableFactory.INSTANCE,
MapCache.create(2048),
new CacheConfig(),
new CachePopulatorStats()

View File

@ -50,6 +50,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireDepartmentTest;
@ -225,6 +226,7 @@ public class RealtimePlumberSchoolTest
segmentPublisher,
handoffNotifierFactory,
Execs.directExecutor(),
NoopJoinableFactory.INSTANCE,
TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory),
TestHelper.getTestIndexIO(),
MapCache.create(0),

View File

@ -63,6 +63,7 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
@ -158,6 +159,7 @@ public class ServerManagerTest
new LocalCacheProvider().get(),
new CacheConfig(),
segmentManager,
NoopJoinableFactory.INSTANCE,
new ServerConfig()
);

View File

@ -41,6 +41,8 @@ import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.join.DefaultJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.QueryResource;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.ServerManager;
@ -56,8 +58,6 @@ import org.eclipse.jetty.server.Server;
import java.util.List;
/**
*/
@Command(
name = "historical",
description = "Runs a Historical node, see https://druid.apache.org/docs/latest/Historical.html for a description"
@ -90,6 +90,7 @@ public class CliHistorical extends ServerRunnable
binder.bind(SegmentManager.class).in(LazySingleton.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class);
binder.bind(JoinableFactory.class).to(DefaultJoinableFactory.class).in(LazySingleton.class);
binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.HISTORICAL));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
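
The binding added here, and repeated in CliIndexer and CliPeon below, is what makes joins work on query-serving processes. A sketch of the pattern as a standalone Guice module; the binding line is verbatim from these diffs, while the surrounding module class is illustrative:

import com.google.inject.Binder;
import com.google.inject.Module;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.segment.join.DefaultJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;

// Illustrative module, not part of this patch.
public class JoinableFactoryModuleSketch implements Module
{
  @Override
  public void configure(final Binder binder)
  {
    binder.bind(JoinableFactory.class).to(DefaultJoinableFactory.class).in(LazySingleton.class);
  }
}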

View File

@ -59,6 +59,8 @@ import org.apache.druid.indexing.worker.http.ShuffleResource;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.join.DefaultJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.realtime.CliIndexerDataSegmentServerAnnouncerLifecycleHandler;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
@ -119,6 +121,7 @@ public class CliIndexer extends ServerRunnable
binder.bind(TaskRunner.class).to(ThreadingTaskRunner.class);
binder.bind(QuerySegmentWalker.class).to(ThreadingTaskRunner.class);
binder.bind(JoinableFactory.class).to(DefaultJoinableFactory.class).in(LazySingleton.class);
binder.bind(ThreadingTaskRunner.class).in(LazySingleton.class);
CliPeon.bindRowIngestionMeters(binder);

View File

@ -91,6 +91,8 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.join.DefaultJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
@ -207,6 +209,7 @@ public class CliPeon extends GuiceRunnable
binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class);
binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class);
binder.bind(JoinableFactory.class).to(DefaultJoinableFactory.class).in(LazySingleton.class);
binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class);
bindRealtimeCache(binder);