mirror of https://github.com/apache/druid.git
Select broker based on query context parameter `brokerService` (#11495)
This change allows the selection of a specific broker service (or broker tier) by the Router. The newly added ManualTieredBrokerSelectorStrategy works as follows: Check for the parameter brokerService in the query context. If this is a valid broker service, use it. Check if the field defaultManualBrokerService has been set in the strategy. If this is a valid broker service, use it. Move on to the next strategy
This commit is contained in:
parent
60fdf7a734
commit
8a4e27f51d
|
@ -109,6 +109,19 @@ Including this strategy means all timeBoundary queries are always routed to the
|
||||||
|
|
||||||
Queries with a priority set to less than minPriority are routed to the lowest priority Broker. Queries with priority set to greater than maxPriority are routed to the highest priority Broker. By default, minPriority is 0 and maxPriority is 1. Using these default values, if a query with priority 0 (the default query priority is 0) is sent, the query skips the priority selection logic.
|
Queries with a priority set to less than minPriority are routed to the lowest priority Broker. Queries with priority set to greater than maxPriority are routed to the highest priority Broker. By default, minPriority is 0 and maxPriority is 1. Using these default values, if a query with priority 0 (the default query priority is 0) is sent, the query skips the priority selection logic.
|
||||||
|
|
||||||
|
#### manual
|
||||||
|
|
||||||
|
This strategy reads the parameter `brokerService` from the query context and routes the query to that broker service. If no valid `brokerService` is specified in the query context, the field `defaultManualBrokerService` is used to determine target broker service given the value is valid and non-null. A value is considered valid if it is present in `druid.router.tierToBrokerMap`
|
||||||
|
|
||||||
|
*Example*: A strategy that routes queries to the Broker "druid:broker-hot" if no valid `brokerService` is found in the query context.
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"type": "manual",
|
||||||
|
"defaultManualBrokerService": "druid:broker-hot"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
#### JavaScript
|
#### JavaScript
|
||||||
|
|
||||||
Allows defining arbitrary routing rules using a JavaScript function. The function is passed the configuration and the query to be executed, and returns the tier it should be routed to, or null for the default tier.
|
Allows defining arbitrary routing rules using a JavaScript function. The function is passed the configuration and the query to be executed, and returns the tier it should be routed to, or null for the default tier.
|
||||||
|
@ -203,4 +216,3 @@ druid.router.http.numMaxThreads=100
|
||||||
|
|
||||||
druid.server.http.numThreads=100
|
druid.server.http.numThreads=100
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
|
@ -43,6 +43,7 @@ Unless otherwise noted, the following parameters apply to all query types.
|
||||||
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|
||||||
|lane | `null` | Query lane, used to control usage limits on classes of queries. See [Broker configuration](../configuration/index.md#broker) for more details.|
|
|lane | `null` | Query lane, used to control usage limits on classes of queries. See [Broker configuration](../configuration/index.md#broker) for more details.|
|
||||||
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|
||||||
|
|brokerService | `null` | Broker service to which this query should be routed. This parameter is honored only by a broker selector strategy of type *manual*. See [Router strategies](../design/router.md#router-strategies) for more details.|
|
||||||
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Apache Druid uses `druid.broker.cache.useCache` or `druid.historical.cache.useCache` to determine whether or not to read from the query cache |
|
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Apache Druid uses `druid.broker.cache.useCache` or `druid.historical.cache.useCache` to determine whether or not to read from the query cache |
|
||||||
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateCache` or `druid.historical.cache.populateCache` to determine whether or not to save the results of this query to the query cache |
|
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateCache` or `druid.historical.cache.populateCache` to determine whether or not to save the results of this query to the query cache |
|
||||||
|useResultLevelCache | `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses `druid.broker.cache.useResultLevelCache` to determine whether or not to read from the result-level query cache |
|
|useResultLevelCache | `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses `druid.broker.cache.useResultLevelCache` to determine whether or not to read from the result-level query cache |
|
||||||
|
|
|
@ -66,6 +66,7 @@ public class QueryContexts
|
||||||
public static final String USE_CACHE_KEY = "useCache";
|
public static final String USE_CACHE_KEY = "useCache";
|
||||||
public static final String SECONDARY_PARTITION_PRUNING_KEY = "secondaryPartitionPruning";
|
public static final String SECONDARY_PARTITION_PRUNING_KEY = "secondaryPartitionPruning";
|
||||||
public static final String BY_SEGMENT_KEY = "bySegment";
|
public static final String BY_SEGMENT_KEY = "bySegment";
|
||||||
|
public static final String BROKER_SERVICE_NAME = "brokerService";
|
||||||
|
|
||||||
public static final boolean DEFAULT_BY_SEGMENT = false;
|
public static final boolean DEFAULT_BY_SEGMENT = false;
|
||||||
public static final boolean DEFAULT_POPULATE_CACHE = true;
|
public static final boolean DEFAULT_POPULATE_CACHE = true;
|
||||||
|
@ -410,6 +411,11 @@ public class QueryContexts
|
||||||
return query.getContextBoolean(RETURN_PARTIAL_RESULTS_KEY, defaultValue);
|
return query.getContextBoolean(RETURN_PARTIAL_RESULTS_KEY, defaultValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <T> String getBrokerServiceName(Query<T> query)
|
||||||
|
{
|
||||||
|
return query.getContextValue(BROKER_SERVICE_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
static <T> long parseLong(Query<T> query, String key, long defaultValue)
|
static <T> long parseLong(Query<T> query, String key, long defaultValue)
|
||||||
{
|
{
|
||||||
final Object val = query.getContextValue(key);
|
final Object val = query.getContextValue(key);
|
||||||
|
|
|
@ -145,4 +145,36 @@ public class QueryContextsTest
|
||||||
false
|
false
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetBrokerServiceName()
|
||||||
|
{
|
||||||
|
Query<?> query = new TestQuery(
|
||||||
|
new TableDataSource("test"),
|
||||||
|
new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
|
||||||
|
false,
|
||||||
|
new HashMap<>()
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertNull(QueryContexts.getBrokerServiceName(query));
|
||||||
|
|
||||||
|
query.getContext().put(QueryContexts.BROKER_SERVICE_NAME, "hotBroker");
|
||||||
|
Assert.assertEquals("hotBroker", QueryContexts.getBrokerServiceName(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetBrokerServiceName_withNonStringValue()
|
||||||
|
{
|
||||||
|
Query<?> query = new TestQuery(
|
||||||
|
new TableDataSource("test"),
|
||||||
|
new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
|
||||||
|
false,
|
||||||
|
new HashMap<>()
|
||||||
|
);
|
||||||
|
|
||||||
|
query.getContext().put(QueryContexts.BROKER_SERVICE_NAME, 100);
|
||||||
|
|
||||||
|
exception.expect(ClassCastException.class);
|
||||||
|
QueryContexts.getBrokerServiceName(query);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,97 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.server.router;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Optional;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
|
import org.apache.druid.query.Query;
|
||||||
|
import org.apache.druid.query.QueryContexts;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation of {@link TieredBrokerSelectorStrategy} which uses the parameter
|
||||||
|
* {@link QueryContexts#BROKER_SERVICE_NAME} in the Query context to select the
|
||||||
|
* Broker Service.
|
||||||
|
* <p>
|
||||||
|
* If the {@link #defaultManualBrokerService} is set to a valid Broker Service Name,
|
||||||
|
* then all queries that do not specify a valid value for
|
||||||
|
* {@link QueryContexts#BROKER_SERVICE_NAME} would be directed to the
|
||||||
|
* {@code #defaultManualBrokerService}. Note that the {@code defaultManualBrokerService}
|
||||||
|
* can be different from the {@link TieredBrokerConfig#getDefaultBrokerServiceName()}.
|
||||||
|
*/
|
||||||
|
public class ManualTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(ManualTieredBrokerSelectorStrategy.class);
|
||||||
|
|
||||||
|
private final String defaultManualBrokerService;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public ManualTieredBrokerSelectorStrategy(
|
||||||
|
@JsonProperty("defaultManualBrokerService") @Nullable String defaultManualBrokerService
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.defaultManualBrokerService = defaultManualBrokerService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
final String contextBrokerService = QueryContexts.getBrokerServiceName(query);
|
||||||
|
|
||||||
|
if (isValidBrokerService(contextBrokerService, tierConfig)) {
|
||||||
|
// If the broker service in the query context is valid, use that
|
||||||
|
return Optional.of(contextBrokerService);
|
||||||
|
} else if (isValidBrokerService(defaultManualBrokerService, tierConfig)) {
|
||||||
|
// If the fallbackBrokerService is valid, use that
|
||||||
|
return Optional.of(defaultManualBrokerService);
|
||||||
|
} else {
|
||||||
|
log.warn(
|
||||||
|
"Could not find Broker Service [%s] or default [%s] in TieredBrokerConfig",
|
||||||
|
contextBrokerService,
|
||||||
|
defaultManualBrokerService
|
||||||
|
);
|
||||||
|
return Optional.absent();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error(e, "Error getting Broker Service name from Query Context");
|
||||||
|
return isValidBrokerService(defaultManualBrokerService, tierConfig)
|
||||||
|
? Optional.of(defaultManualBrokerService) : Optional.absent();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isValidBrokerService(String brokerServiceName, TieredBrokerConfig tierConfig)
|
||||||
|
{
|
||||||
|
return !StringUtils.isEmpty(brokerServiceName)
|
||||||
|
&& tierConfig.getTierToBrokerMap().containsValue(brokerServiceName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
String getDefaultManualBrokerService()
|
||||||
|
{
|
||||||
|
return defaultManualBrokerService;
|
||||||
|
}
|
||||||
|
}
|
|
@ -30,6 +30,7 @@ import org.apache.druid.query.Query;
|
||||||
@JsonSubTypes(value = {
|
@JsonSubTypes(value = {
|
||||||
@JsonSubTypes.Type(name = "timeBoundary", value = TimeBoundaryTieredBrokerSelectorStrategy.class),
|
@JsonSubTypes.Type(name = "timeBoundary", value = TimeBoundaryTieredBrokerSelectorStrategy.class),
|
||||||
@JsonSubTypes.Type(name = "priority", value = PriorityTieredBrokerSelectorStrategy.class),
|
@JsonSubTypes.Type(name = "priority", value = PriorityTieredBrokerSelectorStrategy.class),
|
||||||
|
@JsonSubTypes.Type(name = "manual", value = ManualTieredBrokerSelectorStrategy.class),
|
||||||
@JsonSubTypes.Type(name = "javascript", value = JavaScriptTieredBrokerSelectorStrategy.class)
|
@JsonSubTypes.Type(name = "javascript", value = JavaScriptTieredBrokerSelectorStrategy.class)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,214 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.server.router;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.base.Optional;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||||
|
import org.apache.druid.java.util.common.Intervals;
|
||||||
|
import org.apache.druid.query.Druids;
|
||||||
|
import org.apache.druid.query.QueryContexts;
|
||||||
|
import org.apache.druid.query.aggregation.CountAggregatorFactory;
|
||||||
|
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class ManualTieredBrokerSelectorStrategyTest
|
||||||
|
{
|
||||||
|
private TieredBrokerConfig tieredBrokerConfig;
|
||||||
|
private Druids.TimeseriesQueryBuilder queryBuilder;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup()
|
||||||
|
{
|
||||||
|
tieredBrokerConfig = new TieredBrokerConfig()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public String getDefaultBrokerServiceName()
|
||||||
|
{
|
||||||
|
return Names.BROKER_SVC_HOT;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LinkedHashMap<String, String> getTierToBrokerMap()
|
||||||
|
{
|
||||||
|
LinkedHashMap<String, String> tierToBrokerMap = new LinkedHashMap<>();
|
||||||
|
tierToBrokerMap.put("hotTier", Names.BROKER_SVC_HOT);
|
||||||
|
tierToBrokerMap.put("mediumTier", Names.BROKER_SVC_MEDIUM);
|
||||||
|
tierToBrokerMap.put("coldTier", Names.BROKER_SVC_COLD);
|
||||||
|
|
||||||
|
return tierToBrokerMap;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
queryBuilder =
|
||||||
|
Druids.newTimeseriesQueryBuilder()
|
||||||
|
.dataSource("test")
|
||||||
|
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
|
||||||
|
.intervals(
|
||||||
|
new MultipleIntervalSegmentSpec(
|
||||||
|
Collections.singletonList(Intervals.of("2009/2010"))
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSerde() throws Exception
|
||||||
|
{
|
||||||
|
final ObjectMapper mapper = new DefaultObjectMapper();
|
||||||
|
|
||||||
|
String json = "{\"type\":\"manual\"}";
|
||||||
|
TieredBrokerSelectorStrategy strategy = mapper.readValue(
|
||||||
|
json,
|
||||||
|
TieredBrokerSelectorStrategy.class
|
||||||
|
);
|
||||||
|
assertTrue(strategy instanceof ManualTieredBrokerSelectorStrategy);
|
||||||
|
|
||||||
|
ManualTieredBrokerSelectorStrategy queryContextStrategy =
|
||||||
|
(ManualTieredBrokerSelectorStrategy) strategy;
|
||||||
|
assertNull(queryContextStrategy.getDefaultManualBrokerService());
|
||||||
|
|
||||||
|
json = "{\"type\":\"manual\",\"defaultManualBrokerService\":\"hotBroker\"}";
|
||||||
|
queryContextStrategy = mapper.readValue(
|
||||||
|
json,
|
||||||
|
ManualTieredBrokerSelectorStrategy.class
|
||||||
|
);
|
||||||
|
assertEquals(queryContextStrategy.getDefaultManualBrokerService(), "hotBroker");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetBrokerServiceName()
|
||||||
|
{
|
||||||
|
final ManualTieredBrokerSelectorStrategy strategy =
|
||||||
|
new ManualTieredBrokerSelectorStrategy(null);
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
Optional.absent(),
|
||||||
|
strategy.getBrokerServiceName(tieredBrokerConfig, queryBuilder.build())
|
||||||
|
);
|
||||||
|
assertEquals(
|
||||||
|
Optional.absent(),
|
||||||
|
strategy.getBrokerServiceName(
|
||||||
|
tieredBrokerConfig,
|
||||||
|
queryBuilder
|
||||||
|
.context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.INVALID_BROKER))
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
assertEquals(
|
||||||
|
Optional.of(Names.BROKER_SVC_HOT),
|
||||||
|
strategy.getBrokerServiceName(
|
||||||
|
tieredBrokerConfig,
|
||||||
|
queryBuilder
|
||||||
|
.context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.BROKER_SVC_HOT))
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
assertEquals(
|
||||||
|
Optional.of(Names.BROKER_SVC_COLD),
|
||||||
|
strategy.getBrokerServiceName(
|
||||||
|
tieredBrokerConfig,
|
||||||
|
queryBuilder
|
||||||
|
.context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.BROKER_SVC_COLD))
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetBrokerServiceName_withFallback()
|
||||||
|
{
|
||||||
|
final ManualTieredBrokerSelectorStrategy strategy =
|
||||||
|
new ManualTieredBrokerSelectorStrategy(Names.BROKER_SVC_MEDIUM);
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
Optional.of(Names.BROKER_SVC_MEDIUM),
|
||||||
|
strategy.getBrokerServiceName(tieredBrokerConfig, queryBuilder.build())
|
||||||
|
);
|
||||||
|
assertEquals(
|
||||||
|
Optional.of(Names.BROKER_SVC_MEDIUM),
|
||||||
|
strategy.getBrokerServiceName(
|
||||||
|
tieredBrokerConfig,
|
||||||
|
queryBuilder
|
||||||
|
.context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.INVALID_BROKER))
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
assertEquals(
|
||||||
|
Optional.of(Names.BROKER_SVC_HOT),
|
||||||
|
strategy.getBrokerServiceName(
|
||||||
|
tieredBrokerConfig,
|
||||||
|
queryBuilder
|
||||||
|
.context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.BROKER_SVC_HOT))
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetBrokerServiceName_withInvalidFallback()
|
||||||
|
{
|
||||||
|
final ManualTieredBrokerSelectorStrategy strategy =
|
||||||
|
new ManualTieredBrokerSelectorStrategy("noSuchBroker");
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
Optional.absent(),
|
||||||
|
strategy.getBrokerServiceName(tieredBrokerConfig, queryBuilder.build())
|
||||||
|
);
|
||||||
|
assertEquals(
|
||||||
|
Optional.absent(),
|
||||||
|
strategy.getBrokerServiceName(
|
||||||
|
tieredBrokerConfig,
|
||||||
|
queryBuilder
|
||||||
|
.context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.INVALID_BROKER))
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
assertEquals(
|
||||||
|
Optional.of(Names.BROKER_SVC_HOT),
|
||||||
|
strategy.getBrokerServiceName(
|
||||||
|
tieredBrokerConfig,
|
||||||
|
queryBuilder
|
||||||
|
.context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.BROKER_SVC_HOT))
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test constants.
|
||||||
|
*/
|
||||||
|
private static class Names
|
||||||
|
{
|
||||||
|
static final String BROKER_SVC_HOT = "druid/hotBroker";
|
||||||
|
static final String BROKER_SVC_MEDIUM = "druid/mediumBroker";
|
||||||
|
static final String BROKER_SVC_COLD = "druid/coldBroker";
|
||||||
|
|
||||||
|
static final String INVALID_BROKER = "invalidBroker";
|
||||||
|
}
|
||||||
|
}
|
|
@ -37,6 +37,7 @@ import org.apache.druid.guice.annotations.Json;
|
||||||
import org.apache.druid.java.util.common.Intervals;
|
import org.apache.druid.java.util.common.Intervals;
|
||||||
import org.apache.druid.java.util.common.Pair;
|
import org.apache.druid.java.util.common.Pair;
|
||||||
import org.apache.druid.query.Druids;
|
import org.apache.druid.query.Druids;
|
||||||
|
import org.apache.druid.query.QueryContexts;
|
||||||
import org.apache.druid.query.aggregation.CountAggregatorFactory;
|
import org.apache.druid.query.aggregation.CountAggregatorFactory;
|
||||||
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
|
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
|
||||||
import org.apache.druid.query.timeseries.TimeseriesQuery;
|
import org.apache.druid.query.timeseries.TimeseriesQuery;
|
||||||
|
@ -135,7 +136,11 @@ public class TieredBrokerHostSelectorTest
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
druidNodeDiscoveryProvider,
|
druidNodeDiscoveryProvider,
|
||||||
Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1))
|
Arrays.asList(
|
||||||
|
new ManualTieredBrokerSelectorStrategy(null),
|
||||||
|
new TimeBoundaryTieredBrokerSelectorStrategy(),
|
||||||
|
new PriorityTieredBrokerSelectorStrategy(0, 1)
|
||||||
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
brokerSelector.start();
|
brokerSelector.start();
|
||||||
|
@ -293,6 +298,41 @@ public class TieredBrokerHostSelectorTest
|
||||||
Assert.assertEquals("hotBroker", brokerName);
|
Assert.assertEquals("hotBroker", brokerName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSelectBasedOnQueryContext()
|
||||||
|
{
|
||||||
|
final Druids.TimeseriesQueryBuilder queryBuilder =
|
||||||
|
Druids.newTimeseriesQueryBuilder()
|
||||||
|
.dataSource("test")
|
||||||
|
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
|
||||||
|
.intervals(
|
||||||
|
new MultipleIntervalSegmentSpec(
|
||||||
|
Collections.singletonList(Intervals.of("2009/2010"))
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
brokerSelector.getDefaultServiceName(),
|
||||||
|
brokerSelector.select(queryBuilder.build()).lhs
|
||||||
|
);
|
||||||
|
Assert.assertEquals(
|
||||||
|
"hotBroker",
|
||||||
|
brokerSelector.select(
|
||||||
|
queryBuilder
|
||||||
|
.context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, "hotBroker"))
|
||||||
|
.build()
|
||||||
|
).lhs
|
||||||
|
);
|
||||||
|
Assert.assertEquals(
|
||||||
|
"coldBroker",
|
||||||
|
brokerSelector.select(
|
||||||
|
queryBuilder
|
||||||
|
.context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, "coldBroker"))
|
||||||
|
.build()
|
||||||
|
).lhs
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetAllBrokers()
|
public void testGetAllBrokers()
|
||||||
{
|
{
|
||||||
|
|
|
@ -477,7 +477,9 @@ defaultUser
|
||||||
inputSegmentSizeBytes
|
inputSegmentSizeBytes
|
||||||
skipOffsetFromLatest
|
skipOffsetFromLatest
|
||||||
- ../docs/design/router.md
|
- ../docs/design/router.md
|
||||||
|
brokerService
|
||||||
c3.2xlarge
|
c3.2xlarge
|
||||||
|
defaultManualBrokerService
|
||||||
maxPriority
|
maxPriority
|
||||||
minPriority
|
minPriority
|
||||||
runtime.properties
|
runtime.properties
|
||||||
|
@ -1426,6 +1428,7 @@ fieldAccess
|
||||||
finalizingFieldAccess
|
finalizingFieldAccess
|
||||||
hyperUniqueCardinality
|
hyperUniqueCardinality
|
||||||
- ../docs/querying/query-context.md
|
- ../docs/querying/query-context.md
|
||||||
|
brokerService
|
||||||
bySegment
|
bySegment
|
||||||
doubleSum
|
doubleSum
|
||||||
druid.broker.cache.populateCache
|
druid.broker.cache.populateCache
|
||||||
|
|
Loading…
Reference in New Issue