mirror of https://github.com/apache/druid.git
Add Calcite Avatica protobuf handler (#10543)
This commit is contained in:
parent
43ea184b74
commit
782a1d4e6c
|
@ -38,6 +38,12 @@
|
||||||
</Or>
|
</Or>
|
||||||
</And>
|
</And>
|
||||||
</Match>
|
</Match>
|
||||||
|
<Match>
|
||||||
|
<And>
|
||||||
|
<Bug pattern="SE_BAD_FIELD_STORE" />
|
||||||
|
<Class name="org.apache.druid.server.AsyncQueryForwardingServlet" />
|
||||||
|
</And>
|
||||||
|
</Match>
|
||||||
|
|
||||||
<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
|
<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
|
||||||
<Bug pattern="BC_UNCONFIRMED_CAST"/>
|
<Bug pattern="BC_UNCONFIRMED_CAST"/>
|
||||||
|
|
|
@ -954,6 +954,14 @@ try (Connection connection = DriverManager.getConnection(url, connectionProperti
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
It is also possible to use a protocol buffers JDBC connection with Druid, this offer reduced bloat and potential performance
|
||||||
|
improvements for larger result sets. To use it apply the following connection url instead, everything else remains the same
|
||||||
|
```
|
||||||
|
String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica-protobuf/;serialization=protobuf";
|
||||||
|
```
|
||||||
|
|
||||||
|
> The protobuf endpoint is also known to work with the official [Golang Avatica driver](https://github.com/apache/calcite-avatica-go)
|
||||||
|
|
||||||
Table metadata is available over JDBC using `connection.getMetaData()` or by querying the
|
Table metadata is available over JDBC using `connection.getMetaData()` or by querying the
|
||||||
["INFORMATION_SCHEMA" tables](#metadata-tables).
|
["INFORMATION_SCHEMA" tables](#metadata-tables).
|
||||||
|
|
||||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.druid.java.util.http.client.CredentialedHttpClient;
|
||||||
import org.apache.druid.java.util.http.client.HttpClient;
|
import org.apache.druid.java.util.http.client.HttpClient;
|
||||||
import org.apache.druid.java.util.http.client.auth.BasicCredentials;
|
import org.apache.druid.java.util.http.client.auth.BasicCredentials;
|
||||||
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
|
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
|
||||||
import org.apache.druid.sql.avatica.DruidAvaticaHandler;
|
import org.apache.druid.sql.avatica.DruidAvaticaJsonHandler;
|
||||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||||
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
|
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
|
||||||
import org.apache.druid.testing.utils.HttpUtil;
|
import org.apache.druid.testing.utils.HttpUtil;
|
||||||
|
@ -285,12 +285,12 @@ public abstract class AbstractAuthConfigurationTest
|
||||||
|
|
||||||
String getBrokerAvacticaUrl()
|
String getBrokerAvacticaUrl()
|
||||||
{
|
{
|
||||||
return "jdbc:avatica:remote:url=" + config.getBrokerUrl() + DruidAvaticaHandler.AVATICA_PATH;
|
return "jdbc:avatica:remote:url=" + config.getBrokerUrl() + DruidAvaticaJsonHandler.AVATICA_PATH;
|
||||||
}
|
}
|
||||||
|
|
||||||
String getRouterAvacticaUrl()
|
String getRouterAvacticaUrl()
|
||||||
{
|
{
|
||||||
return "jdbc:avatica:remote:url=" + config.getRouterUrl() + DruidAvaticaHandler.AVATICA_PATH;
|
return "jdbc:avatica:remote:url=" + config.getRouterUrl() + DruidAvaticaJsonHandler.AVATICA_PATH;
|
||||||
}
|
}
|
||||||
|
|
||||||
void verifyAdminOptionsRequest()
|
void verifyAdminOptionsRequest()
|
||||||
|
|
|
@ -319,6 +319,10 @@
|
||||||
<groupId>com.fasterxml.jackson.module</groupId>
|
<groupId>com.fasterxml.jackson.module</groupId>
|
||||||
<artifactId>jackson-module-guice</artifactId>
|
<artifactId>jackson-module-guice</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.calcite.avatica</groupId>
|
||||||
|
<artifactId>avatica-core</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- Tests -->
|
<!-- Tests -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -450,11 +454,6 @@
|
||||||
<version>1.3</version>
|
<version>1.3</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.calcite.avatica</groupId>
|
|
||||||
<artifactId>avatica-core</artifactId>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -26,6 +26,10 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Provider;
|
import com.google.inject.Provider;
|
||||||
|
import org.apache.calcite.avatica.remote.ProtobufTranslation;
|
||||||
|
import org.apache.calcite.avatica.remote.ProtobufTranslationImpl;
|
||||||
|
import org.apache.calcite.avatica.remote.Service;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.druid.client.selector.Server;
|
import org.apache.druid.client.selector.Server;
|
||||||
import org.apache.druid.guice.annotations.Json;
|
import org.apache.druid.guice.annotations.Json;
|
||||||
import org.apache.druid.guice.annotations.Smile;
|
import org.apache.druid.guice.annotations.Smile;
|
||||||
|
@ -118,6 +122,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
||||||
private final RequestLogger requestLogger;
|
private final RequestLogger requestLogger;
|
||||||
private final GenericQueryMetricsFactory queryMetricsFactory;
|
private final GenericQueryMetricsFactory queryMetricsFactory;
|
||||||
private final AuthenticatorMapper authenticatorMapper;
|
private final AuthenticatorMapper authenticatorMapper;
|
||||||
|
private final ProtobufTranslation protobufTranslation;
|
||||||
|
|
||||||
private HttpClient broadcastClient;
|
private HttpClient broadcastClient;
|
||||||
|
|
||||||
|
@ -145,6 +150,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
||||||
this.requestLogger = requestLogger;
|
this.requestLogger = requestLogger;
|
||||||
this.queryMetricsFactory = queryMetricsFactory;
|
this.queryMetricsFactory = queryMetricsFactory;
|
||||||
this.authenticatorMapper = authenticatorMapper;
|
this.authenticatorMapper = authenticatorMapper;
|
||||||
|
this.protobufTranslation = new ProtobufTranslationImpl();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -191,9 +197,16 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
||||||
// them as a generic request.
|
// them as a generic request.
|
||||||
final boolean isQueryEndpoint = requestURI.startsWith("/druid/v2") && !requestURI.startsWith("/druid/v2/sql");
|
final boolean isQueryEndpoint = requestURI.startsWith("/druid/v2") && !requestURI.startsWith("/druid/v2/sql");
|
||||||
|
|
||||||
final boolean isAvatica = requestURI.startsWith("/druid/v2/sql/avatica");
|
final boolean isAvaticaJson = requestURI.startsWith("/druid/v2/sql/avatica");
|
||||||
|
final boolean isAvaticaPb = requestURI.startsWith("/druid/v2/sql/avatica-protobuf");
|
||||||
|
|
||||||
if (isAvatica) {
|
if (isAvaticaPb) {
|
||||||
|
byte[] requestBytes = IOUtils.toByteArray(request.getInputStream());
|
||||||
|
Service.Request protobufRequest = this.protobufTranslation.parseRequest(requestBytes);
|
||||||
|
String connectionId = getAvaticaProtobufConnectionId(protobufRequest);
|
||||||
|
targetServer = hostFinder.findServerAvatica(connectionId);
|
||||||
|
request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes);
|
||||||
|
} else if (isAvaticaJson) {
|
||||||
Map<String, Object> requestMap = objectMapper.readValue(
|
Map<String, Object> requestMap = objectMapper.readValue(
|
||||||
request.getInputStream(),
|
request.getInputStream(),
|
||||||
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
|
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
|
||||||
|
@ -456,6 +469,95 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
||||||
return (String) connectionIdObj;
|
return (String) connectionIdObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static String getAvaticaProtobufConnectionId(Service.Request request)
|
||||||
|
{
|
||||||
|
if (request instanceof Service.CatalogsRequest) {
|
||||||
|
return ((Service.CatalogsRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.SchemasRequest) {
|
||||||
|
return ((Service.SchemasRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.TablesRequest) {
|
||||||
|
return ((Service.TablesRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.TypeInfoRequest) {
|
||||||
|
return ((Service.TypeInfoRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.ColumnsRequest) {
|
||||||
|
return ((Service.ColumnsRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.ExecuteRequest) {
|
||||||
|
return ((Service.ExecuteRequest) request).statementHandle.connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.TableTypesRequest) {
|
||||||
|
return ((Service.TableTypesRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.PrepareRequest) {
|
||||||
|
return ((Service.PrepareRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.PrepareAndExecuteRequest) {
|
||||||
|
return ((Service.PrepareAndExecuteRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.FetchRequest) {
|
||||||
|
return ((Service.FetchRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.CreateStatementRequest) {
|
||||||
|
return ((Service.CreateStatementRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.CloseStatementRequest) {
|
||||||
|
return ((Service.CloseStatementRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.OpenConnectionRequest) {
|
||||||
|
return ((Service.OpenConnectionRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.CloseConnectionRequest) {
|
||||||
|
return ((Service.CloseConnectionRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.ConnectionSyncRequest) {
|
||||||
|
return ((Service.ConnectionSyncRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.DatabasePropertyRequest) {
|
||||||
|
return ((Service.DatabasePropertyRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.SyncResultsRequest) {
|
||||||
|
return ((Service.SyncResultsRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.CommitRequest) {
|
||||||
|
return ((Service.CommitRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.RollbackRequest) {
|
||||||
|
return ((Service.RollbackRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.PrepareAndExecuteBatchRequest) {
|
||||||
|
return ((Service.PrepareAndExecuteBatchRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request instanceof Service.ExecuteBatchRequest) {
|
||||||
|
return ((Service.ExecuteBatchRequest) request).connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new IAE("Received an unknown Avatica protobuf request");
|
||||||
|
}
|
||||||
|
|
||||||
private class MetricsEmittingProxyResponseListener<T> extends ProxyResponseListener
|
private class MetricsEmittingProxyResponseListener<T> extends ProxyResponseListener
|
||||||
{
|
{
|
||||||
private final HttpServletRequest req;
|
private final HttpServletRequest req;
|
||||||
|
|
|
@ -468,6 +468,45 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetAvaticaProtobufConnectionId()
|
||||||
|
{
|
||||||
|
final String query = "SELECT someColumn FROM druid.someTable WHERE someColumn IS NOT NULL";
|
||||||
|
final String connectionId = "000000-0000-0000-00000000";
|
||||||
|
final int statementId = 1337;
|
||||||
|
final int maxNumRows = 1000;
|
||||||
|
|
||||||
|
final List<? extends Service.Request> avaticaRequests = ImmutableList.of(
|
||||||
|
new Service.CatalogsRequest(connectionId),
|
||||||
|
new Service.SchemasRequest(connectionId, "druid", null),
|
||||||
|
new Service.TablesRequest(connectionId, "druid", "druid", null, null),
|
||||||
|
new Service.ColumnsRequest(connectionId, "druid", "druid", "someTable", null),
|
||||||
|
new Service.PrepareAndExecuteRequest(
|
||||||
|
connectionId,
|
||||||
|
statementId,
|
||||||
|
query,
|
||||||
|
maxNumRows
|
||||||
|
),
|
||||||
|
new Service.PrepareRequest(connectionId, query, maxNumRows),
|
||||||
|
new Service.ExecuteRequest(
|
||||||
|
new Meta.StatementHandle(connectionId, statementId, null),
|
||||||
|
ImmutableList.of(),
|
||||||
|
maxNumRows
|
||||||
|
),
|
||||||
|
new Service.CloseStatementRequest(connectionId, statementId),
|
||||||
|
new Service.CloseConnectionRequest(connectionId)
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
for (Service.Request request : avaticaRequests) {
|
||||||
|
Assert.assertEquals(
|
||||||
|
"failed",
|
||||||
|
connectionId,
|
||||||
|
AsyncQueryForwardingServlet.getAvaticaProtobufConnectionId(request)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static Map<String, Object> asMap(String json, ObjectMapper mapper) throws JsonProcessingException
|
private static Map<String, Object> asMap(String json, ObjectMapper mapper) throws JsonProcessingException
|
||||||
{
|
{
|
||||||
return mapper.readValue(json, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
|
return mapper.readValue(json, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
|
||||||
|
|
|
@ -39,7 +39,8 @@ import org.apache.druid.server.security.AuthConfig;
|
||||||
import org.apache.druid.server.security.AuthenticationUtils;
|
import org.apache.druid.server.security.AuthenticationUtils;
|
||||||
import org.apache.druid.server.security.Authenticator;
|
import org.apache.druid.server.security.Authenticator;
|
||||||
import org.apache.druid.server.security.AuthenticatorMapper;
|
import org.apache.druid.server.security.AuthenticatorMapper;
|
||||||
import org.apache.druid.sql.avatica.DruidAvaticaHandler;
|
import org.apache.druid.sql.avatica.DruidAvaticaJsonHandler;
|
||||||
|
import org.apache.druid.sql.avatica.DruidAvaticaProtobufHandler;
|
||||||
import org.eclipse.jetty.server.Handler;
|
import org.eclipse.jetty.server.Handler;
|
||||||
import org.eclipse.jetty.server.Server;
|
import org.eclipse.jetty.server.Server;
|
||||||
import org.eclipse.jetty.server.handler.HandlerList;
|
import org.eclipse.jetty.server.handler.HandlerList;
|
||||||
|
@ -57,7 +58,8 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
|
||||||
// JDBC authentication uses the JDBC connection context instead of HTTP headers, skip the normal auth checks.
|
// JDBC authentication uses the JDBC connection context instead of HTTP headers, skip the normal auth checks.
|
||||||
// The router will keep the connection context in the forwarded message, and the broker is responsible for
|
// The router will keep the connection context in the forwarded message, and the broker is responsible for
|
||||||
// performing the auth checks.
|
// performing the auth checks.
|
||||||
DruidAvaticaHandler.AVATICA_PATH
|
DruidAvaticaJsonHandler.AVATICA_PATH,
|
||||||
|
DruidAvaticaProtobufHandler.AVATICA_PATH
|
||||||
);
|
);
|
||||||
|
|
||||||
private final DruidHttpClientConfig routerHttpClientConfig;
|
private final DruidHttpClientConfig routerHttpClientConfig;
|
||||||
|
|
|
@ -36,7 +36,8 @@ public class AvaticaModule implements Module
|
||||||
{
|
{
|
||||||
JsonConfigProvider.bind(binder, "druid.sql.avatica", AvaticaServerConfig.class);
|
JsonConfigProvider.bind(binder, "druid.sql.avatica", AvaticaServerConfig.class);
|
||||||
binder.bind(AvaticaMonitor.class).in(LazySingleton.class);
|
binder.bind(AvaticaMonitor.class).in(LazySingleton.class);
|
||||||
JettyBindings.addHandler(binder, DruidAvaticaHandler.class);
|
JettyBindings.addHandler(binder, DruidAvaticaJsonHandler.class);
|
||||||
|
JettyBindings.addHandler(binder, DruidAvaticaProtobufHandler.class);
|
||||||
MetricsModule.register(binder, AvaticaMonitor.class);
|
MetricsModule.register(binder, AvaticaMonitor.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,12 +32,12 @@ import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
public class DruidAvaticaHandler extends AvaticaJsonHandler
|
public class DruidAvaticaJsonHandler extends AvaticaJsonHandler
|
||||||
{
|
{
|
||||||
public static final String AVATICA_PATH = "/druid/v2/sql/avatica/";
|
public static final String AVATICA_PATH = "/druid/v2/sql/avatica/";
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public DruidAvaticaHandler(
|
public DruidAvaticaJsonHandler(
|
||||||
final DruidMeta druidMeta,
|
final DruidMeta druidMeta,
|
||||||
@Self final DruidNode druidNode,
|
@Self final DruidNode druidNode,
|
||||||
final AvaticaMonitor avaticaMonitor
|
final AvaticaMonitor avaticaMonitor
|
|
@ -0,0 +1,62 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.sql.avatica;
|
||||||
|
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import org.apache.calcite.avatica.remote.LocalService;
|
||||||
|
import org.apache.calcite.avatica.remote.Service;
|
||||||
|
import org.apache.calcite.avatica.server.AvaticaProtobufHandler;
|
||||||
|
import org.apache.druid.guice.annotations.Self;
|
||||||
|
import org.apache.druid.server.DruidNode;
|
||||||
|
import org.eclipse.jetty.server.Request;
|
||||||
|
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class DruidAvaticaProtobufHandler extends AvaticaProtobufHandler
|
||||||
|
{
|
||||||
|
public static final String AVATICA_PATH = "/druid/v2/sql/avatica-protobuf/";
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public DruidAvaticaProtobufHandler(
|
||||||
|
final DruidMeta druidMeta,
|
||||||
|
@Self final DruidNode druidNode,
|
||||||
|
final AvaticaMonitor avaticaMonitor
|
||||||
|
)
|
||||||
|
{
|
||||||
|
super(new LocalService(druidMeta), avaticaMonitor);
|
||||||
|
setServerRpcMetadata(new Service.RpcMetadataResponse(druidNode.getHostAndPortToUse()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(
|
||||||
|
final String target,
|
||||||
|
final Request baseRequest,
|
||||||
|
final HttpServletRequest request,
|
||||||
|
final HttpServletResponse response
|
||||||
|
) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
if (request.getRequestURI().equals(AVATICA_PATH)) {
|
||||||
|
super.handle(target, baseRequest, request, response);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -104,8 +104,10 @@ public class DruidMeta extends MetaImpl
|
||||||
{
|
{
|
||||||
// Build connection context.
|
// Build connection context.
|
||||||
final ImmutableMap.Builder<String, Object> context = ImmutableMap.builder();
|
final ImmutableMap.Builder<String, Object> context = ImmutableMap.builder();
|
||||||
for (Map.Entry<String, String> entry : info.entrySet()) {
|
if (info != null) {
|
||||||
context.put(entry);
|
for (Map.Entry<String, String> entry : info.entrySet()) {
|
||||||
|
context.put(entry);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
openDruidConnection(ch.id, context.build());
|
openDruidConnection(ch.id, context.build());
|
||||||
}
|
}
|
||||||
|
|
|
@ -176,19 +176,36 @@ public class AvaticaModuleTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDruidAvaticaHandlerIsInjected()
|
public void testDruidAvaticaJsonHandlerIsInjected()
|
||||||
{
|
{
|
||||||
DruidAvaticaHandler handler = injector.getInstance(DruidAvaticaHandler.class);
|
DruidAvaticaJsonHandler handler = injector.getInstance(DruidAvaticaJsonHandler.class);
|
||||||
Assert.assertNotNull(handler);
|
Assert.assertNotNull(handler);
|
||||||
DruidAvaticaHandler other = injector.getInstance(DruidAvaticaHandler.class);
|
DruidAvaticaJsonHandler other = injector.getInstance(DruidAvaticaJsonHandler.class);
|
||||||
Assert.assertNotSame(handler, other);
|
Assert.assertNotSame(handler, other);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDruidAvaticaHandlerIsRegisterdWithJerseyModule()
|
public void testDruidAvaticaProtobufHandlerIsInjected()
|
||||||
|
{
|
||||||
|
DruidAvaticaProtobufHandler handler = injector.getInstance(DruidAvaticaProtobufHandler.class);
|
||||||
|
Assert.assertNotNull(handler);
|
||||||
|
DruidAvaticaProtobufHandler other = injector.getInstance(DruidAvaticaProtobufHandler.class);
|
||||||
|
Assert.assertNotSame(handler, other);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDruidAvaticaJsonHandlerIsRegisterdWithJerseyModule()
|
||||||
{
|
{
|
||||||
Set<Handler> handlers =
|
Set<Handler> handlers =
|
||||||
injector.getInstance(Key.get(new TypeLiteral<Set<Handler>>(){}));
|
injector.getInstance(Key.get(new TypeLiteral<Set<Handler>>(){}));
|
||||||
Assert.assertTrue(handlers.stream().anyMatch(h -> DruidAvaticaHandler.class.equals(h.getClass())));
|
Assert.assertTrue(handlers.stream().anyMatch(h -> DruidAvaticaJsonHandler.class.equals(h.getClass())));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDruidAvaticaProtobufHandlerIsRegisterdWithJerseyModule()
|
||||||
|
{
|
||||||
|
Set<Handler> handlers =
|
||||||
|
injector.getInstance(Key.get(new TypeLiteral<Set<Handler>>(){}));
|
||||||
|
Assert.assertTrue(handlers.stream().anyMatch(h -> DruidAvaticaProtobufHandler.class.equals(h.getClass())));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.calcite.avatica.AvaticaClientRuntimeException;
|
||||||
import org.apache.calcite.avatica.Meta;
|
import org.apache.calcite.avatica.Meta;
|
||||||
import org.apache.calcite.avatica.MissingResultsException;
|
import org.apache.calcite.avatica.MissingResultsException;
|
||||||
import org.apache.calcite.avatica.NoSuchStatementException;
|
import org.apache.calcite.avatica.NoSuchStatementException;
|
||||||
|
import org.apache.calcite.avatica.server.AbstractAvaticaHandler;
|
||||||
import org.apache.calcite.schema.SchemaPlus;
|
import org.apache.calcite.schema.SchemaPlus;
|
||||||
import org.apache.druid.common.config.NullHandling;
|
import org.apache.druid.common.config.NullHandling;
|
||||||
import org.apache.druid.guice.GuiceInjectors;
|
import org.apache.druid.guice.GuiceInjectors;
|
||||||
|
@ -46,7 +47,6 @@ import org.apache.druid.java.util.common.io.Closer;
|
||||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||||
import org.apache.druid.math.expr.ExprMacroTable;
|
import org.apache.druid.math.expr.ExprMacroTable;
|
||||||
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
|
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
|
||||||
import org.apache.druid.server.DruidNode;
|
|
||||||
import org.apache.druid.server.QueryLifecycleFactory;
|
import org.apache.druid.server.QueryLifecycleFactory;
|
||||||
import org.apache.druid.server.QueryStackTests;
|
import org.apache.druid.server.QueryStackTests;
|
||||||
import org.apache.druid.server.RequestLogLine;
|
import org.apache.druid.server.RequestLogLine;
|
||||||
|
@ -101,7 +101,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
public class DruidAvaticaHandlerTest extends CalciteTestBase
|
public abstract class DruidAvaticaHandlerTest extends CalciteTestBase
|
||||||
{
|
{
|
||||||
private static final AvaticaServerConfig AVATICA_CONFIG = new AvaticaServerConfig()
|
private static final AvaticaServerConfig AVATICA_CONFIG = new AvaticaServerConfig()
|
||||||
{
|
{
|
||||||
|
@ -200,20 +200,12 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
||||||
);
|
);
|
||||||
|
|
||||||
druidMeta = injector.getInstance(DruidMeta.class);
|
druidMeta = injector.getInstance(DruidMeta.class);
|
||||||
final DruidAvaticaHandler handler = new DruidAvaticaHandler(
|
final AbstractAvaticaHandler handler = this.getAvaticaHandler(druidMeta);
|
||||||
druidMeta,
|
|
||||||
new DruidNode("dummy", "dummy", false, 1, null, true, false),
|
|
||||||
new AvaticaMonitor()
|
|
||||||
);
|
|
||||||
final int port = ThreadLocalRandom.current().nextInt(9999) + 10000;
|
final int port = ThreadLocalRandom.current().nextInt(9999) + 10000;
|
||||||
server = new Server(new InetSocketAddress("127.0.0.1", port));
|
server = new Server(new InetSocketAddress("127.0.0.1", port));
|
||||||
server.setHandler(handler);
|
server.setHandler(handler);
|
||||||
server.start();
|
server.start();
|
||||||
url = StringUtils.format(
|
url = this.getJdbcConnectionString(port);
|
||||||
"jdbc:avatica:remote:url=http://127.0.0.1:%d%s",
|
|
||||||
port,
|
|
||||||
DruidAvaticaHandler.AVATICA_PATH
|
|
||||||
);
|
|
||||||
client = DriverManager.getConnection(url, "regularUser", "druid");
|
client = DriverManager.getConnection(url, "regularUser", "druid");
|
||||||
superuserClient = DriverManager.getConnection(url, CalciteTests.TEST_SUPERUSER_NAME, "druid");
|
superuserClient = DriverManager.getConnection(url, CalciteTests.TEST_SUPERUSER_NAME, "druid");
|
||||||
|
|
||||||
|
@ -886,20 +878,12 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
final DruidAvaticaHandler handler = new DruidAvaticaHandler(
|
final AbstractAvaticaHandler handler = this.getAvaticaHandler(smallFrameDruidMeta);
|
||||||
smallFrameDruidMeta,
|
|
||||||
new DruidNode("dummy", "dummy", false, 1, null, true, false),
|
|
||||||
new AvaticaMonitor()
|
|
||||||
);
|
|
||||||
final int port = ThreadLocalRandom.current().nextInt(9999) + 20000;
|
final int port = ThreadLocalRandom.current().nextInt(9999) + 20000;
|
||||||
Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1", port));
|
Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1", port));
|
||||||
smallFrameServer.setHandler(handler);
|
smallFrameServer.setHandler(handler);
|
||||||
smallFrameServer.start();
|
smallFrameServer.start();
|
||||||
String smallFrameUrl = StringUtils.format(
|
String smallFrameUrl = this.getJdbcConnectionString(port);
|
||||||
"jdbc:avatica:remote:url=http://127.0.0.1:%d%s",
|
|
||||||
port,
|
|
||||||
DruidAvaticaHandler.AVATICA_PATH
|
|
||||||
);
|
|
||||||
Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, "regularUser", "druid");
|
Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, "regularUser", "druid");
|
||||||
|
|
||||||
final ResultSet resultSet = smallFrameClient.createStatement().executeQuery(
|
final ResultSet resultSet = smallFrameClient.createStatement().executeQuery(
|
||||||
|
@ -984,20 +968,12 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
final DruidAvaticaHandler handler = new DruidAvaticaHandler(
|
final AbstractAvaticaHandler handler = this.getAvaticaHandler(smallFrameDruidMeta);
|
||||||
smallFrameDruidMeta,
|
|
||||||
new DruidNode("dummy", "dummy", false, 1, null, true, false),
|
|
||||||
new AvaticaMonitor()
|
|
||||||
);
|
|
||||||
final int port = ThreadLocalRandom.current().nextInt(9999) + 20000;
|
final int port = ThreadLocalRandom.current().nextInt(9999) + 20000;
|
||||||
Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1", port));
|
Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1", port));
|
||||||
smallFrameServer.setHandler(handler);
|
smallFrameServer.setHandler(handler);
|
||||||
smallFrameServer.start();
|
smallFrameServer.start();
|
||||||
String smallFrameUrl = StringUtils.format(
|
String smallFrameUrl = this.getJdbcConnectionString(port);
|
||||||
"jdbc:avatica:remote:url=http://127.0.0.1:%d%s",
|
|
||||||
port,
|
|
||||||
DruidAvaticaHandler.AVATICA_PATH
|
|
||||||
);
|
|
||||||
Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, "regularUser", "druid");
|
Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, "regularUser", "druid");
|
||||||
|
|
||||||
// use a prepared statement because avatica currently ignores fetchSize on the initial fetch of a Statement
|
// use a prepared statement because avatica currently ignores fetchSize on the initial fetch of a Statement
|
||||||
|
@ -1328,6 +1304,10 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract String getJdbcConnectionString(int port);
|
||||||
|
|
||||||
|
protected abstract AbstractAvaticaHandler getAvaticaHandler(DruidMeta druidMeta);
|
||||||
|
|
||||||
private static List<Map<String, Object>> getRows(final ResultSet resultSet) throws SQLException
|
private static List<Map<String, Object>> getRows(final ResultSet resultSet) throws SQLException
|
||||||
{
|
{
|
||||||
return getRows(resultSet, null);
|
return getRows(resultSet, null);
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.sql.avatica;
|
||||||
|
|
||||||
|
import org.apache.calcite.avatica.server.AbstractAvaticaHandler;
|
||||||
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
|
import org.apache.druid.server.DruidNode;
|
||||||
|
|
||||||
|
public class DruidAvaticaJsonHandlerTest extends DruidAvaticaHandlerTest
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected String getJdbcConnectionString(final int port)
|
||||||
|
{
|
||||||
|
return StringUtils.format(
|
||||||
|
"jdbc:avatica:remote:url=http://127.0.0.1:%d%s",
|
||||||
|
port,
|
||||||
|
DruidAvaticaJsonHandler.AVATICA_PATH
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta)
|
||||||
|
{
|
||||||
|
return new DruidAvaticaJsonHandler(
|
||||||
|
druidMeta,
|
||||||
|
new DruidNode("dummy", "dummy", false, 1, null, true, false),
|
||||||
|
new AvaticaMonitor()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.sql.avatica;
|
||||||
|
|
||||||
|
import org.apache.calcite.avatica.server.AbstractAvaticaHandler;
|
||||||
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
|
import org.apache.druid.server.DruidNode;
|
||||||
|
|
||||||
|
public class DruidAvaticaProtobufHandlerTest extends DruidAvaticaHandlerTest
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected String getJdbcConnectionString(final int port)
|
||||||
|
{
|
||||||
|
return StringUtils.format(
|
||||||
|
"jdbc:avatica:remote:url=http://127.0.0.1:%d%s;serialization=protobuf",
|
||||||
|
port,
|
||||||
|
DruidAvaticaProtobufHandler.AVATICA_PATH
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta)
|
||||||
|
{
|
||||||
|
return new DruidAvaticaProtobufHandler(
|
||||||
|
druidMeta,
|
||||||
|
new DruidNode("dummy", "dummy", false, 1, null, true, false),
|
||||||
|
new AvaticaMonitor()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1860,3 +1860,5 @@ MiB
|
||||||
GiB
|
GiB
|
||||||
TiB
|
TiB
|
||||||
PiB
|
PiB
|
||||||
|
protobuf
|
||||||
|
Golang
|
||||||
|
|
Loading…
Reference in New Issue