mirror of https://github.com/apache/druid.git
Implement configurable internally generated query context (#11429)
* Add the ability to add a context to internally generated druid broker queries * fix docs * changes after first CI failure * cleanup after merge with master * change default to empty map and improve unit tests * add doc info and fix checkstyle * refactor DruidSchema#runSegmentMetadataQuery and add a unit test
This commit is contained in:
parent
b688db790b
commit
1930ad1f47
|
@ -1781,6 +1781,20 @@ line.
|
|||
|
||||
See [general query configuration](#general-query-configuration).
|
||||
|
||||
###### Broker Generated Query Configuration Supplementation
|
||||
|
||||
The Broker generates queries internally. This configuration section describes how an operator can augment the configuration
|
||||
of these queries.
|
||||
|
||||
As of now the only supported augmentation is overriding the default query context. This allows an operator the flexibility
|
||||
to adjust it as they see fit. A common use of this configuration is to override the query priority of the cluster generated
|
||||
queries in order to avoid running as a default priority of 0.
|
||||
|
||||
|Property|Description|Default|
|
||||
|--------|-----------|-------|
|
||||
|`druid.broker.internal.query.config.context`|A string formatted `key:value` map of a query context to add to internally generated broker queries.|null|
|
||||
|
||||
|
||||
#### SQL
|
||||
|
||||
The Druid SQL server is configured through the following properties on the Broker.
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This class contains configuration that internally generated Druid queries
|
||||
* should add to their query payload. The runtime properties for this class
|
||||
* have the prefix "druid.broker.internal.query.config."
|
||||
*/
|
||||
public class BrokerInternalQueryConfig
|
||||
{
|
||||
@JsonProperty
|
||||
private Map<String, Object> context = new HashMap<>();
|
||||
|
||||
public Map<String, Object> getContext()
|
||||
{
|
||||
return context;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.core.io.JsonEOFException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.Provides;
|
||||
import org.apache.druid.guice.ConfigModule;
|
||||
import org.apache.druid.guice.DruidGuiceExtensions;
|
||||
import org.apache.druid.guice.JsonConfigProvider;
|
||||
import org.apache.druid.guice.LazySingleton;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.segment.TestHelper;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class BrokerInternalQueryConfigTest
|
||||
{
|
||||
private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
|
||||
|
||||
@Test
|
||||
public void testSerde() throws Exception
|
||||
{
|
||||
//defaults
|
||||
String json = "{}";
|
||||
|
||||
BrokerInternalQueryConfig config = MAPPER.readValue(
|
||||
MAPPER.writeValueAsString(
|
||||
MAPPER.readValue(json, BrokerInternalQueryConfig.class)
|
||||
),
|
||||
BrokerInternalQueryConfig.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(ImmutableMap.of(), config.getContext());
|
||||
|
||||
//non-defaults
|
||||
json = "{ \"context\": {\"priority\": 5}}";
|
||||
|
||||
config = MAPPER.readValue(
|
||||
MAPPER.writeValueAsString(
|
||||
MAPPER.readValue(json, BrokerInternalQueryConfig.class)
|
||||
),
|
||||
BrokerInternalQueryConfig.class
|
||||
);
|
||||
|
||||
Map<String, Object> expected = new HashMap<>();
|
||||
expected.put("priority", 5);
|
||||
Assert.assertEquals(expected, config.getContext());
|
||||
}
|
||||
|
||||
/**
|
||||
* Malformatted configuration will trigger an exception and fail to startup the service
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(expected = JsonEOFException.class)
|
||||
public void testMalfomattedContext() throws Exception
|
||||
{
|
||||
String malformedJson = "{\"priority: 5}";
|
||||
MAPPER.readValue(
|
||||
MAPPER.writeValueAsString(
|
||||
MAPPER.readValue(malformedJson, BrokerInternalQueryConfig.class)
|
||||
),
|
||||
BrokerInternalQueryConfig.class
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the behavior if the operator does not specify anything for druid.broker.internal.query.config.context in runtime.properties
|
||||
*/
|
||||
@Test
|
||||
public void testDefaultBehavior()
|
||||
{
|
||||
Injector injector = Guice.createInjector(
|
||||
new Module()
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
binder.install(new ConfigModule());
|
||||
binder.install(new DruidGuiceExtensions());
|
||||
JsonConfigProvider.bind(binder, "druid.broker.internal.query.config", BrokerInternalQueryConfig.class);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@LazySingleton
|
||||
public ObjectMapper jsonMapper()
|
||||
{
|
||||
return new DefaultObjectMapper();
|
||||
}
|
||||
}
|
||||
);
|
||||
BrokerInternalQueryConfig config = injector.getInstance(BrokerInternalQueryConfig.class);
|
||||
Assert.assertEquals(ImmutableMap.of(), config.getContext());
|
||||
}
|
||||
}
|
|
@ -25,6 +25,7 @@ import com.google.inject.Key;
|
|||
import com.google.inject.Module;
|
||||
import com.google.inject.name.Names;
|
||||
import io.airlift.airline.Command;
|
||||
import org.apache.druid.client.BrokerInternalQueryConfig;
|
||||
import org.apache.druid.client.BrokerSegmentWatcherConfig;
|
||||
import org.apache.druid.client.BrokerServerView;
|
||||
import org.apache.druid.client.CachingClusteredClient;
|
||||
|
@ -126,6 +127,7 @@ public class CliBroker extends ServerRunnable
|
|||
JsonConfigProvider.bind(binder, "druid.broker.balancer", ServerSelectorStrategy.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.retryPolicy", RetryQueryRunnerConfig.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.segment", BrokerSegmentWatcherConfig.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.internal.query.config", BrokerInternalQueryConfig.class);
|
||||
|
||||
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
|
|||
import com.google.inject.Inject;
|
||||
import org.apache.calcite.schema.Table;
|
||||
import org.apache.calcite.schema.impl.AbstractSchema;
|
||||
import org.apache.druid.client.BrokerInternalQueryConfig;
|
||||
import org.apache.druid.client.ServerView;
|
||||
import org.apache.druid.client.TimelineServerView;
|
||||
import org.apache.druid.guice.ManageLifecycle;
|
||||
|
@ -60,7 +61,6 @@ import org.apache.druid.server.SegmentManager;
|
|||
import org.apache.druid.server.coordination.DruidServerMetadata;
|
||||
import org.apache.druid.server.coordination.ServerType;
|
||||
import org.apache.druid.server.security.Access;
|
||||
import org.apache.druid.server.security.AuthenticationResult;
|
||||
import org.apache.druid.server.security.Escalator;
|
||||
import org.apache.druid.sql.calcite.planner.PlannerConfig;
|
||||
import org.apache.druid.sql.calcite.table.DruidTable;
|
||||
|
@ -185,6 +185,9 @@ public class DruidSchema extends AbstractSchema
|
|||
@GuardedBy("lock")
|
||||
private final TreeSet<SegmentId> segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER);
|
||||
|
||||
// Configured context to attach to internally generated queries.
|
||||
private final BrokerInternalQueryConfig brokerInternalQueryConfig;
|
||||
|
||||
@GuardedBy("lock")
|
||||
private boolean refreshImmediately = false;
|
||||
|
||||
|
@ -205,7 +208,8 @@ public class DruidSchema extends AbstractSchema
|
|||
final SegmentManager segmentManager,
|
||||
final JoinableFactory joinableFactory,
|
||||
final PlannerConfig config,
|
||||
final Escalator escalator
|
||||
final Escalator escalator,
|
||||
final BrokerInternalQueryConfig brokerInternalQueryConfig
|
||||
)
|
||||
{
|
||||
this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory");
|
||||
|
@ -216,6 +220,7 @@ public class DruidSchema extends AbstractSchema
|
|||
this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d");
|
||||
this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d");
|
||||
this.escalator = escalator;
|
||||
this.brokerInternalQueryConfig = brokerInternalQueryConfig;
|
||||
|
||||
serverView.registerTimelineCallback(
|
||||
callbackExec,
|
||||
|
@ -674,9 +679,7 @@ public class DruidSchema extends AbstractSchema
|
|||
|
||||
final Set<SegmentId> retVal = new HashSet<>();
|
||||
final Sequence<SegmentAnalysis> sequence = runSegmentMetadataQuery(
|
||||
queryLifecycleFactory,
|
||||
Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY),
|
||||
escalator.createEscalatedAuthenticationResult()
|
||||
Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY)
|
||||
);
|
||||
|
||||
Yielder<SegmentAnalysis> yielder = Yielders.each(sequence);
|
||||
|
@ -835,10 +838,15 @@ public class DruidSchema extends AbstractSchema
|
|||
}
|
||||
}
|
||||
|
||||
private static Sequence<SegmentAnalysis> runSegmentMetadataQuery(
|
||||
final QueryLifecycleFactory queryLifecycleFactory,
|
||||
final Iterable<SegmentId> segments,
|
||||
final AuthenticationResult authenticationResult
|
||||
/**
|
||||
* Execute a SegmentMetadata query and return a {@link Sequence} of {@link SegmentAnalysis}.
|
||||
*
|
||||
* @param segments Iterable of {@link SegmentId} objects that are subject of the SegmentMetadata query.
|
||||
* @return {@link Sequence} of {@link SegmentAnalysis} objects
|
||||
*/
|
||||
@VisibleForTesting
|
||||
Sequence<SegmentAnalysis> runSegmentMetadataQuery(
|
||||
final Iterable<SegmentId> segments
|
||||
)
|
||||
{
|
||||
// Sanity check: getOnlyElement of a set, to ensure all segments have the same dataSource.
|
||||
|
@ -857,13 +865,15 @@ public class DruidSchema extends AbstractSchema
|
|||
querySegmentSpec,
|
||||
new AllColumnIncluderator(),
|
||||
false,
|
||||
ImmutableMap.of(),
|
||||
brokerInternalQueryConfig.getContext(),
|
||||
EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
|
||||
false,
|
||||
false
|
||||
);
|
||||
|
||||
return queryLifecycleFactory.factorize().runSimple(segmentMetadataQuery, authenticationResult, Access.OK);
|
||||
return queryLifecycleFactory
|
||||
.factorize()
|
||||
.runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK);
|
||||
}
|
||||
|
||||
private static RowSignature analysisToRowSignature(final SegmentAnalysis analysis)
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.druid.client.BrokerInternalQueryConfig;
|
||||
import org.apache.druid.client.BrokerSegmentWatcherConfig;
|
||||
import org.apache.druid.client.BrokerServerView;
|
||||
import org.apache.druid.client.DruidServer;
|
||||
|
@ -135,7 +136,8 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon
|
|||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
@ -238,7 +240,8 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon
|
|||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.schema;
|
|||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.druid.client.BrokerInternalQueryConfig;
|
||||
import org.apache.druid.java.util.common.io.Closer;
|
||||
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
|
||||
import org.apache.druid.segment.join.MapJoinableFactory;
|
||||
|
@ -57,7 +58,8 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase
|
|||
new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
);
|
||||
|
||||
druidSchema.start();
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.sql.calcite.schema;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
@ -28,6 +29,7 @@ import org.apache.calcite.rel.type.RelDataType;
|
|||
import org.apache.calcite.rel.type.RelDataTypeField;
|
||||
import org.apache.calcite.schema.Table;
|
||||
import org.apache.calcite.sql.type.SqlTypeName;
|
||||
import org.apache.druid.client.BrokerInternalQueryConfig;
|
||||
import org.apache.druid.client.ImmutableDruidServer;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
|
@ -37,13 +39,21 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
|
|||
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
|
||||
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
|
||||
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
|
||||
import org.apache.druid.segment.IndexBuilder;
|
||||
import org.apache.druid.segment.QueryableIndex;
|
||||
import org.apache.druid.segment.TestHelper;
|
||||
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import org.apache.druid.segment.join.MapJoinableFactory;
|
||||
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
|
||||
import org.apache.druid.server.QueryLifecycle;
|
||||
import org.apache.druid.server.QueryLifecycleFactory;
|
||||
import org.apache.druid.server.coordination.DruidServerMetadata;
|
||||
import org.apache.druid.server.coordination.ServerType;
|
||||
import org.apache.druid.server.security.Access;
|
||||
import org.apache.druid.server.security.AllowAllAuthenticator;
|
||||
import org.apache.druid.server.security.NoopEscalator;
|
||||
import org.apache.druid.sql.calcite.table.DruidTable;
|
||||
import org.apache.druid.sql.calcite.util.CalciteTests;
|
||||
|
@ -54,6 +64,7 @@ import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
|
|||
import org.apache.druid.timeline.SegmentId;
|
||||
import org.apache.druid.timeline.partition.LinearShardSpec;
|
||||
import org.apache.druid.timeline.partition.NumberedShardSpec;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -61,6 +72,7 @@ import org.junit.Test;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -77,6 +89,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
private DruidSchema schema2 = null;
|
||||
private CountDownLatch buildTableLatch = new CountDownLatch(1);
|
||||
private CountDownLatch markDataSourceLatch = new CountDownLatch(1);
|
||||
private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
|
@ -158,9 +171,13 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
|
||||
serverView,
|
||||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(globalTableJoinable), ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)),
|
||||
new MapJoinableFactory(
|
||||
ImmutableSet.of(globalTableJoinable),
|
||||
ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)
|
||||
),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
@ -185,11 +202,13 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(globalTableJoinable), ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
)
|
||||
{
|
||||
|
||||
boolean throwException = true;
|
||||
|
||||
@Override
|
||||
protected DruidTable buildDruidTable(String dataSource)
|
||||
{
|
||||
|
@ -414,9 +433,9 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
Assert.assertEquals(1L, metadata.isRealtime());
|
||||
// get the historical server
|
||||
final ImmutableDruidServer historicalServer = druidServers.stream()
|
||||
.filter(s -> s.getType().equals(ServerType.HISTORICAL))
|
||||
.findAny()
|
||||
.orElse(null);
|
||||
.filter(s -> s.getType().equals(ServerType.HISTORICAL))
|
||||
.findAny()
|
||||
.orElse(null);
|
||||
|
||||
Assert.assertNotNull(historicalServer);
|
||||
final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();
|
||||
|
@ -461,7 +480,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
@ -502,7 +522,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
@ -547,7 +568,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
@ -589,7 +611,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
@ -628,7 +651,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
@ -684,7 +708,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
@ -743,7 +768,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
@ -776,7 +802,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
@ -822,7 +849,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
@ -995,6 +1023,70 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
|
|||
Assert.assertFalse(fooTable.isJoinable());
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure that the BrokerInternalQueryConfig context is honored for this internally generated SegmentMetadata Query
|
||||
*/
|
||||
@Test
|
||||
public void testRunSegmentMetadataQueryWithContext() throws Exception
|
||||
{
|
||||
Map<String, Object> queryContext = ImmutableMap.of("priority", 5);
|
||||
|
||||
String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }";
|
||||
|
||||
TestHelper.makeJsonMapper();
|
||||
BrokerInternalQueryConfig brokerInternalQueryConfig = MAPPER.readValue(
|
||||
MAPPER.writeValueAsString(
|
||||
MAPPER.readValue(brokerInternalQueryConfigJson, BrokerInternalQueryConfig.class)
|
||||
),
|
||||
BrokerInternalQueryConfig.class
|
||||
);
|
||||
|
||||
DataSegment segment = newSegment("test", 0);
|
||||
List<SegmentId> segmentIterable = ImmutableList.of(segment.getId());
|
||||
|
||||
// This is the query that we expect this method to create. We will be testing that it matches the query generated by the method under test.
|
||||
SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery(
|
||||
new TableDataSource(segment.getDataSource()),
|
||||
new MultipleSpecificSegmentSpec(
|
||||
segmentIterable.stream()
|
||||
.map(SegmentId::toDescriptor).collect(Collectors.toList())),
|
||||
new AllColumnIncluderator(),
|
||||
false,
|
||||
queryContext,
|
||||
EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
|
||||
false,
|
||||
false
|
||||
);
|
||||
|
||||
QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class);
|
||||
QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);
|
||||
|
||||
// Need to create schema for this test because the available schemas don't mock the QueryLifecycleFactory, which I need for this test.
|
||||
DruidSchema mySchema = new DruidSchema(
|
||||
factoryMock,
|
||||
serverView,
|
||||
segmentManager,
|
||||
new MapJoinableFactory(
|
||||
ImmutableSet.of(globalTableJoinable),
|
||||
ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)
|
||||
),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator(),
|
||||
brokerInternalQueryConfig
|
||||
);
|
||||
|
||||
EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
|
||||
// This is the mat of the test, making sure that the query created by the method under test matches the expected query, specifically the operator configured context
|
||||
EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK)).andReturn(null);
|
||||
|
||||
EasyMock.replay(factoryMock, lifecycleMock);
|
||||
|
||||
mySchema.runSegmentMetadataQuery(segmentIterable);
|
||||
|
||||
EasyMock.verify(factoryMock, lifecycleMock);
|
||||
|
||||
}
|
||||
|
||||
private static DataSegment newSegment(String datasource, int partitionId)
|
||||
{
|
||||
return new DataSegment(
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.calcite.rel.type.RelDataTypeField;
|
|||
import org.apache.calcite.schema.SchemaPlus;
|
||||
import org.apache.calcite.schema.Table;
|
||||
import org.apache.calcite.sql.type.SqlTypeName;
|
||||
import org.apache.druid.client.BrokerInternalQueryConfig;
|
||||
import org.apache.druid.client.DruidServer;
|
||||
import org.apache.druid.client.ImmutableDruidDataSource;
|
||||
import org.apache.druid.client.ImmutableDruidServer;
|
||||
|
@ -255,7 +256,8 @@ public class SystemSchemaTest extends CalciteTestBase
|
|||
new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopEscalator()
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig()
|
||||
);
|
||||
druidSchema.start();
|
||||
druidSchema.awaitInitialization();
|
||||
|
|
|
@ -31,6 +31,7 @@ import com.google.inject.Injector;
|
|||
import com.google.inject.Key;
|
||||
import org.apache.calcite.jdbc.CalciteSchema;
|
||||
import org.apache.calcite.schema.SchemaPlus;
|
||||
import org.apache.druid.client.BrokerInternalQueryConfig;
|
||||
import org.apache.druid.client.BrokerSegmentWatcherConfig;
|
||||
import org.apache.druid.client.DruidServer;
|
||||
import org.apache.druid.client.ServerInventoryView;
|
||||
|
@ -1243,7 +1244,8 @@ public class CalciteTests
|
|||
},
|
||||
createDefaultJoinableFactory(),
|
||||
plannerConfig,
|
||||
TEST_AUTHENTICATOR_ESCALATOR
|
||||
TEST_AUTHENTICATOR_ESCALATOR,
|
||||
new BrokerInternalQueryConfig()
|
||||
);
|
||||
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue