mirror of https://github.com/apache/druid.git
Enable routing of SQL queries at Router (#11566)
This PR adds a new property, `druid.router.sql.enable`, which allows the Router to handle SQL queries when set to `true`. The change does not affect Avatica JDBC requests, which are still routed by hashing the Connection ID. To allow parsing the request object as a SqlQuery (contained in the druid-sql module), some classes have been moved from druid-server to druid-services, keeping the same package names.
Parent: ccd362d228
Commit: aaf0aaad8f
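For orientation, a minimal client-side sketch of exercising the new routing path. It assumes a Router reachable at `http://localhost:8888` with `druid.router.sql.enable=true` and a broker service named `druid:broker-hot` present in `druid.router.tierToBrokerMap`; all three are placeholder assumptions, not part of this commit.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RouterSqlExample
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder Router address; the SQL endpoint itself is the standard /druid/v2/sql.
    String routerUrl = "http://localhost:8888/druid/v2/sql";

    // "brokerService" is the query-context key read by the manual selector strategy;
    // "druid:broker-hot" stands in for a service name from druid.router.tierToBrokerMap.
    String body = "{\"query\": \"SELECT * FROM INFORMATION_SCHEMA.SCHEMATA\","
                  + " \"context\": {\"brokerService\": \"druid:broker-hot\"}}";

    HttpRequest request = HttpRequest.newBuilder(URI.create(routerUrl))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + ": " + response.body());
  }
}
```

With `druid.router.sql.enable` left at its default of `false`, the same request is still proxied, but the Router does not consult its strategies to pick the broker service.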
@@ -2052,6 +2052,7 @@ Supported query contexts:
 |`druid.router.tierToBrokerMap`|Queries for a certain tier of data are routed to their appropriate Broker. This value should be an ordered JSON map of tiers to Broker names. The priority of Brokers is based on the ordering.|{"_default_tier": "<defaultBrokerServiceName>"}|
 |`druid.router.defaultRule`|The default rule for all datasources.|"_default"|
 |`druid.router.pollPeriod`|How often to poll for new rules.|PT1M|
+|`druid.router.sql.enable`|Enable routing of SQL queries. Possible values are `true` and `false`. When set to `true`, the Router uses the provided strategies to determine the broker service for a given SQL query.|`false`|
 |`druid.router.strategies`|Please see [Router Strategies](../design/router.md#router-strategies) for details.|[{"type":"timeBoundary"},{"type":"priority"}]|
 |`druid.router.avatica.balancer.type`|Class to use for balancing Avatica queries across Brokers. Please see [Avatica Query Balancing](../design/router.md#avatica-query-balancing).|rendezvousHash|
 |`druid.router.managementProxy.enabled`|Enables the Router's [management proxy](../design/router.md#router-as-management-proxy) functionality.|false|
@@ -112,6 +112,7 @@ Queries with a priority set to less than minPriority are routed to the lowest pr
 #### manual
 
 This strategy reads the parameter `brokerService` from the query context and routes the query to that broker service. If no valid `brokerService` is specified in the query context, the field `defaultManualBrokerService` is used to determine target broker service given the value is valid and non-null. A value is considered valid if it is present in `druid.router.tierToBrokerMap`
+This strategy can route both Native and SQL queries (when enabled).
 
 *Example*: A strategy that routes queries to the Broker "druid:broker-hot" if no valid `brokerService` is found in the query context.
 
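The manual strategy described in the hunk above reduces to a two-step lookup: prefer a valid `brokerService` from the query context, otherwise fall back to `defaultManualBrokerService`. Below is a standalone paraphrase of that decision, not the Druid class itself; the known-service set and default name are hypothetical stand-ins for the values of `druid.router.tierToBrokerMap` and the strategy's `defaultManualBrokerService` field.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class ManualRoutingSketch
{
  // Hypothetical stand-ins for the configured broker services and the manual default.
  private static final Set<String> KNOWN_SERVICES = Set.of("druid:broker-hot", "druid:broker-cold");
  private static final String DEFAULT_MANUAL_SERVICE = "druid:broker-hot";

  // Prefer a valid "brokerService" from the query context; otherwise fall back to the
  // manual default, and only if that is itself a known service.
  static Optional<String> selectBrokerService(Map<String, Object> queryContext)
  {
    Object fromContext = queryContext == null ? null : queryContext.get("brokerService");
    if (fromContext instanceof String && KNOWN_SERVICES.contains(fromContext)) {
      return Optional.of((String) fromContext);
    }
    if (KNOWN_SERVICES.contains(DEFAULT_MANUAL_SERVICE)) {
      return Optional.of(DEFAULT_MANUAL_SERVICE);
    }
    return Optional.empty();
  }

  public static void main(String[] args)
  {
    System.out.println(selectBrokerService(Map.of("brokerService", "druid:broker-cold"))); // Optional[druid:broker-cold]
    System.out.println(selectBrokerService(Map.of())); // Optional[druid:broker-hot]
  }
}
```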
@@ -137,6 +138,18 @@ Allows defining arbitrary routing rules using a JavaScript function. The functio
 
 > JavaScript-based functionality is disabled by default. Please refer to the Druid [JavaScript programming guide](../development/javascript.md) for guidelines about using Druid's JavaScript functionality, including instructions on how to enable it.
 
+### Routing of SQL queries
+
+To enable routing of SQL queries, set `druid.router.sql.enable` to `true` (`false` by default). The broker service for a
+given SQL query is resolved using only the provided Router strategies. If not resolved using any of the strategies, the
+Router uses the `defaultBrokerServiceName`. This behavior is slightly different from native queries where the Router
+first tries to resolve the broker service using strategies, then load rules and finally using the `defaultBrokerServiceName`
+if still not resolved.
+
+Routing of native queries is always enabled.
+
+Setting `druid.router.sql.enable` does not affect Avatica JDBC requests. They are routed based on connection ID as
+explained in the next section.
+
 ### Avatica query balancing
 
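As the servlet change further down shows, the new flag is read directly from the Router's properties and defaults to off. A minimal sketch of that parsing behavior, using an in-memory `Properties` object in place of the Router's runtime configuration:

```java
import java.util.Properties;

public class RouterSqlFlagSketch
{
  public static void main(String[] args)
  {
    // Stand-in for the Router's runtime properties; only this flag is relevant here.
    Properties routerProperties = new Properties();
    routerProperties.setProperty("druid.router.sql.enable", "true");

    // Mirrors how the forwarding servlet reads the flag: an absent property means "false",
    // so SQL routing stays disabled unless explicitly enabled.
    boolean routeSqlQueries = Boolean.parseBoolean(
        routerProperties.getProperty("druid.router.sql.enable", "false")
    );
    System.out.println("Route SQL queries at the Router: " + routeSqlQueries);
  }
}
```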
@@ -377,6 +377,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-server</artifactId>
@@ -329,6 +329,11 @@ public class QueryContexts
     return parseBoolean(query, ENABLE_DEBUG, DEFAULT_ENABLE_DEBUG);
   }
 
+  public static boolean isDebug(Map<String, Object> queryContext)
+  {
+    return parseBoolean(queryContext, ENABLE_DEBUG, DEFAULT_ENABLE_DEBUG);
+  }
+
   public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit)
   {
     Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY);

@@ -418,9 +423,9 @@ public class QueryContexts
     return query.getContextBoolean(RETURN_PARTIAL_RESULTS_KEY, defaultValue);
   }
 
-  public static <T> String getBrokerServiceName(Query<T> query)
+  public static String getBrokerServiceName(Map<String, Object> queryContext)
   {
-    return query.getContextValue(BROKER_SERVICE_NAME);
+    return queryContext == null ? null : (String) queryContext.get(BROKER_SERVICE_NAME);
   }
 
   static <T> long parseLong(Query<T> query, String key, long defaultValue)
@@ -30,6 +30,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.HashMap;
+import java.util.Map;
 
 public class QueryContextsTest
 {

@@ -149,33 +150,21 @@ public class QueryContextsTest
   @Test
   public void testGetBrokerServiceName()
   {
-    Query<?> query = new TestQuery(
-        new TableDataSource("test"),
-        new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
-        false,
-        new HashMap<>()
-    );
-
-    Assert.assertNull(QueryContexts.getBrokerServiceName(query));
-
-    query.getContext().put(QueryContexts.BROKER_SERVICE_NAME, "hotBroker");
-    Assert.assertEquals("hotBroker", QueryContexts.getBrokerServiceName(query));
+    Map<String, Object> queryContext = new HashMap<>();
+    Assert.assertNull(QueryContexts.getBrokerServiceName(queryContext));
+
+    queryContext.put(QueryContexts.BROKER_SERVICE_NAME, "hotBroker");
+    Assert.assertEquals("hotBroker", QueryContexts.getBrokerServiceName(queryContext));
   }
 
   @Test
   public void testGetBrokerServiceName_withNonStringValue()
   {
-    Query<?> query = new TestQuery(
-        new TableDataSource("test"),
-        new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
-        false,
-        new HashMap<>()
-    );
-
-    query.getContext().put(QueryContexts.BROKER_SERVICE_NAME, 100);
-
+    Map<String, Object> queryContext = new HashMap<>();
+    queryContext.put(QueryContexts.BROKER_SERVICE_NAME, 100);
+
     exception.expect(ClassCastException.class);
-    QueryContexts.getBrokerServiceName(query);
+    QueryContexts.getBrokerServiceName(queryContext);
   }
 
   @Test

@@ -188,6 +177,7 @@ public class QueryContextsTest
         ImmutableMap.of()
     );
     Assert.assertFalse(QueryContexts.isDebug(query));
+    Assert.assertFalse(QueryContexts.isDebug(query.getContext()));
   }
 
   @Test

@@ -200,5 +190,6 @@ public class QueryContextsTest
         ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true)
     );
     Assert.assertTrue(QueryContexts.isDebug(query));
+    Assert.assertTrue(QueryContexts.isDebug(query.getContext()));
   }
 }
@@ -323,10 +323,6 @@
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-guice</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.calcite.avatica</groupId>
-      <artifactId>avatica-core</artifactId>
-    </dependency>
 
     <!-- Tests -->
     <dependency>
@@ -80,6 +80,18 @@
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-http</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-proxy</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>

@@ -88,6 +100,10 @@
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-framework</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.calcite.avatica</groupId>
+      <artifactId>avatica-core</artifactId>
+    </dependency>
     <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>

@@ -96,14 +112,38 @@
       <groupId>com.google.inject</groupId>
       <artifactId>guice</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.jaxrs</groupId>
+      <artifactId>jackson-jaxrs-smile-provider</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.opencsv</groupId>
       <artifactId>opencsv</artifactId>
     </dependency>
+    <dependency>
+      <groupId>javax.validation</groupId>
+      <artifactId>validation-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.errorprone</groupId>
+      <artifactId>error_prone_annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>javax.ws.rs</groupId>
+      <artifactId>jsr311-api</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-servlet</artifactId>

@@ -112,6 +152,10 @@
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-rewrite</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-server</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.google.inject.extensions</groupId>
       <artifactId>guice-multibindings</artifactId>

@@ -124,6 +168,10 @@
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-util</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+    </dependency>
    <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-common</artifactId>

@@ -161,6 +209,27 @@
       <artifactId>jaxb-api</artifactId>
     </dependency>
     <!-- Test Dependencies -->
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-core</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-server</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-processing</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>

@@ -176,6 +245,11 @@
       <artifactId>hamcrest-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -52,6 +52,7 @@ import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Authenticator;
 import org.apache.druid.server.security.AuthenticatorMapper;
+import org.apache.druid.sql.http.SqlQuery;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.api.Response;

@@ -68,6 +69,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;

@@ -88,8 +90,12 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
   private static final String SCHEME_ATTRIBUTE = "org.apache.druid.proxy.to.host.scheme";
   private static final String QUERY_ATTRIBUTE = "org.apache.druid.proxy.query";
   private static final String AVATICA_QUERY_ATTRIBUTE = "org.apache.druid.proxy.avaticaQuery";
+  private static final String SQL_QUERY_ATTRIBUTE = "org.apache.druid.proxy.sqlQuery";
   private static final String OBJECTMAPPER_ATTRIBUTE = "org.apache.druid.proxy.objectMapper";
 
+  private static final String PROPERTY_SQL_ENABLE = "druid.router.sql.enable";
+  private static final String PROPERTY_SQL_ENABLE_DEFAULT = "false";
+
   private static final int CANCELLATION_TIMEOUT_MILLIS = 500;
 
   private final AtomicLong successfulQueryCount = new AtomicLong();

@@ -124,6 +130,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
   private final AuthenticatorMapper authenticatorMapper;
   private final ProtobufTranslation protobufTranslation;
 
+  private final boolean routeSqlQueries;
+
   private HttpClient broadcastClient;
 
   @Inject

@@ -137,7 +145,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
       ServiceEmitter emitter,
       RequestLogger requestLogger,
       GenericQueryMetricsFactory queryMetricsFactory,
-      AuthenticatorMapper authenticatorMapper
+      AuthenticatorMapper authenticatorMapper,
+      Properties properties
   )
   {
     this.warehouse = warehouse;

@@ -151,6 +160,9 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
     this.queryMetricsFactory = queryMetricsFactory;
     this.authenticatorMapper = authenticatorMapper;
     this.protobufTranslation = new ProtobufTranslationImpl();
+    this.routeSqlQueries = Boolean.parseBoolean(
+        properties.getProperty(PROPERTY_SQL_ENABLE, PROPERTY_SQL_ENABLE_DEFAULT)
+    );
   }
 
   @Override

@@ -195,7 +207,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
 
     // The Router does not have the ability to look inside SQL queries and route them intelligently, so just treat
     // them as a generic request.
-    final boolean isQueryEndpoint = requestURI.startsWith("/druid/v2") && !requestURI.startsWith("/druid/v2/sql");
+    final boolean isNativeQueryEndpoint = requestURI.startsWith("/druid/v2") && !requestURI.startsWith("/druid/v2/sql");
+    final boolean isSqlQueryEndpoint = requestURI.startsWith("/druid/v2/sql");
 
     final boolean isAvaticaJson = requestURI.startsWith("/druid/v2/sql/avatica");
     final boolean isAvaticaPb = requestURI.startsWith("/druid/v2/sql/avatica-protobuf");

@@ -215,36 +228,11 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
       targetServer = hostFinder.findServerAvatica(connectionId);
       byte[] requestBytes = objectMapper.writeValueAsBytes(requestMap);
       request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes);
-    } else if (isQueryEndpoint && HttpMethod.DELETE.is(method)) {
+    } else if (isNativeQueryEndpoint && HttpMethod.DELETE.is(method)) {
       // query cancellation request
       targetServer = hostFinder.pickDefaultServer();
-      for (final Server server : hostFinder.getAllServers()) {
-        // send query cancellation to all brokers this query may have gone to
-        // to keep the code simple, the proxy servlet will also send a request to the default targetServer.
-        if (!server.getHost().equals(targetServer.getHost())) {
-          // issue async requests
-          Response.CompleteListener completeListener = result -> {
-            if (result.isFailed()) {
-              log.warn(
-                  result.getFailure(),
-                  "Failed to forward cancellation request to [%s]",
-                  server.getHost()
-              );
-            }
-          };
-
-          Request broadcastReq = broadcastClient
-              .newRequest(rewriteURI(request, server.getScheme(), server.getHost()))
-              .method(HttpMethod.DELETE)
-              .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-
-          copyRequestHeaders(request, broadcastReq);
-          broadcastReq.send(completeListener);
-        }
-        interruptedQueryCount.incrementAndGet();
-      }
-    } else if (isQueryEndpoint && HttpMethod.POST.is(method)) {
+      broadcastQueryCancelRequest(request, targetServer);
+    } else if (isNativeQueryEndpoint && HttpMethod.POST.is(method)) {
       // query request
       try {
         Query inputQuery = objectMapper.readValue(request.getInputStream(), Query.class);

@@ -259,23 +247,21 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
         request.setAttribute(QUERY_ATTRIBUTE, inputQuery);
       }
       catch (IOException e) {
-        log.warn(e, "Exception parsing query");
-        final String errorMessage = e.getMessage() == null ? "no error message" : e.getMessage();
-        requestLogger.logNativeQuery(
-            RequestLogLine.forNative(
-                null,
-                DateTimes.nowUtc(),
-                request.getRemoteAddr(),
-                new QueryStats(ImmutableMap.of("success", false, "exception", errorMessage))
-            )
-        );
-        response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-        response.setContentType(MediaType.APPLICATION_JSON);
-        objectMapper.writeValue(
-            response.getOutputStream(),
-            ImmutableMap.of("error", errorMessage)
-        );
-
+        handleQueryParseException(request, response, objectMapper, e, true);
+        return;
+      }
+      catch (Exception e) {
+        handleException(response, objectMapper, e);
+        return;
+      }
+    } else if (routeSqlQueries && isSqlQueryEndpoint && HttpMethod.POST.is(method)) {
+      try {
+        SqlQuery inputSqlQuery = objectMapper.readValue(request.getInputStream(), SqlQuery.class);
+        request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery);
+        targetServer = hostFinder.findServerSql(inputSqlQuery);
+      }
+      catch (IOException e) {
+        handleQueryParseException(request, response, objectMapper, e, false);
         return;
       }
       catch (Exception e) {

@@ -292,6 +278,86 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
     doService(request, response);
   }
 
+  /**
+   * Issues async query cancellation requests to all Brokers (except the given
+   * targetServer). Query cancellation on the targetServer is handled by the
+   * proxy servlet.
+   */
+  private void broadcastQueryCancelRequest(HttpServletRequest request, Server targetServer)
+  {
+    // send query cancellation to all brokers this query may have gone to
+    // to keep the code simple, the proxy servlet will also send a request to the default targetServer.
+    for (final Server server : hostFinder.getAllServers()) {
+      if (server.getHost().equals(targetServer.getHost())) {
+        continue;
+      }
+
+      // issue async requests
+      Response.CompleteListener completeListener = result -> {
+        if (result.isFailed()) {
+          log.warn(
+              result.getFailure(),
+              "Failed to forward cancellation request to [%s]",
+              server.getHost()
+          );
+        }
+      };
+
+      Request broadcastReq = broadcastClient
+          .newRequest(rewriteURI(request, server.getScheme(), server.getHost()))
+          .method(HttpMethod.DELETE)
+          .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+      copyRequestHeaders(request, broadcastReq);
+      broadcastReq.send(completeListener);
+    }
+
+    interruptedQueryCount.incrementAndGet();
+  }
+
+  private void handleQueryParseException(
+      HttpServletRequest request,
+      HttpServletResponse response,
+      ObjectMapper objectMapper,
+      IOException parseException,
+      boolean isNativeQuery
+  ) throws IOException
+  {
+    log.warn(parseException, "Exception parsing query");
+
+    // Log the error message
+    final String errorMessage = parseException.getMessage() == null
+                                ? "no error message" : parseException.getMessage();
+    if (isNativeQuery) {
+      requestLogger.logNativeQuery(
+          RequestLogLine.forNative(
+              null,
+              DateTimes.nowUtc(),
+              request.getRemoteAddr(),
+              new QueryStats(ImmutableMap.of("success", false, "exception", errorMessage))
+          )
+      );
+    } else {
+      requestLogger.logSqlQuery(
+          RequestLogLine.forSql(
+              null,
+              null,
+              DateTimes.nowUtc(),
+              request.getRemoteAddr(),
+              new QueryStats(ImmutableMap.of("success", false, "exception", errorMessage))
+          )
+      );
+    }
+
+    // Write to the response
+    response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+    response.setContentType(MediaType.APPLICATION_JSON);
+    objectMapper.writeValue(
+        response.getOutputStream(),
+        ImmutableMap.of("error", errorMessage)
+    );
+  }
+
   protected void doService(
       HttpServletRequest request,
       HttpServletResponse response

@@ -317,16 +383,11 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
     }
 
     final Query query = (Query) clientRequest.getAttribute(QUERY_ATTRIBUTE);
+    final SqlQuery sqlQuery = (SqlQuery) clientRequest.getAttribute(SQL_QUERY_ATTRIBUTE);
     if (query != null) {
-      final ObjectMapper objectMapper = (ObjectMapper) clientRequest.getAttribute(OBJECTMAPPER_ATTRIBUTE);
-      try {
-        byte[] bytes = objectMapper.writeValueAsBytes(query);
-        proxyRequest.content(new BytesContentProvider(bytes));
-        proxyRequest.getHeaders().put(HttpHeader.CONTENT_LENGTH, String.valueOf(bytes.length));
-      }
-      catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      setProxyRequestContent(proxyRequest, clientRequest, query);
+    } else if (sqlQuery != null) {
+      setProxyRequestContent(proxyRequest, clientRequest, sqlQuery);
     }
 
     // Since we can't see the request object on the remote side, we can't check whether the remote side actually

@@ -358,6 +419,19 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
     );
   }
 
+  private void setProxyRequestContent(Request proxyRequest, HttpServletRequest clientRequest, Object content)
+  {
+    final ObjectMapper objectMapper = (ObjectMapper) clientRequest.getAttribute(OBJECTMAPPER_ATTRIBUTE);
+    try {
+      byte[] bytes = objectMapper.writeValueAsBytes(content);
+      proxyRequest.content(new BytesContentProvider(bytes));
+      proxyRequest.getHeaders().put(HttpHeader.CONTENT_LENGTH, String.valueOf(bytes.length));
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
   protected Response.Listener newProxyResponseListener(HttpServletRequest request, HttpServletResponse response)
   {
@@ -27,8 +27,10 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
+import org.apache.druid.sql.http.SqlQuery;
 
 import javax.annotation.Nullable;
+import java.util.Map;
 
 /**
  * Implementation of {@link TieredBrokerSelectorStrategy} which uses the parameter

@@ -57,9 +59,26 @@ public class ManualTieredBrokerSelectorStrategy implements TieredBrokerSelectorS
 
   @Override
   public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
+  {
+    return getBrokerServiceName(tierConfig, query.getContext());
+  }
+
+  @Override
+  public Optional<String> getBrokerServiceName(TieredBrokerConfig config, SqlQuery sqlQuery)
+  {
+    return getBrokerServiceName(config, sqlQuery.getContext());
+  }
+
+  /**
+   * Determines the Broker service name from the given query context.
+   */
+  private Optional<String> getBrokerServiceName(
+      TieredBrokerConfig tierConfig,
+      Map<String, Object> queryContext
+  )
   {
     try {
-      final String contextBrokerService = QueryContexts.getBrokerServiceName(query);
+      final String contextBrokerService = QueryContexts.getBrokerServiceName(queryContext);
 
       if (isValidBrokerService(contextBrokerService, tierConfig)) {
         // If the broker service in the query context is valid, use that
@@ -25,9 +25,9 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.Query;
+import org.apache.druid.sql.http.SqlQuery;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 

@@ -66,21 +66,20 @@ public class QueryHostFinder
 
   public Collection<Server> getAllServers()
   {
-    return ((Collection<List<Server>>) hostSelector.getAllBrokers().values()).stream()
+    return hostSelector.getAllBrokers().values().stream()
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
 
   public Server findServerAvatica(String connectionId)
   {
     Server chosenServer = avaticaConnectionBalancer.pickServer(getAllServers(), connectionId);
-    if (chosenServer == null) {
-      log.makeAlert(
-          "Catastrophic failure! No servers found at all! Failing request!"
-      ).emit();
-
-      throw new ISE("No server found for Avatica request with connectionId[%s]", connectionId);
-    }
+    assertServerFound(
+        chosenServer,
+        "No server found for Avatica request with connectionId[%s]",
+        connectionId
+    );
     log.debug(
         "Balancer class [%s] sending request with connectionId[%s] to server: %s",
         avaticaConnectionBalancer.getClass(),

@@ -90,35 +89,24 @@ public class QueryHostFinder
     return chosenServer;
   }
 
+  public Server findServerSql(SqlQuery sqlQuery)
+  {
+    Server server = findServerInner(hostSelector.selectForSql(sqlQuery));
+    assertServerFound(server, "No server found for SQL Query [%s]", "SELECT IT");
+    return server;
+  }
+
   public <T> Server pickServer(Query<T> query)
   {
     Server server = findServer(query);
-    if (server == null) {
-      log.makeAlert(
-          "Catastrophic failure! No servers found at all! Failing request!"
-      ).emit();
-
-      throw new ISE("No server found for query[%s]", query);
-    }
-
-    log.debug("Selected [%s]", server.getHost());
-
+    assertServerFound(server, "No server found for query[%s]", query);
     return server;
   }
 
   public Server pickDefaultServer()
   {
     Server server = findDefaultServer();
-    if (server == null) {
-      log.makeAlert(
-          "Catastrophic failure! No servers found at all! Failing request!"
-      ).emit();
-
-      throw new ISE("No default server found!");
-    }
-
+    assertServerFound(server, "No default server found!");
     return server;
   }
 

@@ -155,4 +143,18 @@ public class QueryHostFinder
 
     return server;
   }
+
+  private void assertServerFound(Server server, String messageFormat, Object... args)
+  {
+    if (server != null) {
+      log.debug("Selected [%s]", server.getHost());
+      return;
+    }
+
+    log.makeAlert(
+        "Catastrophic failure! No servers found at all! Failing request!"
+    ).emit();
+
+    throw new ISE(messageFormat, args);
+  }
 }
@@ -36,8 +36,10 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.server.coordinator.rules.LoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.sql.http.SqlQuery;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;

@@ -50,7 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  */
-public class TieredBrokerHostSelector<T>
+public class TieredBrokerHostSelector
 {
   private static EmittingLogger log = new EmittingLogger(TieredBrokerHostSelector.class);
 

@@ -181,7 +183,7 @@ public class TieredBrokerHostSelector<T>
     return tierConfig.getDefaultBrokerServiceName();
   }
 
-  public Pair<String, Server> select(final Query<T> query)
+  public <T> Pair<String, Server> select(final Query<T> query)
   {
     synchronized (lock) {
       if (!ruleManager.isStarted() || !started) {

@@ -243,8 +245,17 @@ public class TieredBrokerHostSelector<T>
       brokerServiceName = tierConfig.getDefaultBrokerServiceName();
     }
 
+    return getServerPair(brokerServiceName);
+  }
+
+  /**
+   * Finds a server for the given brokerServiceName and returns a pair containing
+   * the brokerServiceName and the found server. Uses the default broker service
+   * if no server is found for the given brokerServiceName.
+   */
+  private Pair<String, Server> getServerPair(String brokerServiceName)
+  {
     NodesHolder nodesHolder = servers.get(brokerServiceName);
     if (nodesHolder == null) {
       log.error(
           "No nodesHolder found for brokerServiceName[%s]. Using default selector for[%s]",

@@ -257,6 +268,42 @@ public class TieredBrokerHostSelector<T>
     return new Pair<>(brokerServiceName, nodesHolder.pick());
   }
 
+  public Pair<String, Server> selectForSql(SqlQuery sqlQuery)
+  {
+    synchronized (lock) {
+      if (!started) {
+        return getDefaultLookup();
+      }
+    }
+
+    // Resolve brokerServiceName using Tier selector strategies
+    String brokerServiceName = null;
+    for (TieredBrokerSelectorStrategy strategy : strategies) {
+      final Optional<String> optionalName = strategy.getBrokerServiceName(tierConfig, sqlQuery);
+      if (optionalName.isPresent()) {
+        brokerServiceName = optionalName.get();
+        break;
+      }
+    }
+
+    // Use defaut if not resolved by strategies
+    if (brokerServiceName == null) {
+      brokerServiceName = tierConfig.getDefaultBrokerServiceName();
+
+      // Log if query debugging is enabled
+      if (QueryContexts.isDebug(sqlQuery.getContext())) {
+        log.info(
+            "No brokerServiceName found for SQL Query [%s], Context [%s]. Using default selector for [%s].",
+            sqlQuery.getQuery(),
+            sqlQuery.getContext(),
+            tierConfig.getDefaultBrokerServiceName()
+        );
+      }
+    }
+
+    return getServerPair(brokerServiceName);
+  }
+
   public Pair<String, Server> getDefaultLookup()
   {
     final String brokerServiceName = tierConfig.getDefaultBrokerServiceName();
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.base.Optional;
 import org.apache.druid.query.Query;
+import org.apache.druid.sql.http.SqlQuery;
 
 /**
  */

@@ -36,5 +37,27 @@ import org.apache.druid.query.Query;
 
 public interface TieredBrokerSelectorStrategy
 {
+  /**
+   * Tries to determine the name of the Broker service to which the given native
+   * query should be routed.
+   *
+   * @param config Config containing tier to broker service map
+   * @param query Native (JSON) query to be routed
+   * @return An empty Optional if the service name could not be determined.
+   */
   Optional<String> getBrokerServiceName(TieredBrokerConfig config, Query query);
+
+  /**
+   * Tries to determine the name of the Broker service to which the given SqlQuery
+   * should be routed. The default implementation returns an empty Optional.
+   *
+   * @param config Config containing tier to broker service map
+   * @param sqlQuery SQL query to be routed
+   * @return An empty Optional if the service name could not be determined.
+   */
+  default Optional<String> getBrokerServiceName(TieredBrokerConfig config, SqlQuery sqlQuery)
+  {
+    return Optional.absent();
+  }
 }
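Because the SqlQuery overload above is a `default` method, existing strategy implementations keep compiling and simply opt out of SQL routing. Below is a sketch of a custom strategy that opts in, sending debug queries (native and SQL) to a fixed broker service; the class name, the `druid:broker-debug` service, and the assumed `org.apache.druid.server.router` package are illustrative, and a real implementation would additionally need to be registered as a Jackson subtype before it could be referenced from `druid.router.strategies`.

```java
import com.google.common.base.Optional;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.server.router.TieredBrokerConfig;
import org.apache.druid.server.router.TieredBrokerSelectorStrategy;
import org.apache.druid.sql.http.SqlQuery;

public class DebugAwareSelectorStrategy implements TieredBrokerSelectorStrategy
{
  // Placeholder broker service name; it must exist in druid.router.tierToBrokerMap.
  private static final String DEBUG_BROKER_SERVICE = "druid:broker-debug";

  @Override
  public Optional<String> getBrokerServiceName(TieredBrokerConfig config, Query query)
  {
    // Route native queries with debug enabled in their context to the debug service.
    return QueryContexts.isDebug(query.getContext())
           ? Optional.of(DEBUG_BROKER_SERVICE)
           : Optional.<String>absent();
  }

  @Override
  public Optional<String> getBrokerServiceName(TieredBrokerConfig config, SqlQuery sqlQuery)
  {
    // Same decision for SQL queries, using the SqlQuery overload added by this change.
    return QueryContexts.isDebug(sqlQuery.getContext())
           ? Optional.of(DEBUG_BROKER_SERVICE)
           : Optional.<String>absent();
  }
}
```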
@@ -64,6 +64,8 @@ import org.apache.druid.server.security.AllowAllAuthorizer;
 import org.apache.druid.server.security.AuthenticatorMapper;
 import org.apache.druid.server.security.Authorizer;
 import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.sql.http.SqlQuery;
 import org.easymock.EasyMock;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.server.Handler;

@@ -89,6 +91,7 @@ import java.net.URL;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Deflater;

@@ -190,10 +193,23 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
     latch.await();
   }
 
+  @Test
+  public void testSqlQueryProxy() throws Exception
+  {
+    final SqlQuery query = new SqlQuery("SELECT * FROM foo", ResultFormat.ARRAY, false, null, null);
+    final QueryHostFinder hostFinder = EasyMock.createMock(QueryHostFinder.class);
+    EasyMock.expect(hostFinder.findServerSql(query))
+            .andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
+    EasyMock.replay(hostFinder);
+
+    Properties properties = new Properties();
+    properties.setProperty("druid.router.sql.enable", "true");
+    verifyServletCallsForQuery(query, true, hostFinder, properties);
+  }
+
   @Test
   public void testQueryProxy() throws Exception
   {
-    final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
     final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
         .dataSource("foo")
         .intervals("2000/P1D")

@@ -205,6 +221,20 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
     EasyMock.expect(hostFinder.pickServer(query)).andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
     EasyMock.replay(hostFinder);
 
+    verifyServletCallsForQuery(query, false, hostFinder, new Properties());
+  }
+
+  /**
+   * Verifies that the Servlet calls the right methods the right number of times.
+   */
+  private void verifyServletCallsForQuery(
+      Object query,
+      boolean isSql,
+      QueryHostFinder hostFinder,
+      Properties properties
+  ) throws Exception
+  {
+    final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
     final HttpServletRequest requestMock = EasyMock.createMock(HttpServletRequest.class);
     final ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonMapper.writeValueAsBytes(query));
     final ServletInputStream servletInputStream = new ServletInputStream()

@@ -242,10 +272,13 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
     EasyMock.expect(requestMock.getContentType()).andReturn("application/json").times(2);
     requestMock.setAttribute("org.apache.druid.proxy.objectMapper", jsonMapper);
     EasyMock.expectLastCall();
-    EasyMock.expect(requestMock.getRequestURI()).andReturn("/druid/v2/");
+    EasyMock.expect(requestMock.getRequestURI()).andReturn(isSql ? "/druid/v2/sql" : "/druid/v2/");
     EasyMock.expect(requestMock.getMethod()).andReturn("POST");
     EasyMock.expect(requestMock.getInputStream()).andReturn(servletInputStream);
-    requestMock.setAttribute("org.apache.druid.proxy.query", query);
+    requestMock.setAttribute(
+        isSql ? "org.apache.druid.proxy.sqlQuery" : "org.apache.druid.proxy.query",
+        query
+    );
     requestMock.setAttribute("org.apache.druid.proxy.to.host", "1.2.3.4:9999");
     requestMock.setAttribute("org.apache.druid.proxy.to.host.scheme", "http");
     EasyMock.expectLastCall();

@@ -262,7 +295,8 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
         new NoopServiceEmitter(),
         new NoopRequestLogger(),
         new DefaultGenericQueryMetricsFactory(),
-        new AuthenticatorMapper(ImmutableMap.of())
+        new AuthenticatorMapper(ImmutableMap.of()),
+        properties
     )
     {
       @Override

@@ -354,7 +388,8 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
         new NoopServiceEmitter(),
         new NoopRequestLogger(),
         new DefaultGenericQueryMetricsFactory(),
-        new AuthenticatorMapper(ImmutableMap.of())
+        new AuthenticatorMapper(ImmutableMap.of()),
+        new Properties()
     )
     {
       @Override

@@ -477,32 +512,32 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
     final int maxNumRows = 1000;
 
     final List<? extends Service.Request> avaticaRequests = ImmutableList.of(
         new Service.CatalogsRequest(connectionId),
         new Service.SchemasRequest(connectionId, "druid", null),
         new Service.TablesRequest(connectionId, "druid", "druid", null, null),
         new Service.ColumnsRequest(connectionId, "druid", "druid", "someTable", null),
         new Service.PrepareAndExecuteRequest(
             connectionId,
             statementId,
             query,
             maxNumRows
         ),
         new Service.PrepareRequest(connectionId, query, maxNumRows),
         new Service.ExecuteRequest(
             new Meta.StatementHandle(connectionId, statementId, null),
             ImmutableList.of(),
             maxNumRows
         ),
         new Service.CloseStatementRequest(connectionId, statementId),
         new Service.CloseConnectionRequest(connectionId)
     );
 
     for (Service.Request request : avaticaRequests) {
       Assert.assertEquals(
           "failed",
           connectionId,
           AsyncQueryForwardingServlet.getAvaticaProtobufConnectionId(request)
       );
     }
   }
@@ -28,11 +28,13 @@ import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.sql.http.SqlQuery;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;

@@ -200,6 +202,56 @@ public class ManualTieredBrokerSelectorStrategyTest
     );
   }
 
+  @Test
+  public void testGetBrokerServiceName_forSql()
+  {
+    final ManualTieredBrokerSelectorStrategy strategy =
+        new ManualTieredBrokerSelectorStrategy(null);
+
+    assertEquals(
+        Optional.absent(),
+        strategy.getBrokerServiceName(tieredBrokerConfig, createSqlQueryWithContext(null))
+    );
+    assertEquals(
+        Optional.absent(),
+        strategy.getBrokerServiceName(
+            tieredBrokerConfig,
+            createSqlQueryWithContext(
+                ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.INVALID_BROKER)
+            )
+        )
+    );
+    assertEquals(
+        Optional.of(Names.BROKER_SVC_HOT),
+        strategy.getBrokerServiceName(
+            tieredBrokerConfig,
+            createSqlQueryWithContext(
+                ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.BROKER_SVC_HOT)
+            )
+        )
+    );
+    assertEquals(
+        Optional.of(Names.BROKER_SVC_COLD),
+        strategy.getBrokerServiceName(
+            tieredBrokerConfig,
+            createSqlQueryWithContext(
+                ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.BROKER_SVC_COLD)
+            )
+        )
+    );
+  }
+
+  private SqlQuery createSqlQueryWithContext(Map<String, Object> queryContext)
+  {
+    return new SqlQuery(
+        "SELECT * FROM test",
+        null,
+        false,
+        queryContext,
+        null
+    );
+  }
+
   /**
    * Test constants.
    */
@@ -44,6 +44,7 @@ import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.sql.http.SqlQuery;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Assert;

@@ -56,6 +57,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  */

@@ -333,6 +335,33 @@ public class TieredBrokerHostSelectorTest
     );
   }
 
+  @Test
+  public void testSelectForSql()
+  {
+    Assert.assertEquals(
+        brokerSelector.getDefaultServiceName(),
+        brokerSelector.selectForSql(
+            createSqlQueryWithContext(null)
+        ).lhs
+    );
+    Assert.assertEquals(
+        "hotBroker",
+        brokerSelector.selectForSql(
+            createSqlQueryWithContext(
+                ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, "hotBroker")
+            )
+        ).lhs
+    );
+    Assert.assertEquals(
+        "coldBroker",
+        brokerSelector.selectForSql(
+            createSqlQueryWithContext(
+                ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, "coldBroker")
+            )
+        ).lhs
+    );
+  }
+
   @Test
   public void testGetAllBrokers()
   {

@@ -356,6 +385,17 @@ public class TieredBrokerHostSelectorTest
     );
   }
 
+  private SqlQuery createSqlQueryWithContext(Map<String, Object> queryContext)
+  {
+    return new SqlQuery(
+        "SELECT * FROM test",
+        null,
+        false,
+        queryContext,
+        null
+    );
+  }
+
   private static class TestRuleManager extends CoordinatorRuleManager
   {
     public TestRuleManager(