Load only the required lookups for MSQ tasks (#16358)

With the changes in this PR, MSQ tasks (MSQControllerTask and MSQWorkerTask) load only the required lookups during querying and ingestion, based on the value of the CTX_LOOKUPS_TO_LOAD key in the query context.
This commit is contained in:
Akshat Jain 2024-05-09 11:21:54 +05:30 committed by GitHub
parent a6ebb963c7
commit 775d654a6c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 244 additions and 6 deletions

View File

@ -55,6 +55,7 @@ import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.DruidNode;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@ -269,6 +270,20 @@ public class IndexerControllerContext implements ControllerContext
.put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
.put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages());
// Put the lookup loading info in the task context to facilitate selective loading of lookups.
if (controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) != null) {
taskContextOverridesBuilder.put(
PlannerContext.CTX_LOOKUP_LOADING_MODE,
controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE)
);
}
if (controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null) {
taskContextOverridesBuilder.put(
PlannerContext.CTX_LOOKUPS_TO_LOAD,
controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD)
);
}
if (querySpec.getDestination().toSelectDestination() != null) {
taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_SELECT_DESTINATION,

View File

@ -58,6 +58,7 @@ import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.run.SqlResults;
@ -333,4 +334,10 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery,
{
return querySpec.getDestination() instanceof DurableStorageMSQDestination;
}
/**
 * Returns the lookup loading spec for the controller task.
 *
 * This implementation unconditionally returns {@link LookupLoadingSpec#NONE}; it does not consult the task or
 * query context.
 */
@Override
public LookupLoadingSpec getLookupLoadingSpec()
{
return LookupLoadingSpec.NONE;
}
}

View File

@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@ -37,9 +38,13 @@ import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerContext;
import org.apache.druid.msq.exec.WorkerImpl;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@ -185,4 +190,26 @@ public class MSQWorkerTask extends AbstractTask
{
return Objects.hash(super.hashCode(), controllerTaskId, workerNumber, retryCount, worker);
}
/**
 * Derives the lookup loading spec for this worker from the task context.
 *
 * <ul>
 *   <li>No {@link PlannerContext#CTX_LOOKUP_LOADING_MODE} entry: {@link LookupLoadingSpec#ALL}.</li>
 *   <li>Mode {@code NONE}: {@link LookupLoadingSpec#NONE}.</li>
 *   <li>Mode {@code ONLY_REQUIRED}: a load-only spec built from {@link PlannerContext#CTX_LOOKUPS_TO_LOAD}.</li>
 *   <li>Any other valid mode: {@link LookupLoadingSpec#ALL}.</li>
 * </ul>
 *
 * @return the spec describing which lookups this task needs
 * @throws org.apache.druid.error.DruidException (invalid input) if the mode is not a recognised
 *         {@link LookupLoadingSpec.Mode} name, or if the mode is {@code ONLY_REQUIRED} and the lookups entry is
 *         missing, empty, not a collection, or contains a non-String element
 */
@Override
public LookupLoadingSpec getLookupLoadingSpec()
{
  final Object lookupModeValue = getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE);
  if (lookupModeValue == null) {
    return LookupLoadingSpec.ALL;
  }

  // Enum.valueOf throws a bare IllegalArgumentException for unknown names; surface it as a user-facing
  // invalid-input error that names the offending context key and value, keeping the original cause.
  final LookupLoadingSpec.Mode lookupLoadingMode;
  try {
    lookupLoadingMode = LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString());
  }
  catch (IllegalArgumentException e) {
    throw InvalidInput.exception(
        e,
        "Invalid value of %s[%s].",
        PlannerContext.CTX_LOOKUP_LOADING_MODE,
        lookupModeValue
    );
  }

  if (lookupLoadingMode == LookupLoadingSpec.Mode.NONE) {
    return LookupLoadingSpec.NONE;
  }
  if (lookupLoadingMode != LookupLoadingSpec.Mode.ONLY_REQUIRED) {
    return LookupLoadingSpec.ALL;
  }

  // The context is an untyped map, so check the runtime type instead of relying on an unchecked cast.
  final Object lookupsValue = getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD);
  if (lookupsValue != null && !(lookupsValue instanceof Collection)) {
    throw InvalidInput.exception(
        "Invalid value of %s[%s]. Expected a list of lookup names.",
        PlannerContext.CTX_LOOKUPS_TO_LOAD,
        lookupsValue
    );
  }

  final Collection<?> lookupsToLoad = (Collection<?>) lookupsValue;
  if (lookupsToLoad == null || lookupsToLoad.isEmpty()) {
    throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad);
  }

  final Set<String> lookupNames = new HashSet<>();
  for (Object lookupName : lookupsToLoad) {
    if (!(lookupName instanceof String)) {
      throw InvalidInput.exception(
          "Invalid value of %s[%s]. Every lookup name must be a string.",
          PlannerContext.CTX_LOOKUPS_TO_LOAD,
          lookupsToLoad
      );
    }
    lookupNames.add((String) lookupName);
  }
  return LookupLoadingSpec.loadOnly(lookupNames);
}
}

