From 19b427e8f371bed0b0100272ac890b3775471654 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 24 Jan 2020 13:10:01 -0800 Subject: [PATCH] Add JoinableFactory interface and use it in the query stack. (#9247) * Add JoinableFactory interface and use it in the query stack. Also includes InlineJoinableFactory, which enables joining against inline datasources. This is the first patch where a basic join query actually works. It includes integration tests. * Fix test issues. * Adjustments from code review. --- .../java/org/apache/druid/utils/JvmUtils.java | 20 ++- .../indexing/kafka/KafkaIndexTaskTest.java | 2 + .../kinesis/KinesisIndexTaskTest.java | 2 + .../druid/indexing/common/TaskToolbox.java | 9 + .../indexing/common/TaskToolboxFactory.java | 5 + .../AppenderatorDriverRealtimeIndexTask.java | 1 + .../common/task/RealtimeIndexTask.java | 1 + .../SeekableStreamIndexTask.java | 1 + .../indexing/common/TaskToolboxTest.java | 2 + ...penderatorDriverRealtimeIndexTaskTest.java | 2 + .../common/task/CompactionTaskRunTest.java | 2 + .../common/task/CompactionTaskTest.java | 2 + .../common/task/IngestionTestBase.java | 2 + .../common/task/RealtimeIndexTaskTest.java | 2 + .../common/task/TestAppenderatorsManager.java | 3 + ...stractParallelIndexSupervisorTaskTest.java | 2 + .../SingleTaskBackgroundRunnerTest.java | 2 + .../indexing/overlord/TaskLifecycleTest.java | 3 + .../worker/WorkerTaskManagerTest.java | 2 + .../worker/WorkerTaskMonitorTest.java | 14 +- integration-tests/docker/broker.conf | 1 + integration-tests/docker/historical.conf | 1 + .../indexer/kafka_index_queries.json | 52 ++++++ .../wikipedia_realtime_index_queries.json | 60 +++++++ .../queries/wikipedia_editstream_queries.json | 160 ++++++++++++++++++ .../ReferenceCountingSegmentQueryRunner.java | 22 +-- .../query/planning/PreJoinableClause.java | 2 +- .../druid/segment/ReferenceCounter.java | 44 +++++ .../segment/ReferenceCountingSegment.java | 12 +- .../druid/segment/join/HashJoinSegment.java | 13 +- .../segment/join/JoinConditionAnalysis.java | 20 ++- .../apache/druid/segment/join/Joinable.java | 4 + .../druid/segment/join/JoinableClause.java | 4 +- .../druid/segment/join/JoinableFactory.java | 40 +++++ .../apache/druid/segment/join/Joinables.java | 61 +++++++ .../IndexedTableColumnSelectorFactory.java | 2 +- .../join/table/RowBasedIndexedTable.java | 5 + .../segment/join/HashJoinSegmentTest.java | 58 +++---- .../druid/segment/join/JoinablesTest.java | 78 +++++++++ .../segment/join/NoopJoinableFactory.java | 40 +++++ .../join/table/IndexedTableJoinableTest.java | 147 ++++++++++++++++ .../segment/join/DefaultJoinableFactory.java | 52 ++++++ .../segment/join/InlineJoinableFactory.java | 59 +++++++ .../realtime/appenderator/Appenderators.java | 3 + .../appenderator/AppenderatorsManager.java | 2 + .../DefaultRealtimeAppenderatorFactory.java | 5 + ...DummyForInjectionAppenderatorsManager.java | 2 + .../PeonAppenderatorsManager.java | 3 + .../appenderator/SinkQuerySegmentWalker.java | 24 ++- .../UnifiedIndexerAppenderatorsManager.java | 6 + .../realtime/plumber/FlushingPlumber.java | 3 + .../plumber/FlushingPlumberSchool.java | 6 + .../realtime/plumber/RealtimePlumber.java | 3 + .../plumber/RealtimePlumberSchool.java | 6 + .../server/coordination/ServerManager.java | 38 +++-- .../join/InlineJoinableFactoryTest.java | 82 +++++++++ .../segment/realtime/FireDepartmentTest.java | 2 + .../appenderator/AppenderatorTester.java | 2 + .../plumber/RealtimePlumberSchoolTest.java | 2 + .../coordination/ServerManagerTest.java | 2 + .../org/apache/druid/cli/CliHistorical.java | 5 +- .../java/org/apache/druid/cli/CliIndexer.java | 3 + .../java/org/apache/druid/cli/CliPeon.java | 3 + 63 files changed, 1121 insertions(+), 97 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/segment/ReferenceCounter.java create mode 100644 processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java create mode 100644 processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java create mode 100644 processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java create mode 100644 server/src/main/java/org/apache/druid/segment/join/DefaultJoinableFactory.java create mode 100644 server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java create mode 100644 server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java diff --git a/core/src/main/java/org/apache/druid/utils/JvmUtils.java b/core/src/main/java/org/apache/druid/utils/JvmUtils.java index 7211545c4e2..5c4869d4118 100644 --- a/core/src/main/java/org/apache/druid/utils/JvmUtils.java +++ b/core/src/main/java/org/apache/druid/utils/JvmUtils.java @@ -29,6 +29,8 @@ import java.net.URL; import java.nio.file.Paths; import java.util.List; import java.util.StringTokenizer; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -80,13 +82,29 @@ public class JvmUtils * @return total CPU time for the current thread in nanoseconds. * * @throws UnsupportedOperationException if the Java virtual machine does not support CPU time measurement for - * the current thread. + * the current thread. */ public static long getCurrentThreadCpuTime() { return THREAD_MX_BEAN.getCurrentThreadCpuTime(); } + /** + * Executes and returns the value of {@code function}. Also accumulates the CPU time taken for the function (as + * reported by {@link #getCurrentThreadCpuTime()} into {@param accumulator}. + */ + public static T safeAccumulateThreadCpuTime(final AtomicLong accumulator, final Supplier function) + { + final long start = safeGetThreadCpuTime(); + + try { + return function.get(); + } + finally { + accumulator.addAndGet(safeGetThreadCpuTime() - start); + } + } + public static List systemClassPath() { List jobURLs; diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 6bc4991375d..43253b067e1 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -105,6 +105,7 @@ import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; @@ -2652,6 +2653,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase handoffNotifierFactory, this::makeTimeseriesAndScanConglomerate, Execs.directExecutor(), // queryExecutorService + NoopJoinableFactory.INSTANCE, EasyMock.createMock(MonitorScheduler.class), new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 43b4f06c9c6..aef7accc709 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -97,6 +97,7 @@ import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; @@ -2888,6 +2889,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase handoffNotifierFactory, this::makeTimeseriesOnlyConglomerate, Execs.directExecutor(), // queryExecutorService + NoopJoinableFactory.INSTANCE, EasyMock.createMock(MonitorScheduler.class), new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index 812679df9c9..40e8ad98272 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -42,6 +42,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentArchiver; import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.DataSegmentMover; @@ -86,6 +87,7 @@ public class TaskToolbox private final Provider queryRunnerFactoryConglomerateProvider; private final MonitorScheduler monitorScheduler; private final ExecutorService queryExecutorService; + private final JoinableFactory joinableFactory; private final SegmentLoader segmentLoader; private final ObjectMapper jsonMapper; private final File taskWorkDir; @@ -116,6 +118,7 @@ public class TaskToolbox SegmentHandoffNotifierFactory handoffNotifierFactory, Provider queryRunnerFactoryConglomerateProvider, ExecutorService queryExecutorService, + JoinableFactory joinableFactory, MonitorScheduler monitorScheduler, SegmentLoader segmentLoader, ObjectMapper jsonMapper, @@ -146,6 +149,7 @@ public class TaskToolbox this.handoffNotifierFactory = handoffNotifierFactory; this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider; this.queryExecutorService = queryExecutorService; + this.joinableFactory = joinableFactory; this.monitorScheduler = monitorScheduler; this.segmentLoader = segmentLoader; this.jsonMapper = jsonMapper; @@ -229,6 +233,11 @@ public class TaskToolbox return queryExecutorService; } + public JoinableFactory getJoinableFactory() + { + return joinableFactory; + } + public MonitorScheduler getMonitorScheduler() { return monitorScheduler; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java index e1c04a93c1b..d70099e2e03 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java @@ -42,6 +42,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentArchiver; import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.DataSegmentMover; @@ -72,6 +73,7 @@ public class TaskToolboxFactory private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final Provider queryRunnerFactoryConglomerateProvider; private final ExecutorService queryExecutorService; + private final JoinableFactory joinableFactory; private final MonitorScheduler monitorScheduler; private final SegmentLoaderFactory segmentLoaderFactory; private final ObjectMapper jsonMapper; @@ -102,6 +104,7 @@ public class TaskToolboxFactory SegmentHandoffNotifierFactory handoffNotifierFactory, Provider queryRunnerFactoryConglomerateProvider, @Processing ExecutorService queryExecutorService, + JoinableFactory joinableFactory, MonitorScheduler monitorScheduler, SegmentLoaderFactory segmentLoaderFactory, @Json ObjectMapper jsonMapper, @@ -131,6 +134,7 @@ public class TaskToolboxFactory this.handoffNotifierFactory = handoffNotifierFactory; this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider; this.queryExecutorService = queryExecutorService; + this.joinableFactory = joinableFactory; this.monitorScheduler = monitorScheduler; this.segmentLoaderFactory = segmentLoaderFactory; this.jsonMapper = jsonMapper; @@ -164,6 +168,7 @@ public class TaskToolboxFactory handoffNotifierFactory, queryRunnerFactoryConglomerateProvider, queryExecutorService, + joinableFactory, monitorScheduler, segmentLoaderFactory.manufacturate(taskWorkDir), jsonMapper, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index b7db37a5bf6..daaec1ba5fa 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -772,6 +772,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements toolbox.getSegmentAnnouncer(), toolbox.getEmitter(), toolbox.getQueryExecutorService(), + toolbox.getJoinableFactory(), toolbox.getCache(), toolbox.getCacheConfig(), toolbox.getCachePopulatorStats() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java index 31c5799f3eb..ed2ddd2c604 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java @@ -341,6 +341,7 @@ public class RealtimeIndexTask extends AbstractTask segmentPublisher, toolbox.getSegmentHandoffNotifierFactory(), toolbox.getQueryExecutorService(), + toolbox.getJoinableFactory(), toolbox.getIndexMergerV9(), toolbox.getIndexIO(), toolbox.getCache(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 8c790f2112d..2fac4f50001 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -206,6 +206,7 @@ public abstract class SeekableStreamIndexTask mockQueryRunnerFactoryConglomerate, mockQueryExecutorService, + NoopJoinableFactory.INSTANCE, mockMonitorScheduler, mockSegmentLoaderFactory, ObjectMapper, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 969f7695370..de3fa29a489 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -113,6 +113,7 @@ import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeIOConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; @@ -1591,6 +1592,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest handoffNotifierFactory, () -> conglomerate, Execs.directExecutor(), // queryExecutorService + NoopJoinableFactory.INSTANCE, EasyMock.createMock(MonitorScheduler.class), new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 3a18cac2b58..779113fc3d7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -61,6 +61,7 @@ import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.LocalDataSegmentPuller; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; @@ -877,6 +878,7 @@ public class CompactionTaskRunTest extends IngestionTestBase null, null, null, + NoopJoinableFactory.INSTANCE, null, loader, objectMapper, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 1289944fa2d..caad2db871b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -108,6 +108,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; @@ -1274,6 +1275,7 @@ public class CompactionTaskTest null, null, null, + NoopJoinableFactory.INSTANCE, null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index e87e0f79617..b3ecce4fab0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -56,6 +56,7 @@ import org.apache.druid.metadata.SQLMetadataSegmentManager; import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.segment.loading.NoopDataSegmentKiller; @@ -306,6 +307,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest null, null, null, + NoopJoinableFactory.INSTANCE, null, null, objectMapper, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index e3967e16cb1..6437488b1f1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -100,6 +100,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeIOConfig; import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.FireDepartment; @@ -973,6 +974,7 @@ public class RealtimeIndexTaskTest handoffNotifierFactory, () -> conglomerate, Execs.directExecutor(), // queryExecutorService + NoopJoinableFactory.INSTANCE, EasyMock.createMock(MonitorScheduler.class), new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java index 1bffbd86a4d..8d1ab376826 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java @@ -31,6 +31,7 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.appenderator.Appenderator; @@ -60,6 +61,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, ExecutorService queryExecutorService, + JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats @@ -78,6 +80,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager segmentAnnouncer, emitter, queryExecutorService, + joinableFactory, cache, cacheConfig, cachePopulatorStats diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 480522bc33b..d0aeede0810 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -54,6 +54,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.metadata.EntryExistsException; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.segment.loading.NoopDataSegmentKiller; @@ -301,6 +302,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase null, null, null, + NoopJoinableFactory.INSTANCE, null, newSegmentLoader(temporaryFolder.newFolder()), getObjectMapper(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index eb7c6de297b..4cfa87d8a36 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -33,6 +33,7 @@ import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.NoopDataSegmentArchiver; import org.apache.druid.segment.loading.NoopDataSegmentKiller; import org.apache.druid.segment.loading.NoopDataSegmentMover; @@ -94,6 +95,7 @@ public class SingleTaskBackgroundRunnerTest null, null, null, + NoopJoinableFactory.INSTANCE, null, new SegmentLoaderFactory(null, utils.getTestObjectMapper()), utils.getTestObjectMapper(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 35c3a8060e0..3dce1974e7e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -117,6 +117,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeIOConfig; import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.DataSegmentArchiver; import org.apache.druid.segment.loading.DataSegmentMover; import org.apache.druid.segment.loading.DataSegmentPusher; @@ -663,6 +664,7 @@ public class TaskLifecycleTest handoffNotifierFactory, () -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective Execs.directExecutor(), // query executor service + NoopJoinableFactory.INSTANCE, monitorScheduler, // monitor scheduler new SegmentLoaderFactory(null, new DefaultObjectMapper()), MAPPER, @@ -1329,6 +1331,7 @@ public class TaskLifecycleTest UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = new UnifiedIndexerAppenderatorsManager( exec, + NoopJoinableFactory.INSTANCE, new WorkerConfig(), MapCache.create(2048), new CacheConfig(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index c31b0b64bc0..3ae7d96e5da 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -40,6 +40,7 @@ import org.apache.druid.indexing.overlord.TestTaskRunner; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; @@ -120,6 +121,7 @@ public class WorkerTaskManagerTest notifierFactory, null, null, + NoopJoinableFactory.INSTANCE, null, new SegmentLoaderFactory(null, jsonMapper), jsonMapper, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index 8412eb0f693..2fdca5f2c33 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.IndexerZkConfig; @@ -170,7 +171,18 @@ public class WorkerTaskMonitorTest taskConfig, null, taskActionClientFactory, - null, null, null, null, null, null, null, notifierFactory, null, null, null, + null, + null, + null, + null, + null, + null, + null, + notifierFactory, + null, + null, + NoopJoinableFactory.INSTANCE, + null, new SegmentLoaderFactory(null, jsonMapper), jsonMapper, indexIO, diff --git a/integration-tests/docker/broker.conf b/integration-tests/docker/broker.conf index aa4e83f2cda..b602f861c67 100644 --- a/integration-tests/docker/broker.conf +++ b/integration-tests/docker/broker.conf @@ -11,6 +11,7 @@ command=java -Ddruid.host=%(ENV_HOST_IP)s -Ddruid.zk.service.host=druid-zookeeper-kafka -Ddruid.processing.buffer.sizeBytes=25000000 + -Ddruid.query.groupBy.maxOnDiskStorage=300000000 -Ddruid.server.http.numThreads=40 -Ddruid.processing.numThreads=1 -Ddruid.broker.http.numConnections=20 diff --git a/integration-tests/docker/historical.conf b/integration-tests/docker/historical.conf index 04dcbf4c950..770610e524a 100644 --- a/integration-tests/docker/historical.conf +++ b/integration-tests/docker/historical.conf @@ -14,6 +14,7 @@ command=java -Ddruid.s3.secretKey=OBaLISDFjKLajSTrJ53JoTtzTZLjPlRePcwa+Pjv -Ddruid.processing.buffer.sizeBytes=25000000 -Ddruid.processing.numThreads=2 + -Ddruid.query.groupBy.maxOnDiskStorage=300000000 -Ddruid.server.http.numThreads=20 -Ddruid.segmentCache.locations="[{\"path\":\"/shared/druid/indexCache\",\"maxSize\":5000000000}]" -Ddruid.server.maxSize=5000000000 diff --git a/integration-tests/src/test/resources/indexer/kafka_index_queries.json b/integration-tests/src/test/resources/indexer/kafka_index_queries.json index 73876c53595..d31d0d14493 100644 --- a/integration-tests/src/test/resources/indexer/kafka_index_queries.json +++ b/integration-tests/src/test/resources/indexer/kafka_index_queries.json @@ -36,5 +36,57 @@ } } ] + }, + { + "description": "topN, 1 agg, join to inline", + "query": { + "queryType": "topN", + "dataSource": { + "type": "join", + "left": "%%DATASOURCE%%", + "right": { + "type": "inline", + "columnNames": ["language", "lookupLanguage"], + "columnTypes": ["string", "string"], + "rows": [ + ["en", "inline join!"] + ] + }, + "rightPrefix": "j.", + "condition": "language == \"j.language\"", + "joinType": "LEFT" + }, + "intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ], + "granularity": "all", + "virtualColumns": [ + { + "type": "expression", + "name": "lookupLanguage", + "expression": "nvl(\"j.lookupLanguage\", \"language\")", + "outputType": "string" + } + ], + "aggregations": [ + { + "type": "longSum", + "name": "count", + "fieldName": "count" + } + ], + "dimension": "lookupLanguage", + "metric": "count", + "threshold": 3 + }, + "expectedResults": [ + { + "timestamp": "%%TIMESERIES_RESPONSE_TIMESTAMP%%", + "result": [ + { + "lookupLanguage": "inline join!", + "count": %%TIMESERIES_NUMEVENTS%% + } + ] + } + ] } ] diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json index bb6759595e0..e5fe33b6b15 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json @@ -83,5 +83,65 @@ "rows" : 1 } } ] + }, + { + "description": "topN, 1 agg, join to inline", + "query": { + "queryType": "topN", + "dataSource": { + "type": "join", + "left": "%%DATASOURCE%%", + "right": { + "type": "inline", + "columnNames": ["language", "lookupLanguage"], + "columnTypes": ["string", "string"], + "rows": [ + ["en", "inline join!"] + ] + }, + "rightPrefix": "j.", + "condition": "language == \"j.language\"", + "joinType": "LEFT" + }, + "intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ], + "granularity": "all", + "virtualColumns": [ + { + "type": "expression", + "name": "lookupLanguage", + "expression": "nvl(\"j.lookupLanguage\", \"language\")", + "outputType": "string" + } + ], + "aggregations": [ + { + "type": "longSum", + "name": "count", + "fieldName": "count" + } + ], + "dimension": "lookupLanguage", + "metric": "count", + "threshold": 3 + }, + "expectedResults": [ + { + "timestamp": "%%TIMESERIES_RESPONSE_TIMESTAMP%%", + "result": [ + { + "lookupLanguage": "inline join!", + "count": 14 + }, + { + "lookupLanguage": "ja", + "count": 3 + }, + { + "lookupLanguage": "ru", + "count": 3 + } + ] + } + ] } ] diff --git a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json index 846fcd2ae35..9c6560da124 100644 --- a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json +++ b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json @@ -1299,5 +1299,165 @@ ] } ] + }, + { + "description": "topN, 1 agg, join to inline", + "query": { + "queryType": "topN", + "dataSource": { + "type": "join", + "left": "wikipedia_editstream", + "right": { + "type": "inline", + "columnNames": ["page", "lookupPage"], + "columnTypes": ["string", "string"], + "rows": [ + ["Wikipedia:Vandalismusmeldung", "inline join!"] + ] + }, + "rightPrefix": "j.", + "condition": "page == \"j.page\"", + "joinType": "LEFT" + }, + "intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"], + "granularity": "all", + "virtualColumns": [ + { + "type": "expression", + "name": "lookupPage", + "expression": "nvl(\"j.lookupPage\", \"page\")", + "outputType": "string" + } + ], + "aggregations": [ + { + "type": "count", + "name": "rows" + } + ], + "dimension": "lookupPage", + "metric": "rows", + "threshold": 3, + "context": { + "useCache": "true", + "populateCache": "true", + "timeout": 360000 + } + }, + "expectedResults": [ + { + "timestamp": "2013-01-01T00:00:00.000Z", + "result": [ + { + "lookupPage": "inline join!", + "rows": 991 + }, + { + "lookupPage": "Wikipedia:Administrators'_noticeboard/Incidents", + "rows": 990 + }, + { + "lookupPage": "Wikipedia:Administrator_intervention_against_vandalism", + "rows": 800 + } + ] + } + ] + }, + { + "description": "groupBy, 1 agg, subquery over join to inline", + "query": { + "queryType": "groupBy", + "dataSource": { + "type": "query", + "query": { + "queryType": "groupBy", + "dataSource": { + "type": "join", + "left": "wikipedia_editstream", + "right": { + "type": "inline", + "columnNames": ["page", "lookupPage"], + "columnTypes": ["string", "string"], + "rows": [ + ["Wikipedia:Vandalismusmeldung", "inline join!"] + ] + }, + "rightPrefix": "j.", + "condition": "page == \"j.page\"", + "joinType": "LEFT" + }, + "intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"], + "granularity": "all", + "virtualColumns": [ + { + "type": "expression", + "name": "lookupPage", + "expression": "nvl(\"j.lookupPage\", \"page\")", + "outputType": "string" + } + ], + "aggregations": [ + { + "type": "count", + "name": "rows" + } + ], + "dimensions": ["lookupPage"] + } + }, + "intervals": ["2013-01-01T00:00:00.000/2013-01-02T00:00:00.000"], + "granularity": "all", + "aggregations": [ + { + "type": "longSum", + "name": "rows_outer", + "fieldName": "rows" + } + ], + "dimensions": ["lookupPage"], + "limitSpec": { + "type": "default", + "columns": [ + { + "dimension": "rows_outer", + "dimensionOrder": "numeric", + "direction": "descending" + } + ], + "limit": 3 + }, + "context": { + "useCache": "true", + "populateCache": "true", + "timeout": 360000 + } + }, + "expectedResults": [ + { + "version": "v1", + "timestamp": "2013-01-01T00:00:00.000Z", + "event": { + "lookupPage": "inline join!", + "rows_outer": 991 + } + }, + { + "version": "v1", + "timestamp": "2013-01-01T00:00:00.000Z", + "event": { + "lookupPage": "Wikipedia:Administrators'_noticeboard/Incidents", + "rows_outer": 990 + } + }, + { + "version": "v1", + "timestamp": "2013-01-01T00:00:00.000Z", + "event": { + "lookupPage": "Wikipedia:Administrator_intervention_against_vandalism", + "rows_outer": 800 + } + } + ] } ] diff --git a/processing/src/main/java/org/apache/druid/query/ReferenceCountingSegmentQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ReferenceCountingSegmentQueryRunner.java index ade35759184..8691441ec20 100644 --- a/processing/src/main/java/org/apache/druid/query/ReferenceCountingSegmentQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/ReferenceCountingSegmentQueryRunner.java @@ -22,39 +22,41 @@ package org.apache.druid.query; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.segment.ReferenceCountingSegment; +import org.apache.druid.segment.ReferenceCounter; +import org.apache.druid.segment.Segment; -/** - */ public class ReferenceCountingSegmentQueryRunner implements QueryRunner { private final QueryRunnerFactory> factory; - private final ReferenceCountingSegment adapter; + private final Segment segment; + private final ReferenceCounter segmentReferenceCounter; private final SegmentDescriptor descriptor; public ReferenceCountingSegmentQueryRunner( QueryRunnerFactory> factory, - ReferenceCountingSegment adapter, + Segment segment, + ReferenceCounter segmentReferenceCounter, SegmentDescriptor descriptor ) { this.factory = factory; - this.adapter = adapter; + this.segment = segment; + this.segmentReferenceCounter = segmentReferenceCounter; this.descriptor = descriptor; } @Override public Sequence run(final QueryPlus queryPlus, ResponseContext responseContext) { - if (adapter.increment()) { + if (segmentReferenceCounter.increment()) { try { - final Sequence baseSequence = factory.createRunner(adapter).run(queryPlus, responseContext); + final Sequence baseSequence = factory.createRunner(segment).run(queryPlus, responseContext); - return Sequences.withBaggage(baseSequence, adapter.decrementOnceCloseable()); + return Sequences.withBaggage(baseSequence, segmentReferenceCounter.decrementOnceCloseable()); } catch (Throwable t) { try { - adapter.decrement(); + segmentReferenceCounter.decrement(); } catch (Exception e) { t.addSuppressed(e); diff --git a/processing/src/main/java/org/apache/druid/query/planning/PreJoinableClause.java b/processing/src/main/java/org/apache/druid/query/planning/PreJoinableClause.java index 5ed7f71d561..ddec7540322 100644 --- a/processing/src/main/java/org/apache/druid/query/planning/PreJoinableClause.java +++ b/processing/src/main/java/org/apache/druid/query/planning/PreJoinableClause.java @@ -39,7 +39,7 @@ public class PreJoinableClause private final JoinType joinType; private final JoinConditionAnalysis condition; - PreJoinableClause( + public PreJoinableClause( final String prefix, final DataSource dataSource, final JoinType joinType, diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCounter.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCounter.java new file mode 100644 index 00000000000..970d487b770 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCounter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import java.io.Closeable; + +/** + * An interface to reference-counted objects. Used by {@link ReferenceCountingSegment}. Thread-safe. + */ +public interface ReferenceCounter +{ + /** + * Increment the reference count by one. + */ + boolean increment(); + + /** + * Returns a {@link Closeable} which action is to call {@link #decrement()} only once. If close() is called on the + * returned Closeable object for the second time, it won't call {@link #decrement()} again. + */ + Closeable decrementOnceCloseable(); + + /** + * Decrement the reference count by one. + */ + void decrement(); +} diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java index 673ab20b06d..ba1c34318e3 100644 --- a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java @@ -27,7 +27,6 @@ import org.apache.druid.timeline.partition.ShardSpec; import org.joda.time.Interval; import javax.annotation.Nullable; - import java.io.Closeable; import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,7 +37,8 @@ import java.util.concurrent.atomic.AtomicBoolean; * until that. So ReferenceCountingSegment implements something like automatic reference count-based resource * management. */ -public class ReferenceCountingSegment extends AbstractSegment implements Overshadowable +public class ReferenceCountingSegment extends AbstractSegment + implements Overshadowable, ReferenceCounter { private static final EmittingLogger log = new EmittingLogger(ReferenceCountingSegment.class); @@ -167,6 +167,12 @@ public class ReferenceCountingSegment extends AbstractSegment implements Oversha } } + public ReferenceCounter referenceCounter() + { + return this; + } + + @Override public boolean increment() { // Negative return from referents.register() means the Phaser is terminated. @@ -177,6 +183,7 @@ public class ReferenceCountingSegment extends AbstractSegment implements Oversha * Returns a {@link Closeable} which action is to call {@link #decrement()} only once. If close() is called on the * returned Closeable object for the second time, it won't call {@link #decrement()} again. */ + @Override public Closeable decrementOnceCloseable() { AtomicBoolean decremented = new AtomicBoolean(false); @@ -189,6 +196,7 @@ public class ReferenceCountingSegment extends AbstractSegment implements Oversha }; } + @Override public void decrement() { referents.arriveAndDeregister(); diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java index e35f550577d..ef5ba7a34a0 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java @@ -24,7 +24,6 @@ import org.apache.druid.segment.AbstractSegment; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.Segment; import org.apache.druid.segment.StorageAdapter; -import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; @@ -50,15 +49,9 @@ public class HashJoinSegment extends AbstractSegment this.baseSegment = baseSegment; this.clauses = clauses; - // Verify no clauses would shadow the special __time field. - for (JoinableClause clause : clauses) { - if (clause.includesColumn(ColumnHolder.TIME_COLUMN_NAME)) { - throw new IAE( - "Clause cannot have prefix[%s], since it would shadow %s", - clause.getPrefix(), - ColumnHolder.TIME_COLUMN_NAME - ); - } + // Verify 'clauses' is nonempty (otherwise it's a waste to create this object, and the caller should know) + if (clauses.isEmpty()) { + throw new IAE("'clauses' is empty, no need to create HashJoinSegment"); } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinConditionAnalysis.java b/processing/src/main/java/org/apache/druid/segment/join/JoinConditionAnalysis.java index 3733a1b2727..a9e9f21bfae 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinConditionAnalysis.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinConditionAnalysis.java @@ -49,6 +49,7 @@ import java.util.Optional; public class JoinConditionAnalysis { private final String originalExpression; + private final String rightPrefix; private final List equiConditions; private final List nonEquiConditions; private final boolean isAlwaysFalse; @@ -57,19 +58,23 @@ public class JoinConditionAnalysis private JoinConditionAnalysis( final String originalExpression, + final String rightPrefix, final List equiConditions, final List nonEquiConditions ) { this.originalExpression = Preconditions.checkNotNull(originalExpression, "originalExpression"); + this.rightPrefix = Preconditions.checkNotNull(rightPrefix, "rightPrefix"); this.equiConditions = Collections.unmodifiableList(equiConditions); this.nonEquiConditions = Collections.unmodifiableList(nonEquiConditions); // if any nonEquiCondition is an expression and it evaluates to false isAlwaysFalse = nonEquiConditions.stream() - .anyMatch(expr -> expr.isLiteral() && !expr.eval(ExprUtils.nilBindings()).asBoolean()); + .anyMatch(expr -> expr.isLiteral() && !expr.eval(ExprUtils.nilBindings()) + .asBoolean()); // if there are no equiConditions and all nonEquiConditions are literals and the evaluate to true isAlwaysTrue = equiConditions.isEmpty() && nonEquiConditions.stream() - .allMatch(expr -> expr.isLiteral() && expr.eval(ExprUtils.nilBindings()).asBoolean()); + .allMatch(expr -> expr.isLiteral() && expr.eval( + ExprUtils.nilBindings()).asBoolean()); canHashJoin = nonEquiConditions.stream().allMatch(Expr::isLiteral); } @@ -113,14 +118,14 @@ public class JoinConditionAnalysis } } - return new JoinConditionAnalysis(condition, equiConditions, nonEquiConditions); + return new JoinConditionAnalysis(condition, rightPrefix, equiConditions, nonEquiConditions); } private static boolean isLeftExprAndRightColumn(final Expr a, final Expr b, final String rightPrefix) { - return a.analyzeInputs().getRequiredBindings().stream().noneMatch(c -> c.startsWith(rightPrefix)) + return a.analyzeInputs().getRequiredBindings().stream().noneMatch(c -> Joinables.isPrefixedBy(c, rightPrefix)) && b.getIdentifierIfIdentifier() != null - && b.getIdentifierIfIdentifier().startsWith(rightPrefix); + && Joinables.isPrefixedBy(b.getIdentifierIfIdentifier(), rightPrefix); } /** @@ -181,13 +186,14 @@ public class JoinConditionAnalysis return false; } JoinConditionAnalysis that = (JoinConditionAnalysis) o; - return Objects.equals(originalExpression, that.originalExpression); + return Objects.equals(originalExpression, that.originalExpression) && + Objects.equals(rightPrefix, that.rightPrefix); } @Override public int hashCode() { - return Objects.hash(originalExpression); + return Objects.hash(originalExpression, rightPrefix); } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/join/Joinable.java b/processing/src/main/java/org/apache/druid/segment/join/Joinable.java index bc7b6ad678c..b516afbc6cf 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/Joinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/Joinable.java @@ -43,6 +43,10 @@ public interface Joinable /** * Returns the cardinality of "columnName", or {@link #CARDINALITY_UNKNOWN} if not known. May be used at query * time to trigger optimizations. + * + * If not {@link #CARDINALITY_UNKNOWN}, this must match the cardinality of selectors returned by the + * {@link ColumnSelectorFactory#makeDimensionSelector} method of this joinable's + * {@link JoinMatcher#getColumnSelectorFactory()} . */ int getCardinality(String columnName); diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java index 5e9bfc39bdd..a2ddefe1bf0 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java @@ -30,6 +30,8 @@ import java.util.stream.Collectors; * Represents everything about a join clause except for the left-hand datasource. In other words, if the full join * clause is "t1 JOIN t2 ON t1.x = t2.x" then this class represents "JOIN t2 ON x = t2.x" -- it does not include * references to the left-hand "t1". + * + * Created from {@link org.apache.druid.query.planning.PreJoinableClause} by {@link Joinables#createSegmentMapFn}. */ public class JoinableClause { @@ -101,7 +103,7 @@ public class JoinableClause */ public boolean includesColumn(final String columnName) { - return columnName.startsWith(prefix) && columnName.length() > prefix.length(); + return Joinables.isPrefixedBy(columnName, prefix); } /** diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java new file mode 100644 index 00000000000..5016d75a79c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join; + +import org.apache.druid.query.DataSource; + +import java.util.Optional; + +/** + * Utility for creating {@link Joinable} objects. + */ +public interface JoinableFactory +{ + /** + * Create a Joinable object. This may be an expensive operation involving loading data, creating a hash table, etc. + * + * @param dataSource the datasource to join on + * @param condition the condition to join on + * + * @return a Joinable if this datasource + condition combo is joinable; empty if not + */ + Optional build(DataSource dataSource, JoinConditionAnalysis condition); +} diff --git a/processing/src/main/java/org/apache/druid/segment/join/Joinables.java b/processing/src/main/java/org/apache/druid/segment/join/Joinables.java index 6bb95a1a502..ea019b47d28 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/Joinables.java +++ b/processing/src/main/java/org/apache/druid/segment/join/Joinables.java @@ -20,9 +20,18 @@ package org.apache.druid.segment.join; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.planning.PreJoinableClause; +import org.apache.druid.segment.Segment; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; /** * Utility methods for working with {@link Joinable} related classes. @@ -52,4 +61,56 @@ public class Joinables { return columnName.startsWith(prefix) && columnName.length() > prefix.length(); } + + /** + * Creates a Function that maps base segments to {@link HashJoinSegment} if needed (i.e. if the number of join + * clauses is > 0). If mapping is not needed, this method will return {@link Function#identity()}. + * + * @param clauses pre-joinable clauses + * @param joinableFactory factory for joinables + * @param cpuTimeAccumulator an accumulator that we will add CPU nanos to; this is part of the function to encourage + * callers to remember to track metrics on CPU time required for creation of Joinables + */ + public static Function createSegmentMapFn( + final List clauses, + final JoinableFactory joinableFactory, + final AtomicLong cpuTimeAccumulator + ) + { + return JvmUtils.safeAccumulateThreadCpuTime( + cpuTimeAccumulator, + () -> { + if (clauses.isEmpty()) { + return Function.identity(); + } else { + final List joinableClauses = createJoinableClauses(clauses, joinableFactory); + return baseSegment -> new HashJoinSegment(baseSegment, joinableClauses); + } + } + ); + } + + /** + * Returns a list of {@link JoinableClause} corresponding to a list of {@link PreJoinableClause}. This will call + * {@link JoinableFactory#build} on each one and therefore may be an expensive operation. + */ + private static List createJoinableClauses( + final List clauses, + final JoinableFactory joinableFactory + ) + { + return clauses.stream().map(preJoinableClause -> { + final Optional joinable = joinableFactory.build( + preJoinableClause.getDataSource(), + preJoinableClause.getCondition() + ); + + return new JoinableClause( + preJoinableClause.getPrefix(), + joinable.orElseThrow(() -> new ISE("dataSource is not joinable: %s", preJoinableClause.getDataSource())), + preJoinableClause.getJoinType(), + preJoinableClause.getCondition() + ); + }).collect(Collectors.toList()); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnSelectorFactory.java index 1cacb630e55..e5d72e2e8c6 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnSelectorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnSelectorFactory.java @@ -56,7 +56,7 @@ public class IndexedTableColumnSelectorFactory implements ColumnSelectorFactory capabilities.setDictionaryEncoded(true); } - return capabilities; + return capabilities.setIsComplete(true); } else { return null; } diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java b/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java index dfb5803d00d..69838238700 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java @@ -30,6 +30,7 @@ import org.apache.druid.segment.column.ValueType; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -63,6 +64,10 @@ public class RowBasedIndexedTable implements IndexedTable this.columnFunctions = columns.stream().map(rowAdapter::columnFunction).collect(Collectors.toList()); this.keyColumns = keyColumns; + if (new HashSet<>(keyColumns).size() != keyColumns.size()) { + throw new ISE("keyColumns[%s] must not contain duplicates", keyColumns); + } + if (!rowSignature.keySet().containsAll(keyColumns)) { throw new ISE( "keyColumns[%s] must all be contained in rowSignature[%s]", diff --git a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentTest.java b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentTest.java index f0ce5c82e7e..47dbe28d17d 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentTest.java @@ -31,6 +31,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import java.io.IOException; @@ -40,9 +41,11 @@ public class HashJoinSegmentTest @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private QueryableIndexSegment baseSegment; - private HashJoinSegment hashJoinSegmentNoClauses; - private HashJoinSegment hashJoinSegmentManyClauses; + private HashJoinSegment hashJoinSegment; @BeforeClass public static void setUpStatic() @@ -58,12 +61,7 @@ public class HashJoinSegmentTest SegmentId.dummy("facts") ); - hashJoinSegmentNoClauses = new HashJoinSegment( - baseSegment, - ImmutableList.of() - ); - - hashJoinSegmentManyClauses = new HashJoinSegment( + hashJoinSegment = new HashJoinSegment( baseSegment, ImmutableList.of( new JoinableClause( @@ -83,55 +81,37 @@ public class HashJoinSegmentTest } @Test - public void test_getId_noClauses() + public void test_constructor_noClauses() { - Assert.assertEquals(baseSegment.getId(), hashJoinSegmentNoClauses.getId()); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("'clauses' is empty, no need to create HashJoinSegment"); + + final HashJoinSegment ignored = new HashJoinSegment(baseSegment, ImmutableList.of()); } @Test - public void test_getId_manyClauses() + public void test_getId() { - Assert.assertEquals(baseSegment.getId(), hashJoinSegmentManyClauses.getId()); + Assert.assertEquals(baseSegment.getId(), hashJoinSegment.getId()); } @Test - public void test_getDataInterval_noClauses() + public void test_getDataInterval() { - Assert.assertEquals(baseSegment.getDataInterval(), hashJoinSegmentNoClauses.getDataInterval()); + Assert.assertEquals(baseSegment.getDataInterval(), hashJoinSegment.getDataInterval()); } @Test - public void test_getDataInterval_manyClauses() + public void test_asQueryableIndex() { - Assert.assertEquals(baseSegment.getDataInterval(), hashJoinSegmentManyClauses.getDataInterval()); + Assert.assertNull(hashJoinSegment.asQueryableIndex()); } @Test - public void test_asQueryableIndex_noClauses() - { - Assert.assertNull(hashJoinSegmentNoClauses.asQueryableIndex()); - } - - @Test - public void test_asQueryableIndex_manyClauses() - { - Assert.assertNull(hashJoinSegmentManyClauses.asQueryableIndex()); - } - - @Test - public void test_asStorageAdapter_noClauses() + public void test_asStorageAdapter() { Assert.assertThat( - hashJoinSegmentNoClauses.asStorageAdapter(), - CoreMatchers.instanceOf(HashJoinSegmentStorageAdapter.class) - ); - } - - @Test - public void test_asStorageAdapter_manyClauses() - { - Assert.assertThat( - hashJoinSegmentManyClauses.asStorageAdapter(), + hashJoinSegment.asStorageAdapter(), CoreMatchers.instanceOf(HashJoinSegmentStorageAdapter.class) ); } diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java index ae3f845529e..23a5f13ca41 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java @@ -19,12 +19,24 @@ package org.apache.druid.segment.join; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.LookupDataSource; +import org.apache.druid.query.extraction.MapLookupExtractor; +import org.apache.druid.query.planning.PreJoinableClause; +import org.apache.druid.segment.Segment; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.join.lookup.LookupJoinable; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + public class JoinablesTest { @Rule @@ -74,4 +86,70 @@ public class JoinablesTest Assert.assertTrue(Joinables.isPrefixedBy("foo", "fo")); Assert.assertFalse(Joinables.isPrefixedBy("foo", "foo")); } + + @Test + public void test_createSegmentMapFn_noClauses() + { + final Function segmentMapFn = Joinables.createSegmentMapFn( + ImmutableList.of(), + NoopJoinableFactory.INSTANCE, + new AtomicLong() + ); + + Assert.assertSame(Function.identity(), segmentMapFn); + } + + @Test + public void test_createSegmentMapFn_unusableClause() + { + final LookupDataSource lookupDataSource = new LookupDataSource("lookyloo"); + final PreJoinableClause clause = new PreJoinableClause( + "j.", + lookupDataSource, + JoinType.LEFT, + JoinConditionAnalysis.forExpression("x == \"j.x\"", "j.", ExprMacroTable.nil()) + ); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("dataSource is not joinable"); + + final Function ignored = Joinables.createSegmentMapFn( + ImmutableList.of(clause), + NoopJoinableFactory.INSTANCE, + new AtomicLong() + ); + } + + @Test + public void test_createSegmentMapFn_usableClause() + { + final LookupDataSource lookupDataSource = new LookupDataSource("lookyloo"); + final JoinConditionAnalysis conditionAnalysis = JoinConditionAnalysis.forExpression( + "x == \"j.x\"", + "j.", + ExprMacroTable.nil() + ); + final PreJoinableClause clause = new PreJoinableClause( + "j.", + lookupDataSource, + JoinType.LEFT, + conditionAnalysis + ); + + final Function segmentMapFn = Joinables.createSegmentMapFn( + ImmutableList.of(clause), + (dataSource, condition) -> { + if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) { + return Optional.of( + LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false)) + ); + } else { + return Optional.empty(); + } + }, + new AtomicLong() + ); + + Assert.assertNotSame(Function.identity(), segmentMapFn); + } } diff --git a/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java b/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java new file mode 100644 index 00000000000..ff138041f13 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join; + +import org.apache.druid.query.DataSource; + +import java.util.Optional; + +public class NoopJoinableFactory implements JoinableFactory +{ + public static final NoopJoinableFactory INSTANCE = new NoopJoinableFactory(); + + private NoopJoinableFactory() + { + // Singleton. + } + + @Override + public Optional build(DataSource dataSource, JoinConditionAnalysis condition) + { + return Optional.empty(); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java new file mode 100644 index 00000000000..ad785c1f08e --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join.table; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.JoinMatcher; +import org.junit.Assert; +import org.junit.Test; + +public class IndexedTableJoinableTest +{ + private static final String PREFIX = "j."; + + private final ColumnSelectorFactory dummyColumnSelectorFactory = new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return null; + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + return null; + } + + @Override + public ColumnCapabilities getColumnCapabilities(String columnName) + { + return null; + } + }; + + private final InlineDataSource inlineDataSource = InlineDataSource.fromIterable( + ImmutableList.of("str", "long"), + ImmutableList.of(ValueType.STRING, ValueType.LONG), + ImmutableList.of( + new Object[]{"foo", 1L}, + new Object[]{"bar", 2L} + ) + ); + + private final RowBasedIndexedTable indexedTable = new RowBasedIndexedTable<>( + inlineDataSource.getRowsAsList(), + inlineDataSource.rowAdapter(), + inlineDataSource.getRowSignature(), + ImmutableList.of("str") + ); + + @Test + public void test_getAvailableColumns() + { + final IndexedTableJoinable joinable = new IndexedTableJoinable(indexedTable); + Assert.assertEquals(ImmutableList.of("long", "str"), joinable.getAvailableColumns()); + } + + @Test + public void test_getColumnCapabilities_string() + { + final IndexedTableJoinable joinable = new IndexedTableJoinable(indexedTable); + final ColumnCapabilities capabilities = joinable.getColumnCapabilities("str"); + Assert.assertEquals(ValueType.STRING, capabilities.getType()); + Assert.assertTrue(capabilities.isDictionaryEncoded()); + Assert.assertFalse(capabilities.hasBitmapIndexes()); + Assert.assertFalse(capabilities.hasMultipleValues()); + Assert.assertFalse(capabilities.hasSpatialIndexes()); + Assert.assertTrue(capabilities.isComplete()); + } + + @Test + public void test_getColumnCapabilities_long() + { + final IndexedTableJoinable joinable = new IndexedTableJoinable(indexedTable); + final ColumnCapabilities capabilities = joinable.getColumnCapabilities("long"); + Assert.assertEquals(ValueType.LONG, capabilities.getType()); + Assert.assertFalse(capabilities.isDictionaryEncoded()); + Assert.assertFalse(capabilities.hasBitmapIndexes()); + Assert.assertFalse(capabilities.hasMultipleValues()); + Assert.assertFalse(capabilities.hasSpatialIndexes()); + Assert.assertTrue(capabilities.isComplete()); + } + + @Test + public void test_getColumnCapabilities_nonexistent() + { + final IndexedTableJoinable joinable = new IndexedTableJoinable(indexedTable); + final ColumnCapabilities capabilities = joinable.getColumnCapabilities("nonexistent"); + Assert.assertNull(capabilities); + } + + @Test + public void test_makeJoinMatcher_dimensionSelectorOnString() + { + final IndexedTableJoinable joinable = new IndexedTableJoinable(indexedTable); + final JoinConditionAnalysis condition = JoinConditionAnalysis.forExpression( + "x == \"j.str\"", + PREFIX, + ExprMacroTable.nil() + ); + final JoinMatcher joinMatcher = joinable.makeJoinMatcher(dummyColumnSelectorFactory, condition, false); + + final DimensionSelector selector = joinMatcher.getColumnSelectorFactory() + .makeDimensionSelector(DefaultDimensionSpec.of("str")); + + // getValueCardinality + Assert.assertEquals(3, selector.getValueCardinality()); + + // nameLookupPossibleInAdvance + Assert.assertTrue(selector.nameLookupPossibleInAdvance()); + + // lookupName + Assert.assertEquals("foo", selector.lookupName(0)); + Assert.assertEquals("bar", selector.lookupName(1)); + Assert.assertNull(selector.lookupName(2)); + + // lookupId + Assert.assertNull(selector.idLookup()); + } +} diff --git a/server/src/main/java/org/apache/druid/segment/join/DefaultJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/DefaultJoinableFactory.java new file mode 100644 index 00000000000..c58752b9f70 --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/join/DefaultJoinableFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join; + +import com.google.inject.Inject; +import org.apache.druid.query.DataSource; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +public class DefaultJoinableFactory implements JoinableFactory +{ + private final List factories; + + @Inject + public DefaultJoinableFactory(final InlineJoinableFactory inlineJoinableFactory) + { + // Just one right now, but we expect there to be more in the future, and maybe even an extension mechanism. + this.factories = Collections.singletonList(inlineJoinableFactory); + } + + @Override + public Optional build(final DataSource dataSource, final JoinConditionAnalysis condition) + { + for (JoinableFactory factory : factories) { + final Optional maybeJoinable = factory.build(dataSource, condition); + if (maybeJoinable.isPresent()) { + return maybeJoinable; + } + } + + return Optional.empty(); + } +} diff --git a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java new file mode 100644 index 00000000000..d4b9937165b --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join; + +import org.apache.druid.query.DataSource; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.segment.join.table.IndexedTable; +import org.apache.druid.segment.join.table.IndexedTableJoinable; +import org.apache.druid.segment.join.table.RowBasedIndexedTable; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A {@link JoinableFactory} for {@link InlineDataSource}. It works by building an {@link IndexedTable}. + */ +public class InlineJoinableFactory implements JoinableFactory +{ + @Override + public Optional build(final DataSource dataSource, final JoinConditionAnalysis condition) + { + if (condition.canHashJoin() && dataSource instanceof InlineDataSource) { + final InlineDataSource inlineDataSource = (InlineDataSource) dataSource; + final List rightKeyColumns = + condition.getEquiConditions().stream().map(Equality::getRightColumn).distinct().collect(Collectors.toList()); + + return Optional.of( + new IndexedTableJoinable( + new RowBasedIndexedTable<>( + inlineDataSource.getRowsAsList(), + inlineDataSource.rowAdapter(), + inlineDataSource.getRowSignature(), + rightKeyColumns + ) + ) + ); + } else { + return Optional.empty(); + } + } +} diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java index 63c456deb10..347bf287723 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java @@ -29,6 +29,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; @@ -52,6 +53,7 @@ public class Appenderators DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, ExecutorService queryExecutorService, + JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats @@ -75,6 +77,7 @@ public class Appenderators emitter, conglomerate, queryExecutorService, + joinableFactory, Preconditions.checkNotNull(cache, "cache"), cacheConfig, cachePopulatorStats diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java index 830f4b8c99a..6bc438e21eb 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java @@ -31,6 +31,7 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; @@ -74,6 +75,7 @@ public interface AppenderatorsManager DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, ExecutorService queryExecutorService, + JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java index 2c7e1144f52..969ed326e3a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java @@ -32,6 +32,7 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; @@ -46,6 +47,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory private final QueryRunnerFactoryConglomerate conglomerate; private final DataSegmentAnnouncer segmentAnnouncer; private final ExecutorService queryExecutorService; + private final JoinableFactory joinableFactory; private final DataSegmentPusher dataSegmentPusher; private final ObjectMapper jsonMapper; private final IndexIO indexIO; @@ -59,6 +61,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory @JacksonInject QueryRunnerFactoryConglomerate conglomerate, @JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject @Processing ExecutorService queryExecutorService, + @JacksonInject JoinableFactory joinableFactory, @JacksonInject DataSegmentPusher dataSegmentPusher, @JacksonInject @Json ObjectMapper jsonMapper, @JacksonInject IndexIO indexIO, @@ -72,6 +75,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory this.conglomerate = conglomerate; this.segmentAnnouncer = segmentAnnouncer; this.queryExecutorService = queryExecutorService; + this.joinableFactory = joinableFactory; this.dataSegmentPusher = dataSegmentPusher; this.jsonMapper = jsonMapper; this.indexIO = indexIO; @@ -107,6 +111,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory segmentAnnouncer, emitter, queryExecutorService, + joinableFactory, cache, cacheConfig, cachePopulatorStats diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java index c1be5ff2e00..991e9e71e87 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java @@ -32,6 +32,7 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; @@ -65,6 +66,7 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, ExecutorService queryExecutorService, + JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java index be860f92ee4..aaa104f447b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java @@ -32,6 +32,7 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; @@ -71,6 +72,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, ExecutorService queryExecutorService, + JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats @@ -94,6 +96,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager segmentAnnouncer, emitter, queryExecutorService, + joinableFactory, cache, cacheConfig, cachePopulatorStats diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index fb7b8067008..25597783167 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -57,6 +57,8 @@ import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.Joinables; import org.apache.druid.segment.realtime.FireHydrant; import org.apache.druid.segment.realtime.plumber.Sink; import org.apache.druid.timeline.SegmentId; @@ -69,6 +71,7 @@ import java.io.Closeable; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; /** * Query handler for indexing tasks. @@ -85,6 +88,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker private final ServiceEmitter emitter; private final QueryRunnerFactoryConglomerate conglomerate; private final ExecutorService queryExecutorService; + private final JoinableFactory joinableFactory; private final Cache cache; private final CacheConfig cacheConfig; private final CachePopulatorStats cachePopulatorStats; @@ -96,6 +100,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker ServiceEmitter emitter, QueryRunnerFactoryConglomerate conglomerate, ExecutorService queryExecutorService, + JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats @@ -107,6 +112,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker this.emitter = Preconditions.checkNotNull(emitter, "emitter"); this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate"); this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService"); + this.joinableFactory = Preconditions.checkNotNull(joinableFactory, "joinableFactory"); this.cache = Preconditions.checkNotNull(cache, "cache"); this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig"); this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats"); @@ -149,11 +155,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker throw new ISE("Cannot handle datasource: %s", analysis.getDataSource()); } - // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. - if (!analysis.getPreJoinableClauses().isEmpty()) { - throw new ISE("Cannot handle join dataSource"); - } - final QueryRunnerFactory> factory = conglomerate.findFactory(query); if (factory == null) { throw new ISE("Unknown query type[%s].", query.getClass()); @@ -168,6 +169,13 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker throw new ISE("Cannot handle subquery: %s", analysis.getDataSource()); } + // segmentMapFn maps each base Segment into a joined Segment if necessary. + final Function segmentMapFn = Joinables.createSegmentMapFn( + analysis.getPreJoinableClauses(), + joinableFactory, + cpuTimeAccumulator + ); + Iterable> perSegmentRunners = Iterables.transform( specs, descriptor -> { @@ -202,7 +210,9 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker // Prevent the underlying segment from swapping when its being iterated final Pair segmentAndCloseable = hydrant.getAndIncrementSegment(); try { - QueryRunner runner = factory.createRunner(segmentAndCloseable.lhs); + final Segment mappedSegment = segmentMapFn.apply(segmentAndCloseable.lhs); + + QueryRunner runner = factory.createRunner(mappedSegment); // 1) Only use caching if data is immutable // 2) Hydrants are not the same between replicas, make sure cache is local @@ -228,7 +238,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker runner, segmentAndCloseable.rhs ); - return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner); + return new Pair<>(mappedSegment.getDataInterval(), runner); } catch (RuntimeException e) { CloseQuietly.close(segmentAndCloseable.rhs); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 96c5ff0ef26..e95e6a9ea08 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -51,6 +51,7 @@ import org.apache.druid.segment.ProgressIndicator; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.plumber.Sink; @@ -101,6 +102,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager private final Map datasourceBundles = new HashMap<>(); private final ExecutorService queryExecutorService; + private final JoinableFactory joinableFactory; private final WorkerConfig workerConfig; private final Cache cache; private final CacheConfig cacheConfig; @@ -114,6 +116,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager @Inject public UnifiedIndexerAppenderatorsManager( @Processing ExecutorService queryExecutorService, + JoinableFactory joinableFactory, WorkerConfig workerConfig, Cache cache, CacheConfig cacheConfig, @@ -124,6 +127,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager ) { this.queryExecutorService = queryExecutorService; + this.joinableFactory = joinableFactory; this.workerConfig = workerConfig; this.cache = cache; this.cacheConfig = cacheConfig; @@ -151,6 +155,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, ExecutorService queryExecutorService, + JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats @@ -320,6 +325,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager serviceEmitter, queryRunnerFactoryConglomerateProvider.get(), queryExecutorService, + joinableFactory, Preconditions.checkNotNull(cache, "cache"), cacheConfig, cachePopulatorStats diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java index fca84bee638..20d4b75868a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java @@ -36,6 +36,7 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.DateTime; @@ -70,6 +71,7 @@ public class FlushingPlumber extends RealtimePlumber QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, ExecutorService queryExecutorService, + JoinableFactory joinableFactory, IndexMerger indexMerger, IndexIO indexIO, Cache cache, @@ -87,6 +89,7 @@ public class FlushingPlumber extends RealtimePlumber conglomerate, segmentAnnouncer, queryExecutorService, + joinableFactory, null, null, null, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumberSchool.java index 51249275064..da827bb1dc5 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumberSchool.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumberSchool.java @@ -34,6 +34,7 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Duration; @@ -54,6 +55,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool private final QueryRunnerFactoryConglomerate conglomerate; private final DataSegmentAnnouncer segmentAnnouncer; private final ExecutorService queryExecutorService; + private final JoinableFactory joinableFactory; private final IndexMergerV9 indexMergerV9; private final IndexIO indexIO; private final Cache cache; @@ -68,6 +70,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool @JacksonInject QueryRunnerFactoryConglomerate conglomerate, @JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject @Processing ExecutorService queryExecutorService, + @JacksonInject JoinableFactory joinableFactory, @JacksonInject IndexMergerV9 indexMergerV9, @JacksonInject IndexIO indexIO, @JacksonInject Cache cache, @@ -84,6 +87,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool null, null, queryExecutorService, + joinableFactory, indexMergerV9, indexIO, cache, @@ -97,6 +101,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool this.conglomerate = conglomerate; this.segmentAnnouncer = segmentAnnouncer; this.queryExecutorService = queryExecutorService; + this.joinableFactory = joinableFactory; this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.cache = cache; @@ -123,6 +128,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool conglomerate, segmentAnnouncer, queryExecutorService, + joinableFactory, indexMergerV9, indexIO, cache, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index a6493cb3944..bd41e69c9d1 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -62,6 +62,7 @@ import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.TuningConfigs; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.FireHydrant; @@ -138,6 +139,7 @@ public class RealtimePlumber implements Plumber QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, ExecutorService queryExecutorService, + JoinableFactory joinableFactory, DataSegmentPusher dataSegmentPusher, SegmentPublisher segmentPublisher, SegmentHandoffNotifier handoffNotifier, @@ -167,6 +169,7 @@ public class RealtimePlumber implements Plumber emitter, conglomerate, queryExecutorService, + joinableFactory, cache, cacheConfig, cachePopulatorStats diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 863fa58c273..f4d5c43f17a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -33,6 +33,7 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.SegmentPublisher; @@ -41,6 +42,7 @@ import org.apache.druid.server.coordination.DataSegmentAnnouncer; import java.util.concurrent.ExecutorService; /** + * */ public class RealtimePlumberSchool implements PlumberSchool { @@ -51,6 +53,7 @@ public class RealtimePlumberSchool implements PlumberSchool private final SegmentPublisher segmentPublisher; private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final ExecutorService queryExecutorService; + private final JoinableFactory joinableFactory; private final IndexMergerV9 indexMergerV9; private final IndexIO indexIO; private final Cache cache; @@ -67,6 +70,7 @@ public class RealtimePlumberSchool implements PlumberSchool @JacksonInject SegmentPublisher segmentPublisher, @JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory, @JacksonInject @Processing ExecutorService executorService, + @JacksonInject JoinableFactory joinableFactory, @JacksonInject IndexMergerV9 indexMergerV9, @JacksonInject IndexIO indexIO, @JacksonInject Cache cache, @@ -82,6 +86,7 @@ public class RealtimePlumberSchool implements PlumberSchool this.segmentPublisher = segmentPublisher; this.handoffNotifierFactory = handoffNotifierFactory; this.queryExecutorService = executorService; + this.joinableFactory = joinableFactory; this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); @@ -108,6 +113,7 @@ public class RealtimePlumberSchool implements PlumberSchool conglomerate, segmentAnnouncer, queryExecutorService, + joinableFactory, dataSegmentPusher, segmentPublisher, handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()), diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java index d4c672c91f4..b781a7f0eec 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java @@ -52,7 +52,11 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; +import org.apache.druid.segment.ReferenceCounter; import org.apache.druid.segment.ReferenceCountingSegment; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.Joinables; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.SetAndVerifyContextQueryRunner; import org.apache.druid.server.initialization.ServerConfig; @@ -66,6 +70,7 @@ import java.util.Collections; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; /** * Query handler for Historical processes (see CliHistorical). @@ -81,6 +86,7 @@ public class ServerManager implements QuerySegmentWalker private final ObjectMapper objectMapper; private final CacheConfig cacheConfig; private final SegmentManager segmentManager; + private final JoinableFactory joinableFactory; private final ServerConfig serverConfig; @Inject @@ -93,6 +99,7 @@ public class ServerManager implements QuerySegmentWalker Cache cache, CacheConfig cacheConfig, SegmentManager segmentManager, + JoinableFactory joinableFactory, ServerConfig serverConfig ) { @@ -106,6 +113,7 @@ public class ServerManager implements QuerySegmentWalker this.cacheConfig = cacheConfig; this.segmentManager = segmentManager; + this.joinableFactory = joinableFactory; this.serverConfig = serverConfig; } @@ -113,10 +121,6 @@ public class ServerManager implements QuerySegmentWalker public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) { final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); - if (!analysis.getPreJoinableClauses().isEmpty()) { - throw new ISE("Cannot handle join dataSource"); - } - final VersionedIntervalTimeline timeline; final Optional> maybeTimeline = segmentManager.getTimeline(analysis); @@ -165,13 +169,8 @@ public class ServerManager implements QuerySegmentWalker return new NoopQueryRunner<>(); } - // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. - final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); - if (!analysis.getPreJoinableClauses().isEmpty()) { - throw new ISE("Cannot handle join dataSource"); - } - final QueryToolChest> toolChest = factory.getToolchest(); + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); final VersionedIntervalTimeline timeline; @@ -191,6 +190,13 @@ public class ServerManager implements QuerySegmentWalker return new NoopQueryRunner<>(); } + // segmentMapFn maps each base Segment into a joined Segment if necessary. + final Function segmentMapFn = Joinables.createSegmentMapFn( + analysis.getPreJoinableClauses(), + joinableFactory, + cpuTimeAccumulator + ); + FunctionalIterable> queryRunners = FunctionalIterable .create(specs) .transformCat( @@ -214,7 +220,8 @@ public class ServerManager implements QuerySegmentWalker buildAndDecorateQueryRunner( factory, toolChest, - segment, + segmentMapFn.apply(segment), + segment.referenceCounter(), descriptor, cpuTimeAccumulator ) @@ -237,19 +244,20 @@ public class ServerManager implements QuerySegmentWalker private QueryRunner buildAndDecorateQueryRunner( final QueryRunnerFactory> factory, final QueryToolChest> toolChest, - final ReferenceCountingSegment adapter, + final Segment segment, + final ReferenceCounter segmentReferenceCounter, final SegmentDescriptor segmentDescriptor, final AtomicLong cpuTimeAccumulator ) { SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor); - SegmentId segmentId = adapter.getId(); + SegmentId segmentId = segment.getId(); String segmentIdString = segmentId.toString(); MetricsEmittingQueryRunner metricsEmittingQueryRunnerInner = new MetricsEmittingQueryRunner<>( emitter, toolChest, - new ReferenceCountingSegmentQueryRunner<>(factory, adapter, segmentDescriptor), + new ReferenceCountingSegmentQueryRunner<>(factory, segment, segmentReferenceCounter, segmentDescriptor), QueryMetrics::reportSegmentTime, queryMetrics -> queryMetrics.segment(segmentIdString) ); @@ -267,7 +275,7 @@ public class ServerManager implements QuerySegmentWalker BySegmentQueryRunner bySegmentQueryRunner = new BySegmentQueryRunner<>( segmentId, - adapter.getDataInterval().getStart(), + segment.getDataInterval().getStart(), cachingQueryRunner ); diff --git a/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java b/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java new file mode 100644 index 00000000000..827f5470171 --- /dev/null +++ b/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.join.table.IndexedTableJoinable; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Optional; + +public class InlineJoinableFactoryTest +{ + private static final String PREFIX = "j."; + + private final InlineJoinableFactory factory = new InlineJoinableFactory(); + + private final InlineDataSource inlineDataSource = InlineDataSource.fromIterable( + ImmutableList.of("str", "long"), + ImmutableList.of(ValueType.STRING, ValueType.LONG), + ImmutableList.of( + new Object[]{"foo", 1L}, + new Object[]{"bar", 2L} + ) + ); + + @Test + public void testBuildNonInline() + { + Assert.assertEquals( + Optional.empty(), + factory.build(new TableDataSource("foo"), makeCondition("x == \"j.y\"")) + ); + } + + @Test + public void testBuildNonHashJoin() + { + Assert.assertEquals( + Optional.empty(), + factory.build(inlineDataSource, makeCondition("x > \"j.y\"")) + ); + } + + @Test + public void testBuild() + { + final Joinable joinable = factory.build(inlineDataSource, makeCondition("x == \"j.long\"")).get(); + + Assert.assertThat(joinable, CoreMatchers.instanceOf(IndexedTableJoinable.class)); + Assert.assertEquals(ImmutableList.of("long", "str"), joinable.getAvailableColumns()); + Assert.assertEquals(2, joinable.getCardinality("str")); + Assert.assertEquals(2, joinable.getCardinality("long")); + } + + private static JoinConditionAnalysis makeCondition(final String condition) + { + return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil()); + } +} diff --git a/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentTest.java index 2cd2c74f37c..7304f1fb676 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentTest.java @@ -37,6 +37,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeIOConfig; import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.realtime.plumber.RealtimePlumberSchool; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.junit.Assert; @@ -112,6 +113,7 @@ public class FireDepartmentTest null, null, null, + NoopJoinableFactory.INSTANCE, TestHelper.getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance()), TestHelper.getTestIndexIO(), MapCache.create(0), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java index 1584df56f2e..8c5ca835b35 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -57,6 +57,7 @@ import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; @@ -281,6 +282,7 @@ public class AppenderatorTester implements AutoCloseable }, emitter, queryExecutor, + NoopJoinableFactory.INSTANCE, MapCache.create(2048), new CacheConfig(), new CachePopulatorStats() diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 5b7de2adb89..97f537f382c 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -50,6 +50,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.TuningConfigs; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.FireDepartmentTest; @@ -225,6 +226,7 @@ public class RealtimePlumberSchoolTest segmentPublisher, handoffNotifierFactory, Execs.directExecutor(), + NoopJoinableFactory.INSTANCE, TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory), TestHelper.getTestIndexIO(), MapCache.create(0), diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index 24828e2bbaa..356302cee22 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -63,6 +63,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.StorageAdapter; +import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.server.SegmentManager; @@ -158,6 +159,7 @@ public class ServerManagerTest new LocalCacheProvider().get(), new CacheConfig(), segmentManager, + NoopJoinableFactory.INSTANCE, new ServerConfig() ); diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java index b700d828d90..d0574f9d4a8 100644 --- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java +++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java @@ -41,6 +41,8 @@ import org.apache.druid.guice.ServerTypeConfig; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.lookup.LookupModule; +import org.apache.druid.segment.join.DefaultJoinableFactory; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.server.QueryResource; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.ServerManager; @@ -56,8 +58,6 @@ import org.eclipse.jetty.server.Server; import java.util.List; -/** - */ @Command( name = "historical", description = "Runs a Historical node, see https://druid.apache.org/docs/latest/Historical.html for a description" @@ -90,6 +90,7 @@ public class CliHistorical extends ServerRunnable binder.bind(SegmentManager.class).in(LazySingleton.class); binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class); + binder.bind(JoinableFactory.class).to(DefaultJoinableFactory.class).in(LazySingleton.class); binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.HISTORICAL)); binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index bf955a99864..1a8057deece 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -59,6 +59,8 @@ import org.apache.druid.indexing.worker.http.ShuffleResource; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.lookup.LookupModule; +import org.apache.druid.segment.join.DefaultJoinableFactory; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.realtime.CliIndexerDataSegmentServerAnnouncerLifecycleHandler; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager; @@ -119,6 +121,7 @@ public class CliIndexer extends ServerRunnable binder.bind(TaskRunner.class).to(ThreadingTaskRunner.class); binder.bind(QuerySegmentWalker.class).to(ThreadingTaskRunner.class); + binder.bind(JoinableFactory.class).to(DefaultJoinableFactory.class).in(LazySingleton.class); binder.bind(ThreadingTaskRunner.class).in(LazySingleton.class); CliPeon.bindRowIngestionMeters(binder); diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 3707b820cf0..d0e67be17a1 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -91,6 +91,8 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.lookup.LookupModule; +import org.apache.druid.segment.join.DefaultJoinableFactory; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentArchiver; import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.DataSegmentMover; @@ -207,6 +209,7 @@ public class CliPeon extends GuiceRunnable binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class); binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class); + binder.bind(JoinableFactory.class).to(DefaultJoinableFactory.class).in(LazySingleton.class); binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class); bindRealtimeCache(binder);