From 1930ad1f47126210524a1a9b0bd56627355b51e0 Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Wed, 6 Oct 2021 11:02:41 -0500 Subject: [PATCH] Implement configurable internally generated query context (#11429) * Add the ability to add a context to internally generated druid broker queries * fix docs * changes after first CI failure * cleanup after merge with master * change default to empty map and improve unit tests * add doc info and fix checkstyle * refactor DruidSchema#runSegmentMetadataQuery and add a unit test --- docs/configuration/index.md | 14 ++ .../client/BrokerInternalQueryConfig.java | 41 ++++++ .../client/BrokerInternalQueryConfigTest.java | 121 +++++++++++++++++ .../java/org/apache/druid/cli/CliBroker.java | 2 + .../druid/sql/calcite/schema/DruidSchema.java | 32 +++-- .../schema/DruidSchemaConcurrencyTest.java | 7 +- .../schema/DruidSchemaNoDataInitTest.java | 4 +- .../sql/calcite/schema/DruidSchemaTest.java | 122 +++++++++++++++--- .../sql/calcite/schema/SystemSchemaTest.java | 4 +- .../druid/sql/calcite/util/CalciteTests.java | 4 +- 10 files changed, 320 insertions(+), 31 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/client/BrokerInternalQueryConfig.java create mode 100644 server/src/test/java/org/apache/druid/client/BrokerInternalQueryConfigTest.java diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 001bb404ca7..1d20029886b 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1781,6 +1781,20 @@ line. See [general query configuration](#general-query-configuration). +###### Broker Generated Query Configuration Supplementation + +The Broker generates queries internally. This configuration section describes how an operator can augment the configuration +of these queries. + +As of now the only supported augmentation is overriding the default query context. This allows an operator the flexibility +to adjust it as they see fit. A common use of this configuration is to override the query priority of the cluster generated +queries in order to avoid running as a default priority of 0. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.broker.internal.query.config.context`|A string formatted `key:value` map of a query context to add to internally generated broker queries.|null| + + #### SQL The Druid SQL server is configured through the following properties on the Broker. diff --git a/server/src/main/java/org/apache/druid/client/BrokerInternalQueryConfig.java b/server/src/main/java/org/apache/druid/client/BrokerInternalQueryConfig.java new file mode 100644 index 00000000000..9b893778d60 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/BrokerInternalQueryConfig.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class contains configuration that internally generated Druid queries + * should add to their query payload. The runtime properties for this class + * have the prefix "druid.broker.internal.query.config." + */ +public class BrokerInternalQueryConfig +{ + @JsonProperty + private Map context = new HashMap<>(); + + public Map getContext() + { + return context; + } +} diff --git a/server/src/test/java/org/apache/druid/client/BrokerInternalQueryConfigTest.java b/server/src/test/java/org/apache/druid/client/BrokerInternalQueryConfigTest.java new file mode 100644 index 00000000000..24b61bb1ac4 --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/BrokerInternalQueryConfigTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.fasterxml.jackson.core.io.JsonEOFException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import org.apache.druid.guice.ConfigModule; +import org.apache.druid.guice.DruidGuiceExtensions; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class BrokerInternalQueryConfigTest +{ + private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper(); + + @Test + public void testSerde() throws Exception + { + //defaults + String json = "{}"; + + BrokerInternalQueryConfig config = MAPPER.readValue( + MAPPER.writeValueAsString( + MAPPER.readValue(json, BrokerInternalQueryConfig.class) + ), + BrokerInternalQueryConfig.class + ); + + Assert.assertEquals(ImmutableMap.of(), config.getContext()); + + //non-defaults + json = "{ \"context\": {\"priority\": 5}}"; + + config = MAPPER.readValue( + MAPPER.writeValueAsString( + MAPPER.readValue(json, BrokerInternalQueryConfig.class) + ), + BrokerInternalQueryConfig.class + ); + + Map expected = new HashMap<>(); + expected.put("priority", 5); + Assert.assertEquals(expected, config.getContext()); + } + + /** + * Malformatted configuration will trigger an exception and fail to startup the service + * + * @throws Exception + */ + @Test(expected = JsonEOFException.class) + public void testMalfomattedContext() throws Exception + { + String malformedJson = "{\"priority: 5}"; + MAPPER.readValue( + MAPPER.writeValueAsString( + MAPPER.readValue(malformedJson, BrokerInternalQueryConfig.class) + ), + BrokerInternalQueryConfig.class + ); + } + + /** + * Test the behavior if the operator does not specify anything for druid.broker.internal.query.config.context in runtime.properties + */ + @Test + public void testDefaultBehavior() + { + Injector injector = Guice.createInjector( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.install(new ConfigModule()); + binder.install(new DruidGuiceExtensions()); + JsonConfigProvider.bind(binder, "druid.broker.internal.query.config", BrokerInternalQueryConfig.class); + } + + @Provides + @LazySingleton + public ObjectMapper jsonMapper() + { + return new DefaultObjectMapper(); + } + } + ); + BrokerInternalQueryConfig config = injector.getInstance(BrokerInternalQueryConfig.class); + Assert.assertEquals(ImmutableMap.of(), config.getContext()); + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index 5ed03aa52b5..cb279059ceb 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -25,6 +25,7 @@ import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.name.Names; import io.airlift.airline.Command; +import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.client.BrokerServerView; import org.apache.druid.client.CachingClusteredClient; @@ -126,6 +127,7 @@ public class CliBroker extends ServerRunnable JsonConfigProvider.bind(binder, "druid.broker.balancer", ServerSelectorStrategy.class); JsonConfigProvider.bind(binder, "druid.broker.retryPolicy", RetryQueryRunnerConfig.class); JsonConfigProvider.bind(binder, "druid.broker.segment", BrokerSegmentWatcherConfig.class); + JsonConfigProvider.bind(binder, "druid.broker.internal.query.config", BrokerInternalQueryConfig.class); binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index e5846a9f05a..70e154fa2e4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -32,6 +32,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.client.ServerView; import org.apache.druid.client.TimelineServerView; import org.apache.druid.guice.ManageLifecycle; @@ -60,7 +61,6 @@ import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.Access; -import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.Escalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.table.DruidTable; @@ -185,6 +185,9 @@ public class DruidSchema extends AbstractSchema @GuardedBy("lock") private final TreeSet segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER); + // Configured context to attach to internally generated queries. + private final BrokerInternalQueryConfig brokerInternalQueryConfig; + @GuardedBy("lock") private boolean refreshImmediately = false; @@ -205,7 +208,8 @@ public class DruidSchema extends AbstractSchema final SegmentManager segmentManager, final JoinableFactory joinableFactory, final PlannerConfig config, - final Escalator escalator + final Escalator escalator, + final BrokerInternalQueryConfig brokerInternalQueryConfig ) { this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory"); @@ -216,6 +220,7 @@ public class DruidSchema extends AbstractSchema this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d"); this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d"); this.escalator = escalator; + this.brokerInternalQueryConfig = brokerInternalQueryConfig; serverView.registerTimelineCallback( callbackExec, @@ -674,9 +679,7 @@ public class DruidSchema extends AbstractSchema final Set retVal = new HashSet<>(); final Sequence sequence = runSegmentMetadataQuery( - queryLifecycleFactory, - Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY), - escalator.createEscalatedAuthenticationResult() + Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY) ); Yielder yielder = Yielders.each(sequence); @@ -835,10 +838,15 @@ public class DruidSchema extends AbstractSchema } } - private static Sequence runSegmentMetadataQuery( - final QueryLifecycleFactory queryLifecycleFactory, - final Iterable segments, - final AuthenticationResult authenticationResult + /** + * Execute a SegmentMetadata query and return a {@link Sequence} of {@link SegmentAnalysis}. + * + * @param segments Iterable of {@link SegmentId} objects that are subject of the SegmentMetadata query. + * @return {@link Sequence} of {@link SegmentAnalysis} objects + */ + @VisibleForTesting + Sequence runSegmentMetadataQuery( + final Iterable segments ) { // Sanity check: getOnlyElement of a set, to ensure all segments have the same dataSource. @@ -857,13 +865,15 @@ public class DruidSchema extends AbstractSchema querySegmentSpec, new AllColumnIncluderator(), false, - ImmutableMap.of(), + brokerInternalQueryConfig.getContext(), EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), false, false ); - return queryLifecycleFactory.factorize().runSimple(segmentMetadataQuery, authenticationResult, Access.OK); + return queryLifecycleFactory + .factorize() + .runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK); } private static RowSignature analysisToRowSignature(final SegmentAnalysis analysis) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java index 129bf9d51b7..6f0d0b42c17 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.client.BrokerServerView; import org.apache.druid.client.DruidServer; @@ -135,7 +136,8 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ) { @Override @@ -238,7 +240,8 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ) { @Override diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java index 96a464e54d7..b2c1981c1b4 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java @@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.schema; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.join.MapJoinableFactory; @@ -57,7 +58,8 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase new SegmentManager(EasyMock.createMock(SegmentLoader.class)), new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ); druidSchema.start(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index e2a75544053..2bfd3ca0038 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.schema; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -28,6 +29,7 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.schema.Table; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; @@ -37,13 +39,21 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import org.apache.druid.query.metadata.metadata.AllColumnIncluderator; +import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.QueryLifecycle; +import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.security.Access; +import org.apache.druid.server.security.AllowAllAuthenticator; import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.util.CalciteTests; @@ -54,6 +64,7 @@ import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -61,6 +72,7 @@ import org.junit.Test; import java.io.File; import java.io.IOException; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -77,6 +89,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon private DruidSchema schema2 = null; private CountDownLatch buildTableLatch = new CountDownLatch(1); private CountDownLatch markDataSourceLatch = new CountDownLatch(1); + private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper(); @Before public void setUp() throws Exception @@ -158,9 +171,13 @@ public class DruidSchemaTest extends DruidSchemaTestCommon CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, - new MapJoinableFactory(ImmutableSet.of(globalTableJoinable), ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)), + new MapJoinableFactory( + ImmutableSet.of(globalTableJoinable), + ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class) + ), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ) { @Override @@ -185,11 +202,13 @@ public class DruidSchemaTest extends DruidSchemaTestCommon segmentManager, new MapJoinableFactory(ImmutableSet.of(globalTableJoinable), ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ) { boolean throwException = true; + @Override protected DruidTable buildDruidTable(String dataSource) { @@ -414,9 +433,9 @@ public class DruidSchemaTest extends DruidSchemaTestCommon Assert.assertEquals(1L, metadata.isRealtime()); // get the historical server final ImmutableDruidServer historicalServer = druidServers.stream() - .filter(s -> s.getType().equals(ServerType.HISTORICAL)) - .findAny() - .orElse(null); + .filter(s -> s.getType().equals(ServerType.HISTORICAL)) + .findAny() + .orElse(null); Assert.assertNotNull(historicalServer); final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata(); @@ -461,7 +480,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ) { @Override @@ -502,7 +522,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ) { @Override @@ -547,7 +568,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ) { @Override @@ -589,7 +611,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ) { @Override @@ -628,7 +651,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ) { @Override @@ -684,7 +708,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ) { @Override @@ -743,7 +768,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ) { @Override @@ -776,7 +802,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ) { @Override @@ -822,7 +849,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ) { @Override @@ -995,6 +1023,70 @@ public class DruidSchemaTest extends DruidSchemaTestCommon Assert.assertFalse(fooTable.isJoinable()); } + /** + * Ensure that the BrokerInternalQueryConfig context is honored for this internally generated SegmentMetadata Query + */ + @Test + public void testRunSegmentMetadataQueryWithContext() throws Exception + { + Map queryContext = ImmutableMap.of("priority", 5); + + String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }"; + + TestHelper.makeJsonMapper(); + BrokerInternalQueryConfig brokerInternalQueryConfig = MAPPER.readValue( + MAPPER.writeValueAsString( + MAPPER.readValue(brokerInternalQueryConfigJson, BrokerInternalQueryConfig.class) + ), + BrokerInternalQueryConfig.class + ); + + DataSegment segment = newSegment("test", 0); + List segmentIterable = ImmutableList.of(segment.getId()); + + // This is the query that we expect this method to create. We will be testing that it matches the query generated by the method under test. + SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery( + new TableDataSource(segment.getDataSource()), + new MultipleSpecificSegmentSpec( + segmentIterable.stream() + .map(SegmentId::toDescriptor).collect(Collectors.toList())), + new AllColumnIncluderator(), + false, + queryContext, + EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), + false, + false + ); + + QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class); + QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class); + + // Need to create schema for this test because the available schemas don't mock the QueryLifecycleFactory, which I need for this test. + DruidSchema mySchema = new DruidSchema( + factoryMock, + serverView, + segmentManager, + new MapJoinableFactory( + ImmutableSet.of(globalTableJoinable), + ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class) + ), + PLANNER_CONFIG_DEFAULT, + new NoopEscalator(), + brokerInternalQueryConfig + ); + + EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once(); + // This is the mat of the test, making sure that the query created by the method under test matches the expected query, specifically the operator configured context + EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK)).andReturn(null); + + EasyMock.replay(factoryMock, lifecycleMock); + + mySchema.runSegmentMetadataQuery(segmentIterable); + + EasyMock.verify(factoryMock, lifecycleMock); + + } + private static DataSegment newSegment(String datasource, int partitionId) { return new DataSegment( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index ed6599aa6a1..d25a9026f15 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -34,6 +34,7 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; @@ -255,7 +256,8 @@ public class SystemSchemaTest extends CalciteTestBase new SegmentManager(EasyMock.createMock(SegmentLoader.class)), new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, - new NoopEscalator() + new NoopEscalator(), + new BrokerInternalQueryConfig() ); druidSchema.start(); druidSchema.awaitInitialization(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index ca70e1dfe4a..79fc67310cc 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -31,6 +31,7 @@ import com.google.inject.Injector; import com.google.inject.Key; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.schema.SchemaPlus; +import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ServerInventoryView; @@ -1243,7 +1244,8 @@ public class CalciteTests }, createDefaultJoinableFactory(), plannerConfig, - TEST_AUTHENTICATOR_ESCALATOR + TEST_AUTHENTICATOR_ESCALATOR, + new BrokerInternalQueryConfig() ); try {