From 775d654a6c84fc6afbb1935010f11fed1bc35d18 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 9 May 2024 11:21:54 +0530 Subject: [PATCH] Load only the required lookups for MSQ tasks (#16358) With this PR changes, MSQ tasks (MSQControllerTask and MSQWorkerTask) only load the required lookups during querying and ingestion, based on the value of CTX_LOOKUPS_TO_LOAD key in the query context. --- .../indexing/IndexerControllerContext.java | 15 +++++ .../druid/msq/indexing/MSQControllerTask.java | 7 ++ .../druid/msq/indexing/MSQWorkerTask.java | 27 ++++++++ .../druid/msq/sql/MSQTaskQueryMaker.java | 9 ++- .../apache/druid/msq/exec/MSQSelectTest.java | 8 ++- .../msq/indexing/MSQControllerTaskTest.java | 53 +++++++++++++++ .../druid/msq/indexing/MSQWorkerTaskTest.java | 65 ++++++++++++++++++- .../apache/druid/msq/test/MSQTestBase.java | 30 ++++++++- .../QueryLookupOperatorConversion.java | 6 +- .../sql/calcite/planner/PlannerContext.java | 21 ++++++ .../planner/SqlResourceCollectorShuttle.java | 9 ++- 11 files changed, 244 insertions(+), 6 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 3ff71c3e1b7..e8fba09ddc5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -55,6 +55,7 @@ import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.server.DruidNode; +import org.apache.druid.sql.calcite.planner.PlannerContext; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -269,6 +270,20 @@ public class IndexerControllerContext implements ControllerContext .put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec)) .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages()); + // Put the lookup loading info in the task context to facilitate selective loading of lookups. + if (controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) != null) { + taskContextOverridesBuilder.put( + PlannerContext.CTX_LOOKUP_LOADING_MODE, + controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) + ); + } + if (controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null) { + taskContextOverridesBuilder.put( + PlannerContext.CTX_LOOKUPS_TO_LOAD, + controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) + ); + } + if (querySpec.getDestination().toSelectDestination() != null) { taskContextOverridesBuilder.put( MultiStageQueryContext.CTX_SELECT_DESTINATION, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 2a435215144..bdaf3964b29 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -58,6 +58,7 @@ import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.sql.calcite.run.SqlResults; @@ -333,4 +334,10 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery, { return querySpec.getDestination() instanceof DurableStorageMSQDestination; } + + @Override + public LookupLoadingSpec getLookupLoadingSpec() + { + return LookupLoadingSpec.NONE; + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index b4d18ea390e..a23c62881a0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.google.inject.Injector; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -37,9 +38,13 @@ import org.apache.druid.msq.exec.MSQTasks; import org.apache.druid.msq.exec.Worker; import org.apache.druid.msq.exec.WorkerContext; import org.apache.druid.msq.exec.WorkerImpl; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nonnull; +import java.util.Collection; +import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -185,4 +190,26 @@ public class MSQWorkerTask extends AbstractTask { return Objects.hash(super.hashCode(), controllerTaskId, workerNumber, retryCount, worker); } + + @Override + public LookupLoadingSpec getLookupLoadingSpec() + { + final Object lookupModeValue = getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE); + if (lookupModeValue == null) { + return LookupLoadingSpec.ALL; + } + + final LookupLoadingSpec.Mode lookupLoadingMode = LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString()); + if (lookupLoadingMode == LookupLoadingSpec.Mode.NONE) { + return LookupLoadingSpec.NONE; + } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) { + Collection lookupsToLoad = (Collection) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); + if (lookupsToLoad == null || lookupsToLoad.isEmpty()) { + throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad); + } + return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); + } else { + return LookupLoadingSpec.ALL; + } + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 8cc34547f99..533010c3057 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -52,6 +52,7 @@ import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.server.QueryResponse; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.parser.DruidSqlIngest; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlReplace; @@ -283,6 +284,12 @@ public class MSQTaskQueryMaker implements QueryMaker MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec); + final Map context = new HashMap<>(); + context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, plannerContext.getLookupLoadingSpec().getMode()); + if (plannerContext.getLookupLoadingSpec().getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED) { + context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupLoadingSpec().getLookupsToLoad()); + } + final MSQControllerTask controllerTask = new MSQControllerTask( taskId, querySpec.withOverriddenContext(nativeQueryContext), @@ -291,7 +298,7 @@ public class MSQTaskQueryMaker implements QueryMaker SqlResults.Context.fromPlannerContext(plannerContext), sqlTypeNames, columnTypeList, - null + context ); FutureUtils.getUnchecked(overlordClient.runTask(taskId, controllerTask), true); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 56f1ce98696..84dddd526c1 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -21,6 +21,7 @@ package org.apache.druid.msq.exec; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.JsonInputFormat; @@ -70,6 +71,7 @@ import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.JoinType; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.external.ExternalDataSource; import org.apache.druid.sql.calcite.filtration.Filtration; @@ -227,7 +229,9 @@ public class MSQSelectTest extends MSQTestBase new Object[]{1L, "1"}, new Object[]{1L, "def"}, new Object[]{1L, "abc"} - )).verifyResults(); + )) + .setExpectedLookupLoadingSpec(LookupLoadingSpec.NONE) + .verifyResults(); } @MethodSource("data") @@ -742,6 +746,7 @@ public class MSQSelectTest extends MSQTestBase .build()) .setExpectedRowSignature(rowSignature) .setExpectedResultRows(ImmutableList.of(new Object[]{4L})) + .setExpectedLookupLoadingSpec(LookupLoadingSpec.loadOnly(ImmutableSet.of("lookyloo"))) .verifyResults(); } @@ -808,6 +813,7 @@ public class MSQSelectTest extends MSQTestBase new Object[]{"xabc", 1L} ) ) + .setExpectedLookupLoadingSpec(LookupLoadingSpec.loadOnly(ImmutableSet.of("lookyloo"))) .verifyResults(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index e0eee251f72..a5001fb58eb 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -20,6 +20,7 @@ package org.apache.druid.msq.indexing; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; @@ -34,12 +35,15 @@ import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; import org.apache.druid.query.Druids; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.planner.ColumnMapping; import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.apache.druid.sql.calcite.planner.PlannerContext; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -84,6 +88,55 @@ public class MSQControllerTaskTest Assert.assertTrue(controllerTask.getInputSourceResources().isEmpty()); } + @Test + public void testGetDefaultLookupLoadingSpec() + { + MSQControllerTask controllerTask = new MSQControllerTask( + null, + MSQ_SPEC, + null, + null, + null, + null, + null, + null + ); + Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec()); + } + + @Test + public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext() + { + MSQSpec build = MSQSpec + .builder() + .query(new Druids.ScanQueryBuilder() + .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) + .dataSource("target") + .context( + ImmutableMap.of( + PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"), + PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED) + ) + .build() + ) + .columnMappings(new ColumnMappings(Collections.emptyList())) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .build(); + MSQControllerTask controllerTask = new MSQControllerTask( + null, + build, + null, + null, + null, + null, + null, + null + ); + + // Va;idate that MSQ Controller task doesn't load any lookups even if context has lookup info populated. + Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec()); + } + @Test public void testGetTaskAllocatorId() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index 6eff77184ea..5e79b129f3b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -20,9 +20,16 @@ package org.apache.druid.msq.indexing; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.error.DruidException; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; +import org.apache.druid.sql.calcite.planner.PlannerContext; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -47,7 +54,6 @@ public class MSQWorkerTaskTest @Test public void testEquals() { - Assert.assertEquals(msqWorkerTask, msqWorkerTask); Assert.assertEquals( msqWorkerTask, new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount) @@ -108,4 +114,61 @@ public class MSQWorkerTaskTest MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty()); } + + @Test + public void testGetDefaultLookupLoadingSpec() + { + MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); + Assert.assertEquals(LookupLoadingSpec.ALL, msqWorkerTask.getLookupLoadingSpec()); + } + + @Test + public void testGetLookupLoadingWithModeNoneInContext() + { + final ImmutableMap context = ImmutableMap.of(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE); + MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); + Assert.assertEquals(LookupLoadingSpec.NONE, msqWorkerTask.getLookupLoadingSpec()); + } + + @Test + public void testGetLookupLoadingSpecWithLookupListInContext() + { + final ImmutableMap context = ImmutableMap.of( + PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"), + PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); + MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); + Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, msqWorkerTask.getLookupLoadingSpec().getMode()); + Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"), msqWorkerTask.getLookupLoadingSpec().getLookupsToLoad()); + } + + @Test + public void testGetLookupLoadingSpecWithInvalidInput() + { + final HashMap context = new HashMap<>(); + context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); + + // Setting CTX_LOOKUPS_TO_LOAD as null + context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null); + + MSQWorkerTask taskWithNullLookups = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); + DruidException exception = Assert.assertThrows( + DruidException.class, + taskWithNullLookups::getLookupLoadingSpec + ); + Assert.assertEquals( + "Set of lookups to load cannot be null for mode[ONLY_REQUIRED].", + exception.getMessage()); + + // Setting CTX_LOOKUPS_TO_LOAD as empty list + context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList()); + + MSQWorkerTask taskWithEmptyLookups = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); + exception = Assert.assertThrows( + DruidException.class, + taskWithEmptyLookups::getLookupLoadingSpec + ); + Assert.assertEquals( + "Set of lookups to load cannot be [] for mode[ONLY_REQUIRED].", + exception.getMessage()); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index fe78b481bee..cdaafc75c60 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -153,6 +153,7 @@ import org.apache.druid.server.SegmentManager; import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.sql.DirectStatement; @@ -853,6 +854,7 @@ public class MSQTestBase extends BaseCalciteQueryTest protected CompactionState expectedLastCompactionState = null; protected Set expectedTombstoneIntervals = null; protected List expectedResultRows = null; + protected LookupLoadingSpec expectedLookupLoadingSpec = null; protected Matcher expectedValidationErrorMatcher = null; protected List, String>> adhocReportAssertionAndReasons = new ArrayList<>(); protected Matcher expectedExecutionErrorMatcher = null; @@ -917,6 +919,12 @@ public class MSQTestBase extends BaseCalciteQueryTest return asBuilder(); } + public Builder setExpectedLookupLoadingSpec(LookupLoadingSpec lookupLoadingSpec) + { + this.expectedLookupLoadingSpec = lookupLoadingSpec; + return asBuilder(); + } + public Builder setExpectedMSQSpec(MSQSpec expectedMSQSpec) { this.expectedMSQSpec = expectedMSQSpec; @@ -1010,6 +1018,23 @@ public class MSQTestBase extends BaseCalciteQueryTest assertThat(e, expectedValidationErrorMatcher); } + protected void verifyLookupLoadingInfoInTaskContext(Map context) + { + String lookupLoadingMode = context.get(PlannerContext.CTX_LOOKUP_LOADING_MODE).toString(); + List lookupsToLoad = (List) context.get(PlannerContext.CTX_LOOKUPS_TO_LOAD); + if (expectedLookupLoadingSpec != null) { + Assert.assertEquals(expectedLookupLoadingSpec.getMode().toString(), lookupLoadingMode); + if (expectedLookupLoadingSpec.getMode().equals(LookupLoadingSpec.Mode.ONLY_REQUIRED)) { + Assert.assertEquals(new ArrayList<>(expectedLookupLoadingSpec.getLookupsToLoad()), lookupsToLoad); + } else { + Assert.assertNull(lookupsToLoad); + } + } else { + Assert.assertEquals(LookupLoadingSpec.Mode.NONE.toString(), lookupLoadingMode); + Assert.assertNull(lookupsToLoad); + } + } + protected void verifyWorkerCount(CounterSnapshotsTree counterSnapshotsTree) { Map> counterMap = counterSnapshotsTree.copyMap(); @@ -1165,7 +1190,9 @@ public class MSQTestBase extends BaseCalciteQueryTest verifyWorkerCount(reportPayload.getCounters()); verifyCounters(reportPayload.getCounters()); - MSQSpec foundSpec = indexingServiceClient.getMSQControllerTask(controllerId).getQuerySpec(); + MSQControllerTask msqControllerTask = indexingServiceClient.getMSQControllerTask(controllerId); + MSQSpec foundSpec = msqControllerTask.getQuerySpec(); + verifyLookupLoadingInfoInTaskContext(msqControllerTask.getContext()); log.info( "found generated segments: %s", segmentManager.getAllDataSegments().stream().map(s -> s.toString()).collect( @@ -1393,6 +1420,7 @@ public class MSQTestBase extends BaseCalciteQueryTest throw new ISE("Query %s failed due to %s", sql, payload.getStatus().getErrorReport().toString()); } else { MSQControllerTask msqControllerTask = indexingServiceClient.getMSQControllerTask(controllerId); + verifyLookupLoadingInfoInTaskContext(msqControllerTask.getContext()); final MSQSpec spec = msqControllerTask.getQuerySpec(); final List rows; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java index 9075456c812..8947bd60a01 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java @@ -81,12 +81,16 @@ public class QueryLookupOperatorConversion implements SqlOperatorConversion final DruidExpression arg = inputExpressions.get(0); final Expr lookupNameExpr = plannerContext.parseExpression(inputExpressions.get(1).getExpression()); final String replaceMissingValueWith = getReplaceMissingValueWith(inputExpressions, plannerContext); + final String lookupName = (String) lookupNameExpr.getLiteralValue(); + + // Add the lookup name to the set of lookups to selectively load. + plannerContext.addLookupToLoad(lookupName); if (arg.isSimpleExtraction() && lookupNameExpr.isLiteral()) { return arg.getSimpleExtraction().cascade( new RegisteredLookupExtractionFn( lookupExtractorFactoryContainerProvider, - (String) lookupNameExpr.getLiteralValue(), + lookupName, false, replaceMissingValueWith, null, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index 281fc66c8aa..99f721bffaa 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -43,6 +43,7 @@ import org.apache.druid.query.lookup.LookupExtractor; import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.query.lookup.RegisteredLookupExtractionFn; import org.apache.druid.segment.join.JoinableFactoryWrapper; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.ResourceAction; @@ -61,6 +62,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -78,6 +80,8 @@ public class PlannerContext public static final String CTX_SQL_CURRENT_TIMESTAMP = "sqlCurrentTimestamp"; public static final String CTX_SQL_TIME_ZONE = "sqlTimeZone"; public static final String CTX_SQL_JOIN_ALGORITHM = "sqlJoinAlgorithm"; + public static final String CTX_LOOKUP_LOADING_MODE = "lookupLoadingMode"; + public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad"; private static final JoinAlgorithm DEFAULT_SQL_JOIN_ALGORITHM = JoinAlgorithm.BROADCAST; /** @@ -142,6 +146,7 @@ public class PlannerContext // set of attributes for a SQL statement used in the EXPLAIN PLAN output private ExplainAttributes explainAttributes; private PlannerLookupCache lookupCache; + private final Set lookupsToLoad = new HashSet<>(); private PlannerContext( final PlannerToolbox plannerToolbox, @@ -343,6 +348,22 @@ public class PlannerContext return plannerToolbox.rootSchema().getResourceType(schema, resourceName); } + /** + * Adds the given lookup name to the lookup loading spec. + */ + public void addLookupToLoad(String lookupName) + { + lookupsToLoad.add(lookupName); + } + + /** + * Returns the lookup to load for a given task. + */ + public LookupLoadingSpec getLookupLoadingSpec() + { + return lookupsToLoad.isEmpty() ? LookupLoadingSpec.NONE : LookupLoadingSpec.loadOnly(lookupsToLoad); + } + /** * Return the query context as a mutable map. Use this form when * modifying the context during planning. diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java index 631936972e1..4300c7d574b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java @@ -33,6 +33,7 @@ import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; import org.apache.druid.sql.calcite.expression.AuthorizableOperator; +import org.apache.druid.sql.calcite.schema.NamedLookupSchema; import java.util.HashSet; import java.util.List; @@ -87,13 +88,19 @@ public class SqlResourceCollectorShuttle extends SqlShuttle if (qualifiedNameParts.size() == 2) { final String schema = qualifiedNameParts.get(0); final String resourceName = qualifiedNameParts.get(1); + + // Add the lookup name to the set of lookups to selectively load. + if (schema.equals(NamedLookupSchema.NAME)) { + plannerContext.addLookupToLoad(resourceName); + } + final String resourceType = plannerContext.getSchemaResourceType(schema, resourceName); if (resourceType != null) { resourceActions.add(new ResourceAction(new Resource(resourceName, resourceType), Action.READ)); } } else if (qualifiedNameParts.size() > 2) { // Don't expect to see more than 2 names (catalog?). - throw new ISE("Cannot analyze table idetifier %s", qualifiedNameParts); + throw new ISE("Cannot analyze table identifier %s", qualifiedNameParts); } } }