View File

@ -52,6 +52,7 @@ import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
@ -283,6 +284,12 @@ public class MSQTaskQueryMaker implements QueryMaker
MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
final Map<String, Object> context = new HashMap<>();
context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, plannerContext.getLookupLoadingSpec().getMode());
if (plannerContext.getLookupLoadingSpec().getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED) {
context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupLoadingSpec().getLookupsToLoad());
}
final MSQControllerTask controllerTask = new MSQControllerTask(
taskId,
querySpec.withOverriddenContext(nativeQueryContext),
@ -291,7 +298,7 @@ public class MSQTaskQueryMaker implements QueryMaker
SqlResults.Context.fromPlannerContext(plannerContext),
sqlTypeNames,
columnTypeList,
null
context
);
FutureUtils.getUnchecked(overlordClient.runTask(taskId, controllerTask), true);

View File

@ -21,6 +21,7 @@ package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.JsonInputFormat;
@ -70,6 +71,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.filtration.Filtration;
@ -227,7 +229,9 @@ public class MSQSelectTest extends MSQTestBase
new Object[]{1L, "1"},
new Object[]{1L, "def"},
new Object[]{1L, "abc"}
)).verifyResults();
))
.setExpectedLookupLoadingSpec(LookupLoadingSpec.NONE)
.verifyResults();
}
@MethodSource("data")
@ -742,6 +746,7 @@ public class MSQSelectTest extends MSQTestBase
.build())
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(new Object[]{4L}))
.setExpectedLookupLoadingSpec(LookupLoadingSpec.loadOnly(ImmutableSet.of("lookyloo")))
.verifyResults();
}
@ -808,6 +813,7 @@ public class MSQSelectTest extends MSQTestBase
new Object[]{"xabc", 1L}
)
)
.setExpectedLookupLoadingSpec(LookupLoadingSpec.loadOnly(ImmutableSet.of("lookyloo")))
.verifyResults();
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.msq.indexing;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
@ -34,12 +35,15 @@ import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -84,6 +88,55 @@ public class MSQControllerTaskTest
Assert.assertTrue(controllerTask.getInputSourceResources().isEmpty());
}
/**
 * A controller task constructed with no task context reports {@link LookupLoadingSpec#NONE}.
 */
@Test
public void testGetDefaultLookupLoadingSpec()
{
  // Only the query spec is supplied; every other constructor argument is left null.
  final MSQControllerTask taskWithoutContext =
      new MSQControllerTask(null, MSQ_SPEC, null, null, null, null, null, null);

  final LookupLoadingSpec actualSpec = taskWithoutContext.getLookupLoadingSpec();

  Assert.assertEquals(LookupLoadingSpec.NONE, actualSpec);
}
/**
 * Even when the query context carries lookup loading info, the controller task reports
 * {@link LookupLoadingSpec#NONE}.
 */
@Test
public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext()
{
  final ImmutableMap<String, Object> lookupContext = ImmutableMap.of(
      PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"),
      PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED
  );

  final ScanQuery scanWithLookupContext = new Druids.ScanQueryBuilder()
      .intervals(new MultipleIntervalSegmentSpec(INTERVALS))
      .dataSource("target")
      .context(lookupContext)
      .build();

  final MSQSpec specWithLookupContext = MSQSpec
      .builder()
      .query(scanWithLookupContext)
      .columnMappings(new ColumnMappings(Collections.emptyList()))
      .tuningConfig(MSQTuningConfig.defaultConfig())
      .build();

  final MSQControllerTask controllerTask =
      new MSQControllerTask(null, specWithLookupContext, null, null, null, null, null, null);

  // Validate that the MSQ controller task doesn't load any lookups even if the context has lookup info populated.
  Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec());
}
@Test
public void testGetTaskAllocatorId()
{

View File

@ -20,9 +20,16 @@
package org.apache.druid.msq.indexing;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.DruidException;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
@ -47,7 +54,6 @@ public class MSQWorkerTaskTest
@Test
public void testEquals()
{
Assert.assertEquals(msqWorkerTask, msqWorkerTask);
Assert.assertEquals(
msqWorkerTask,
new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount)
@ -108,4 +114,61 @@ public class MSQWorkerTaskTest
MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
}
/**
 * With the shared test context, the worker task reports {@link LookupLoadingSpec#ALL}.
 */
@Test
public void testGetDefaultLookupLoadingSpec()
{
  final MSQWorkerTask defaultWorkerTask = new MSQWorkerTask(
      controllerTaskId,
      dataSource,
      workerNumber,
      context,
      retryCount
  );

  final LookupLoadingSpec actualSpec = defaultWorkerTask.getLookupLoadingSpec();

  Assert.assertEquals(LookupLoadingSpec.ALL, actualSpec);
}
/**
 * A context whose loading mode is {@code NONE} yields {@link LookupLoadingSpec#NONE}.
 */
@Test
public void testGetLookupLoadingWithModeNoneInContext()
{
  final ImmutableMap<String, Object> modeNoneContext =
      ImmutableMap.of(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE);

  final MSQWorkerTask workerTask = new MSQWorkerTask(
      controllerTaskId,
      dataSource,
      workerNumber,
      modeNoneContext,
      retryCount
  );

  Assert.assertEquals(LookupLoadingSpec.NONE, workerTask.getLookupLoadingSpec());
}
/**
 * Mode {@code ONLY_REQUIRED} plus a list of names yields a spec carrying exactly those names.
 */
@Test
public void testGetLookupLoadingSpecWithLookupListInContext()
{
  final ImmutableMap<String, Object> onlyRequiredContext = ImmutableMap.of(
      PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"),
      PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED
  );
  final MSQWorkerTask workerTask = new MSQWorkerTask(
      controllerTaskId,
      dataSource,
      workerNumber,
      onlyRequiredContext,
      retryCount
  );

  // Mode first, then the names, each read from a fresh call as in the original assertions.
  final LookupLoadingSpec.Mode actualMode = workerTask.getLookupLoadingSpec().getMode();
  Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, actualMode);

  final Set<String> actualLookups = workerTask.getLookupLoadingSpec().getLookupsToLoad();
  Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"), actualLookups);
}
/**
 * Mode {@code ONLY_REQUIRED} with a null or empty lookup list is rejected with a {@link DruidException}.
 */
@Test
public void testGetLookupLoadingSpecWithInvalidInput()
{
  // A HashMap is needed here because the first case stores an explicit null value.
  final HashMap<String, Object> taskContext = new HashMap<>();
  taskContext.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED);

  // Case 1: CTX_LOOKUPS_TO_LOAD is null.
  taskContext.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null);
  final MSQWorkerTask nullLookupsTask = new MSQWorkerTask(
      controllerTaskId,
      dataSource,
      workerNumber,
      taskContext,
      retryCount
  );
  final DruidException nullLookupsError = Assert.assertThrows(
      DruidException.class,
      () -> nullLookupsTask.getLookupLoadingSpec()
  );
  Assert.assertEquals(
      "Set of lookups to load cannot be null for mode[ONLY_REQUIRED].",
      nullLookupsError.getMessage()
  );

  // Case 2: CTX_LOOKUPS_TO_LOAD is an empty list.
  taskContext.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList());
  final MSQWorkerTask emptyLookupsTask = new MSQWorkerTask(
      controllerTaskId,
      dataSource,
      workerNumber,
      taskContext,
      retryCount
  );
  final DruidException emptyLookupsError = Assert.assertThrows(
      DruidException.class,
      () -> emptyLookupsTask.getLookupLoadingSpec()
  );
  Assert.assertEquals(
      "Set of lookups to load cannot be [] for mode[ONLY_REQUIRED].",
      emptyLookupsError.getMessage()
  );
}
}

View File

@ -153,6 +153,7 @@ import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.sql.DirectStatement;
@ -853,6 +854,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected CompactionState expectedLastCompactionState = null;
protected Set<Interval> expectedTombstoneIntervals = null;
protected List<Object[]> expectedResultRows = null;
protected LookupLoadingSpec expectedLookupLoadingSpec = null;
protected Matcher<Throwable> expectedValidationErrorMatcher = null;
protected List<Pair<Predicate<MSQTaskReportPayload>, String>> adhocReportAssertionAndReasons = new ArrayList<>();
protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
@ -917,6 +919,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
return asBuilder();
}
/**
 * Sets the lookup loading spec that {@code verifyLookupLoadingInfoInTaskContext} checks the controller task
 * context against. When this is never called the expectation stays null, and verification then expects mode
 * NONE with no lookups listed.
 *
 * @param lookupLoadingSpec the expected spec
 * @return this builder, for chaining
 */
public Builder setExpectedLookupLoadingSpec(LookupLoadingSpec lookupLoadingSpec)
{
this.expectedLookupLoadingSpec = lookupLoadingSpec;
return asBuilder();
}
public Builder setExpectedMSQSpec(MSQSpec expectedMSQSpec)
{
this.expectedMSQSpec = expectedMSQSpec;
@ -1010,6 +1018,23 @@ public class MSQTestBase extends BaseCalciteQueryTest
assertThat(e, expectedValidationErrorMatcher);
}
/**
 * Asserts that the lookup loading info stored in the given task context matches the expected spec.
 *
 * When no expectation was set, the context is expected to carry mode {@code NONE} and no lookup list. For
 * {@code ONLY_REQUIRED}, the lookup names are compared without regard to order, because the expectation is a
 * set while the context holds a list.
 *
 * @param context the controller task context to inspect
 */
protected void verifyLookupLoadingInfoInTaskContext(Map<String, Object> context)
{
  // Fail with a readable assertion (not an NPE on toString) when the mode key is absent.
  final Object lookupLoadingMode = context.get(PlannerContext.CTX_LOOKUP_LOADING_MODE);
  Assert.assertNotNull("Task context is missing the lookup loading mode", lookupLoadingMode);

  final List<String> lookupsToLoad = (List<String>) context.get(PlannerContext.CTX_LOOKUPS_TO_LOAD);

  // An unset expectation is equivalent to expecting NONE.
  final LookupLoadingSpec expectedSpec =
      expectedLookupLoadingSpec == null ? LookupLoadingSpec.NONE : expectedLookupLoadingSpec;

  Assert.assertEquals(expectedSpec.getMode().toString(), lookupLoadingMode.toString());

  if (expectedSpec.getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED) {
    Assert.assertNotNull("Task context is missing the lookups to load", lookupsToLoad);

    // Sort both sides (a null comparator means natural ordering) so that set iteration order cannot cause
    // spurious failures, while duplicates or missing names are still reported in the assertion message.
    final List<String> expectedLookups = new ArrayList<>(expectedSpec.getLookupsToLoad());
    final List<String> actualLookups = new ArrayList<>(lookupsToLoad);
    expectedLookups.sort(null);
    actualLookups.sort(null);
    Assert.assertEquals(expectedLookups, actualLookups);
  } else {
    Assert.assertNull(lookupsToLoad);
  }
}
protected void verifyWorkerCount(CounterSnapshotsTree counterSnapshotsTree)
{
Map<Integer, Map<Integer, CounterSnapshots>> counterMap = counterSnapshotsTree.copyMap();
@ -1165,7 +1190,9 @@ public class MSQTestBase extends BaseCalciteQueryTest
verifyWorkerCount(reportPayload.getCounters());
verifyCounters(reportPayload.getCounters());
MSQSpec foundSpec = indexingServiceClient.getMSQControllerTask(controllerId).getQuerySpec();
MSQControllerTask msqControllerTask = indexingServiceClient.getMSQControllerTask(controllerId);
MSQSpec foundSpec = msqControllerTask.getQuerySpec();
verifyLookupLoadingInfoInTaskContext(msqControllerTask.getContext());
log.info(
"found generated segments: %s",
segmentManager.getAllDataSegments().stream().map(s -> s.toString()).collect(
@ -1393,6 +1420,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
throw new ISE("Query %s failed due to %s", sql, payload.getStatus().getErrorReport().toString());
} else {
MSQControllerTask msqControllerTask = indexingServiceClient.getMSQControllerTask(controllerId);
verifyLookupLoadingInfoInTaskContext(msqControllerTask.getContext());
final MSQSpec spec = msqControllerTask.getQuerySpec();
final List<Object[]> rows;

View File

@ -81,12 +81,16 @@ public class QueryLookupOperatorConversion implements SqlOperatorConversion
final DruidExpression arg = inputExpressions.get(0);
final Expr lookupNameExpr = plannerContext.parseExpression(inputExpressions.get(1).getExpression());
final String replaceMissingValueWith = getReplaceMissingValueWith(inputExpressions, plannerContext);
final String lookupName = (String) lookupNameExpr.getLiteralValue();
// Add the lookup name to the set of lookups to selectively load.
plannerContext.addLookupToLoad(lookupName);
if (arg.isSimpleExtraction() && lookupNameExpr.isLiteral()) {
return arg.getSimpleExtraction().cascade(
new RegisteredLookupExtractionFn(
lookupExtractorFactoryContainerProvider,
(String) lookupNameExpr.getLiteralValue(),
lookupName,
false,
replaceMissingValueWith,
null,

View File

@ -43,6 +43,7 @@ import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.ResourceAction;
@ -61,6 +62,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -78,6 +80,8 @@ public class PlannerContext
public static final String CTX_SQL_CURRENT_TIMESTAMP = "sqlCurrentTimestamp";
public static final String CTX_SQL_TIME_ZONE = "sqlTimeZone";
public static final String CTX_SQL_JOIN_ALGORITHM = "sqlJoinAlgorithm";
public static final String CTX_LOOKUP_LOADING_MODE = "lookupLoadingMode";
public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad";
private static final JoinAlgorithm DEFAULT_SQL_JOIN_ALGORITHM = JoinAlgorithm.BROADCAST;
/**
@ -142,6 +146,7 @@ public class PlannerContext
// set of attributes for a SQL statement used in the EXPLAIN PLAN output
private ExplainAttributes explainAttributes;
private PlannerLookupCache lookupCache;
private final Set<String> lookupsToLoad = new HashSet<>();
private PlannerContext(
final PlannerToolbox plannerToolbox,
@ -343,6 +348,22 @@ public class PlannerContext
return plannerToolbox.rootSchema().getResourceType(schema, resourceName);
}
/**
 * Records the given lookup name in this context's set of lookups to load. The names are kept in a set, so
 * adding the same name more than once has no further effect. {@link #getLookupLoadingSpec()} builds its result
 * from this set.
 *
 * @param lookupName name of the lookup referenced by the statement being planned
 */
public void addLookupToLoad(String lookupName)
{
lookupsToLoad.add(lookupName);
}
/**
 * Returns the lookup loading spec implied by the lookup names recorded so far: {@link LookupLoadingSpec#NONE}
 * when no names have been added, otherwise a load-only spec for the recorded names.
 */
public LookupLoadingSpec getLookupLoadingSpec()
{
  if (lookupsToLoad.isEmpty()) {
    return LookupLoadingSpec.NONE;
  }
  return LookupLoadingSpec.loadOnly(lookupsToLoad);
}
/**
* Return the query context as a mutable map. Use this form when
* modifying the context during planning.

View File

@ -33,6 +33,7 @@ import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
import org.apache.druid.sql.calcite.schema.NamedLookupSchema;
import java.util.HashSet;
import java.util.List;
@ -87,13 +88,19 @@ public class SqlResourceCollectorShuttle extends SqlShuttle
if (qualifiedNameParts.size() == 2) {
final String schema = qualifiedNameParts.get(0);
final String resourceName = qualifiedNameParts.get(1);
// Add the lookup name to the set of lookups to selectively load.
if (schema.equals(NamedLookupSchema.NAME)) {
plannerContext.addLookupToLoad(resourceName);
}
final String resourceType = plannerContext.getSchemaResourceType(schema, resourceName);
if (resourceType != null) {
resourceActions.add(new ResourceAction(new Resource(resourceName, resourceType), Action.READ));
}
} else if (qualifiedNameParts.size() > 2) {
// Don't expect to see more than 2 names (catalog?).
throw new ISE("Cannot analyze table idetifier %s", qualifiedNameParts);
throw new ISE("Cannot analyze table identifier %s", qualifiedNameParts);
}
}
}