diff --git a/distribution/pom.xml b/distribution/pom.xml index f8b2999f7ba..5aa71130f77 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -458,6 +458,8 @@ org.apache.druid.extensions.contrib:druid-spectator-histogram -c org.apache.druid.extensions.contrib:druid-rabbit-indexing-service + -c + org.apache.druid.extensions.contrib:grpc-query diff --git a/extensions-contrib/grpc-query/README.md b/extensions-contrib/grpc-query/README.md new file mode 100644 index 00000000000..1edefbf350c --- /dev/null +++ b/extensions-contrib/grpc-query/README.md @@ -0,0 +1,312 @@ + + +# gRPC Query Extension for Druid + +This extension provides a gRPC API for SQL and Native queries. + +Druid uses REST as its RPC protocol. Druid has a large variety of REST operations +including query, ingest jobs, monitoring, configuration and many more. Although +REST is a universally supported RPC format, is not the only one in use. This +extension allows gRPC-based clients to issue SQL queries. + +Druid is optimized for high-concurrency, low-complexity queries that return a +small result set (a few thousand rows at most). The small-query focus allows +Druid to offer a simple, stateless request/response REST API. This gRPC API +follows that Druid pattern: it is optimized for simple queries and follows +Druid's request/response model. APIs such as JDBC can handle larger results +because they are stateful: a client can request pages of results using multiple +API calls. This API does not support paging: the entire result set is returned +in the response, resulting in an API which is fast for small queries, and not +suitable for larger result sets. + +## Use Cases + +The gRPC query extension can be used in two ways, depending on the selected +result format. + +### CSV or JSON Response Format + +The simplest way to use the gRPC extension is to send a query request that +uses CSV or JSON as the return format. The client simply pulls the results +from the response and does something useful with them. For the CSV format, +headers can be created from the column metadata in the response message. + +### Protobuf Response Format + +Some applications want to use Protobuf as the result format. In this case, +the extension encodes Protobuf-encoded rows as the binary payload of the query +response. This works for an application which uses a fixed set of queries, each +of which is carefully designed to power one application, say a dashboard. The +(simplified) message flow is: + +```text ++-----------+ query -> +-------+ +| Dashboard | -- gRPC --> | Druid | ++-----------+ <- data +-------+ +``` + +In practice, there may be multiple proxy layers: one on the application side, and +the Router on the Druid side. + +The dashboard displays a fixed set of reports and charts. Each of those sends a +well-defined query specified as part of the application. The returned data is thus +both well-known and fixed for each query. The set of queries is fixed by the contents +of the dashboard. That is, this is not an ad-hoc query use case. + +Because the queries are locked down, and are part of the application, the set of valid +result sets is also well known and locked down. Given this well-controlled use case, it +is possible to use a pre-defined Protobuf message to represent the results of each distinct +query. (Protobuf is a compiled format: the solution works only because the set of messages +are well known. It would not work for the ad-hoc case in which each query has a different +result set schema.) + +To be very clear: the application has a fixed set of queries to be sent to Druid via gRPC. +For each query, there is a fixed Protobuf response format defined by the application. +No other queries, aside from this well-known set, will be sent to the gRPC endpoint using +the Protobuf response format. If the set of queries is not well-defined, use the CSV +or JSON response format instead. + +## Installation + +The gRPC query extension is a "contrib" extension and is not installed by default when +you install Druid. Instead, you must install it manually. + +In development, you can build Druid with all the "contrib" extensions. When building +Druid, include the `-P bundle-contrib-exts` in addition to the `-P dist` option: + +```bash +mvn package -Pdist,bundle-contrib-exts ... +``` + +In production, follow the [Druid documentation](https://druid.apache.org/docs/latest/development/extensions.html). + +To enable the extension, add the following to the load list in +`_commmon/common.runtime.properties`: + +```text +druid.extensions.loadList=[..., "grpc-query"] +``` + +Adding the extension to the load list automatically enables the extension, +but only in the Broker. + +If you use the Protobuf response format, bundle up your Protobuf classes +into a jar file, and place that jar file in the +`$DRUID_HOME/extensions/grpc-query` directory. The Protobuf classes will +appear on the class path and will be available from the `grpc-query` +extension. + +### Configuration + +Enable and configure the extension in `broker/runtime.properties`: + +```text +druid.grpcQuery.port=50051 +``` + +The default port is 50051 (preliminary). + +If you use the Protobuf response format, bundle up your Protobuf classes +into a jar file, and place that jar file in the +`$DRUID_HOME/extensions/grpc-query` directory. The Protobuf classes will +appear on the class path and will be available from the `grpc-query` +extension. + +## Usage + +See the `src/main/proto/query.proto` file in the `grpc-query` project for the request and +response message formats. The request message format closely follows the REST JSON message +format. The response is optimized for gRPC: it contains an error (if the request fails), +or the result schema and result data as a binary payload. You can query the gRPC endpoint +with any gRPC client. + +Although both Druid SQL and Druid itself support a `float` data type, that type is not +usable in a Protobuf response object. Internally Druid converts all `float` values to +`double`. As a result, the Protobuf reponse object supports only the `double` type. +An attempt to use `float` will lead to a runtime error when processing the query. +Use the `double` type instead. + +Sample request, + +``` +QueryRequest.newBuilder() + .setQuery("SELECT * FROM foo") + .setResultFormat(QueryResultFormat.CSV) + .setQueryType(QueryOuterClass.QueryType.SQL) + .build(); +``` + +When using Protobuf response format, bundle up your Protobuf classes +into a jar file, and place that jar file in the +`$DRUID_HOME/extensions/grpc-query` directory. +Specify the response Protobuf message name in the request. + +``` +QueryRequest.newBuilder() + .setQuery("SELECT dim1, dim2, dim3, cnt, m1, m2, unique_dim1, __time AS "date" FROM foo") + .setQueryType(QueryOuterClass.QueryType.SQL) + .setProtobufMessageName(QueryResult.class.getName()) + .setResultFormat(QueryResultFormat.PROTOBUF_INLINE) + .build(); + +Response message + +message QueryResult { + string dim1 = 1; + string dim2 = 2; + string dim3 = 3; + int64 cnt = 4; + float m1 = 5; + double m2 = 6; + bytes unique_dim1 = 7; + google.protobuf.Timestamp date = 8; +} +``` + +## Security + +The extension supports both "anonymous" and basic authorization. Anonymous is the mode +for an out-of-the-box Druid: no authorization needed. The extension does not yet support +other security extensions: each needs its own specific integration. + +Clients that use basic authentication must include a set of credentials. See +`BasicCredentials` for a typical implementation and `BasicAuthTest` for how to +configure the credentials in the client. + +## Implementation Notes + +This project contains several components: + +* Guice module and associated server initialization code. +* Netty-based gRPC server. +* A "driver" that performs the actual query and generates the results. + +## Debugging + +Debugging of the gRPC extension requires extra care due to the nuances of loading +classes from an extension. + +### Running in a Server + +Druid extensions are designed to run in the Druid server. The gRPC extension is +loaded only in the Druid broker using the contiguration described above. If something +fails during startup, the Broker will crash. Consult the Broker logs to determine +what went wrong. Startup failures are typically due to required jars not being installed +as part of the extension. Check the `pom.xml` file to track down what's missing. + +Failures can also occur when running a query. Such failures will result in a failure +response and should result in a log entry in the Broker log file. Use the log entry +to sort out what went wrong. + +You can also attach a debugger to the running process. You'll have to enable the debugger +in the server by adding the required parameters to the Broker's `jvm.config` file. + +### Debugging using Unit Tests + +To debug the functionality of the extension, your best bet is to debug in the context +of a unit test. Druid provides a special test-only SQL stack with a few pre-defined +datasources. See the various `CalciteQueryTest` classes to see what these are. You can +also query Druid's various system tables. See `GrpcQueryTest` for a simple "starter" +unit test that configures the server and uses an in-process client to send requests. + +Most unit testing can be done without the gRPC server, by calling the `QueryDriver` +class directly. That is, if the goal is work with the code that takes a request, runs +a query, and produces a response, then the driver is the key and the server is just a +bit of extra copmlexity. See the `DriverTest` class for an example unit test. + +### Debugging in a Server in an IDE + +We would like to be able to debug the gRPC extension, within the Broker, in an IDE. +As it turns out, doing so breaks Druid's class loader mechanisms in ways that are both +hard to understand and hard to work around. When run in a server, Java creates an instance +of `GrpcQueryModule` using the extension's class loader. Java then uses that same class +loader to load other classes in the extension, including those here and those in the +shaded gRPC jar file. + +However, when run in an IDE, if this project is on the class path, then the `GrpcQueryModule` +class will be loaded from the "App" class loader. This works fine: it causes the other +classes of this module to also be loaded from the class path. However, once execution +calls into gRPC, Java will use the App class loader, not the extension class loader, and +will fail to find some of the classes, resulting in Java exceptions. Worse, in some cases, +Java may load the same class from both class loaders. To Java, these are not the same +classes, and you will get mysterious errors as a result. + +For now, the lesson is: don't try to debug the extension in the Broker in the IDE. Use +one of the above options instead. + +For reference (and in case we figure out a solution to the class loader conflict), +the way to debug the Broker in an IDE is the following: + +* Build your branch. Use the `-P bundle-contrib-exts` flag in place of `-P dist`, as described + above. +* Create an install from the distribution produced above. +* Use the `single-server/micro-quickstart` config for debugging. +* Configure the installation using the steps above. +* Modify the Supervisor config for your config to comment out the line that launches + the broker. Use the hash (`#`) character to comment out the line. +* In your IDE, define a launch configuration for the Broker. + * The launch command is `server broker` + * Add the following JVM arguments: + +```text +--add-exports java.base/jdk.internal.perf=ALL-UNNAMED +--add-exports jdk.management/com.sun.management.internal=ALL-UNNAMED +``` + + * Define `grpc-query` as a project dependency. (This is for Eclipse; IntelliJ may differ.) + * Configure the class path to include the common and Broker properties files. +* Launch the micro-quickstart cluster. +* Launch the Broker in your IDE. + +### gRPC Logging + +Debugging of the gRPC stack is difficult since the shaded jar loses source attachments. + +Logging helps. gRPC logging is not enabled via Druid's logging system. Intead, [create +the following `logging.properties` file](https://stackoverflow.com/questions/50243717/grpc-logger-level): + +```text +handlers=java.util.logging.ConsoleHandler +io.grpc.level=FINE +java.util.logging.ConsoleHandler.level=FINE +java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter +``` + +Then, pass the following on the command line: + +```text +-Djava.util.logging.config.file=logging.properties +``` + +Adjust the path to the file depending on where you put the file. + +## Acknowledgements + +This is not the first project to have created a gRPC API for Druid. Others include: + +* [[Proposal] define a RPC protocol for querying data, support apache Arrow as data + exchange interface](https://github.com/apache/druid/issues/3891) +* [gRPC Druid extension PoC](https://github.com/ndolgov/gruid) +* [Druid gRPC-json server extension](https://github.com/apache/druid/pull/6798) + +Full credit goes to those who have gone this way before. + +Note that the class loader solution used by the two code bases above turned out +to not be needed. See the notes above about the class loader issues. diff --git a/extensions-contrib/grpc-query/pom.xml b/extensions-contrib/grpc-query/pom.xml new file mode 100644 index 00000000000..101e2f34b74 --- /dev/null +++ b/extensions-contrib/grpc-query/pom.xml @@ -0,0 +1,375 @@ + + + + + + 4.0.0 + org.apache.druid.extensions.contrib + grpc-query + grpc-query + grpc-query + + + org.apache.druid + druid + 32.0.0-SNAPSHOT + ../../pom.xml + + + + + + io.grpc + grpc-bom + 1.59.0 + pom + import + + + + + + + org.apache.druid + druid-server + ${project.parent.version} + provided + + + org.apache.druid + druid-processing + ${project.parent.version} + provided + + + org.apache.druid + druid-sql + ${project.parent.version} + provided + + + com.fasterxml.jackson.module + jackson-module-guice + provided + + + com.google.inject + guice + provided + + + com.fasterxml.jackson.core + jackson-databind + provided + + + com.fasterxml.jackson.core + jackson-core + provided + + + com.google.inject.extensions + guice-multibindings + provided + + + com.google.guava + guava + ${guava.version} + provided + + + com.google.code.findbugs + jsr305 + provided + + + + io.netty + netty-buffer + provided + + + io.netty + netty-codec-http + provided + + + io.netty + netty-common + provided + + + io.netty + netty-handler + provided + + + io.netty + netty-resolver + provided + + + io.netty + netty-transport + provided + + + io.netty + netty-codec-http2 + + + javax.ws.rs + jsr311-api + provided + + + io.grpc + grpc-api + + + io.grpc + grpc-protobuf + + + io.grpc + grpc-stub + + + io.grpc + grpc-netty + + + io.grpc + grpc-core + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + jakarta.validation + jakarta.validation-api + provided + + + org.apache.calcite.avatica + avatica-core + provided + + + jakarta.inject + jakarta.inject-api + provided + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + joda-time + joda-time + provided + + + org.apache.calcite + calcite-core + provided + + + javax.inject + javax.inject + 1 + provided + + + + + org.apache.druid + druid-sql + ${project.parent.version} + test-jar + test + + + junit + junit + test + + + org.junit.jupiter + junit-jupiter-api + test + + + org.apache.druid + druid-server + ${project.parent.version} + test-jar + test + + + org.apache.druid + druid-processing + ${project.parent.version} + test-jar + test + + + org.easymock + easymock + test + + + org.apache.druid.extensions + druid-basic-security + ${project.parent.version} + test + + + org.reflections + reflections + test + + + + + + + kr.motd.maven + os-maven-plugin + 1.5.0.Final + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:1.52.0:exe:${os.detected.classifier} + + + + + compile + compile-custom + test-compile + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-test-source + generate-sources + + add-source + + + + target/generated-test-sources/protobuf/java + target/generated-sources/protobuf/grpc-java + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + package + + test-jar + + + tests + + + + + proto-jar + package + + + test-jar + + + test-proto + + **/proto/* + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + io.netty:netty-codec-http2 + io.grpc:grpc-core:jar + io.grpc:grpc-netty:jar + + + + + + diff --git a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/client/GrpcResponseHandler.java b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/client/GrpcResponseHandler.java new file mode 100644 index 00000000000..37ac2cf2e85 --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/client/GrpcResponseHandler.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc.client; + +import com.google.protobuf.AbstractMessageLite; +import com.google.protobuf.ByteString; +import com.google.protobuf.MessageLite; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +public class GrpcResponseHandler +{ + private final T message; + + private GrpcResponseHandler(final Class clazz) + { + this.message = get(clazz); + } + + public static GrpcResponseHandler of(Class clazz) + { + return new GrpcResponseHandler<>(clazz); + } + + public List get(ByteString byteString) + { + return get(new ByteArrayInputStream(byteString.toByteArray())); + } + + @SuppressWarnings("unchecked") + public List get(InputStream inputStream) + { + try { + final List data = new ArrayList<>(); + while (true) { + try { + final MessageLite messageLite = + message + .getDefaultInstanceForType() + .getParserForType() + .parseDelimitedFrom(inputStream); + if (messageLite == null) { + break; + } + data.add((T) messageLite); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + return data; + } + finally { + try { + inputStream.close(); + } + catch (IOException e) { + // ignore + } + } + } + + @SuppressWarnings("unchecked") + private T get(Class clazz) + { + try { + final Method method = clazz.getMethod("getDefaultInstance", new Class[0]); + return (T) method.invoke(null); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/guice/GrpcQueryModule.java b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/guice/GrpcQueryModule.java new file mode 100644 index 00000000000..6621a92ed95 --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/guice/GrpcQueryModule.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc.guice; + +import com.google.inject.Binder; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.grpc.server.GrpcEndpointInitializer; +import org.apache.druid.grpc.server.GrpcQueryConfig; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.LifecycleModule; +import org.apache.druid.guice.annotations.LoadScope; +import org.apache.druid.initialization.DruidModule; + +@LoadScope(roles = NodeRole.BROKER_JSON_NAME) +public class GrpcQueryModule implements DruidModule +{ + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, GrpcQueryConfig.CONFIG_BASE, GrpcQueryConfig.class); + LifecycleModule.register(binder, GrpcEndpointInitializer.class); + } +} diff --git a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/AnonymousAuthServerInterceptor.java b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/AnonymousAuthServerInterceptor.java new file mode 100644 index 00000000000..3059c603d47 --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/AnonymousAuthServerInterceptor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc.server; + +import com.google.common.collect.ImmutableMap; +import io.grpc.Context; +import io.grpc.Contexts; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import org.apache.druid.server.security.Authenticator; + +import javax.inject.Inject; + +/** + * "Authorizes" an anonymous request, which just means adding an "allow all" + * authorization result in the context. Use this form for either of Druid's + * "allow all" authorizers. + * + * @see {@link BasicAuthServerInterceptor} for details + */ +public class AnonymousAuthServerInterceptor implements ServerInterceptor +{ + private final Authenticator authenticator; + + @Inject + public AnonymousAuthServerInterceptor(Authenticator authenticator) + { + this.authenticator = authenticator; + } + + @Override + public Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next + ) + { + return Contexts.interceptCall( + Context.current().withValue( + QueryServer.AUTH_KEY, + authenticator.authenticateJDBCContext(ImmutableMap.of()) + ), + call, + headers, + next + ); + } +} diff --git a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/BasicAuthServerInterceptor.java b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/BasicAuthServerInterceptor.java new file mode 100644 index 00000000000..15a4926e209 --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/BasicAuthServerInterceptor.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc.server; + +import com.google.common.collect.ImmutableMap; +import io.grpc.Context; +import io.grpc.Contexts; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.Authenticator; + +import javax.inject.Inject; + +/** + * Authorizes a Basic Auth user name and password and sets the resulting + * {@link AuthenticationResult} on the call context. + *

+ * Implements the gRPC {@link ServerInterceptor} to wrap the actual RPC + * call with a step which pulls the "Authorization" header from the request, + * decodes the user name and password, looks up the user using the + * BasicHTTPAuthenticator#authenticateJDBCContext(java.util.Map) + * method, and attaches the resulting {@link AuthenticationResult} to the call + * {@link Context}. The gRPC service will later retrieve the auth result to pass + * into the Driver for use in validating query resources. + *

+ * Note that gRPC documentation in this area is sparse. Examples are hard to + * find. gRPC provides exactly one (obscure) way to do things, as represented + * here. + *

+ * Auth failures can occur in many ways: missing or badly formed header, invalid + * user name or password, etc. In each case, the code throws a + * {@link StatusRuntimeException} with {@link Status#PERMISSION_DENIED}. No hint + * of the problem is provided to the user. + *

+ * This pattern can be replicated for other supported Druid authorizers. + */ +public class BasicAuthServerInterceptor implements ServerInterceptor +{ + public static final String AUTHORIZATION_HEADER = "Authorization"; + private static final String BASIC_PREFIX = "Basic "; + private static final Metadata.Key AUTHORIZATION_KEY = + Metadata.Key.of(AUTHORIZATION_HEADER, Metadata.ASCII_STRING_MARSHALLER); + private static final Logger LOG = new Logger(BasicAuthServerInterceptor.class); + + // Want BasicHTTPAuthenticator, but it is not visible here. + private final Authenticator authenticator; + + @Inject + public BasicAuthServerInterceptor(Authenticator authenticator) + { + this.authenticator = authenticator; + } + + @Override + public Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next + ) + { + // Use a gRPC method to wrap the actual call in a new context + // that includes the auth result. + return Contexts.interceptCall( + Context.current().withValue( + QueryServer.AUTH_KEY, + authenticate(headers.get(AUTHORIZATION_KEY)) + ), + call, + headers, + next + ); + } + + // See BasicHTTPAuthenticator.Filter + public AuthenticationResult authenticate(String encodedUserSecret) + { + if (encodedUserSecret == null) { + throw new StatusRuntimeException(Status.PERMISSION_DENIED); + } + + if (!encodedUserSecret.startsWith(BASIC_PREFIX)) { + throw new StatusRuntimeException(Status.PERMISSION_DENIED); + } + encodedUserSecret = encodedUserSecret.substring(BASIC_PREFIX.length()); + + // At this point, encodedUserSecret is not null, indicating that the request intends to perform + // Basic HTTP authentication. + // Copy of BasicAuthUtils.decodeUserSecret() which is not visible here. + String decodedUserSecret; + try { + decodedUserSecret = StringUtils.fromUtf8(StringUtils.decodeBase64String(encodedUserSecret)); + } + catch (IllegalArgumentException iae) { + LOG.info("Malformed user secret."); + throw new StatusRuntimeException(Status.PERMISSION_DENIED); + } + + String[] splits = decodedUserSecret.split(":"); + if (splits.length != 2) { + // The decoded user secret is not of the right format + throw new StatusRuntimeException(Status.PERMISSION_DENIED); + } + + final String user = splits[0]; + final String password = splits[1]; + + // Fail fast for any authentication error. If the authentication result is null we also fail + // as this indicates a non-existent user. + try { + AuthenticationResult authenticationResult = authenticator.authenticateJDBCContext( + ImmutableMap.of("user", user, "password", password) + ); + if (authenticationResult == null) { + throw new StatusRuntimeException(Status.PERMISSION_DENIED); + } + return authenticationResult; + } + // Want BasicSecurityAuthenticationException, but it is not visible here. + catch (IllegalArgumentException ex) { + LOG.info("Exception authenticating user [%s] - [%s]", user, ex.getMessage()); + throw new StatusRuntimeException(Status.PERMISSION_DENIED); + } + } +} diff --git a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/GrpcEndpointInitializer.java b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/GrpcEndpointInitializer.java new file mode 100644 index 00000000000..1cb3884884c --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/GrpcEndpointInitializer.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.guice.ManageLifecycleServer; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.guice.annotations.NativeQuery; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.security.AuthenticatorMapper; +import org.apache.druid.sql.SqlStatementFactory; + +import javax.inject.Inject; + +import java.io.IOException; + +/** + * Initializes the gRPC endpoint (server). This version uses a Netty-based server + * separate from Druid's primary Jetty-based server. We may want to consider a + * + * recent addition to the gRPC examples to run gRPC as a servlet. However, trying + * that turned out to incur many issues, including the fact that there was no way + * to pass the AuthenticationResult down through the many layers of gRPC into the + * query code. So, we use the gRPC server instead. + *

+ * An instance of this class is created by Guice and managed via Druid's + * lifecycle manager. + */ +@ManageLifecycleServer +public class GrpcEndpointInitializer +{ + private static final Logger log = new Logger(GrpcEndpointInitializer.class); + + private final GrpcQueryConfig config; + private final QueryDriver driver; + private final AuthenticatorMapper authMapper; + + private QueryServer server; + + @Inject + public GrpcEndpointInitializer( + GrpcQueryConfig config, + final @Json ObjectMapper jsonMapper, + final @NativeQuery SqlStatementFactory sqlStatementFactory, + final QueryLifecycleFactory queryLifecycleFactory, + final AuthenticatorMapper authMapper + ) + { + this.config = config; + this.authMapper = authMapper; + this.driver = new QueryDriver(jsonMapper, sqlStatementFactory, queryLifecycleFactory); + } + + @LifecycleStart + public void start() + { + server = new QueryServer(config, driver, authMapper); + try { + server.start(); + } + catch (IOException e) { + // Indicates an error when gRPC tried to start the server + // (such the port is already in use.) + log.error(e, "Fatal error: gRPC query server startup failed"); + + // This exception will bring down the Broker as there is not much we can + // do if we can't start the gRPC endpoint. + throw new ISE(e, "Fatal error: grpc query server startup failed"); + } + catch (Throwable t) { + // Catch-all for other errors. The most likely error is that some class was not found + // (that is, class loader issues in an IDE, or a jar missing in the extension). + log.error(t, "Fatal error: gRPC query server startup failed"); + + // This exception will bring down the Broker as there is not much we can + // do if we can't start the gRPC endpoint. + throw t; + } + } + + @LifecycleStop + public void stop() + { + if (server != null) { + try { + server.blockUntilShutdown(); + } + catch (InterruptedException e) { + // Just warn. We're shutting down anyway, so no need to throw an exception. + log.warn(e, "gRPC query server shutdown failed"); + } + server = null; + } + } +} diff --git a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/GrpcQueryConfig.java b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/GrpcQueryConfig.java new file mode 100644 index 00000000000..a9cdde23970 --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/GrpcQueryConfig.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc.server; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.Max; + +/** + * Grpc configs for the extension. + */ +public class GrpcQueryConfig +{ + public static final String CONFIG_BASE = "druid.grpcQuery"; + + @JsonProperty + @Max(0xffff) + private int port = 50051; + + public GrpcQueryConfig() + { + } + + public GrpcQueryConfig(int port) + { + this.port = port; + } + + /** + * @return the port to accept gRPC client connections on + */ + public int getPort() + { + return port; + } +} diff --git a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/HealthService.java b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/HealthService.java new file mode 100644 index 00000000000..d40b0b68bd2 --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/HealthService.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc.server; + +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Context; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import org.apache.druid.grpc.proto.HealthGrpc; +import org.apache.druid.grpc.proto.HealthOuterClass.HealthCheckRequest; +import org.apache.druid.grpc.proto.HealthOuterClass.HealthCheckResponse; +import org.apache.druid.grpc.proto.HealthOuterClass.HealthCheckResponse.ServingStatus; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; + +/** + * Implementation of grpc health service. Provides {@code check(HealthCheckRequest, StreamObserver(HealthCheckResponse))} + * method to get health of a specific service or the overall server health. + *

+ * A client can call the {@code watch(HealthCheckRequest, StreamObserver(HealthCheckResponse))} method + * to perform a streaming health-check. + * The server will immediately send back a message indicating the current serving status. + * It will then subsequently send a new message whenever the service's serving status changes. + */ +class HealthService extends HealthGrpc.HealthImplBase +{ + private final ConcurrentMap serviceStatusMap; + private final ConcurrentMap cancellationContexts; + private final ConcurrentMap statusChangeLatchMap; + + public HealthService() + { + this.serviceStatusMap = new ConcurrentHashMap<>(); + this.cancellationContexts = new ConcurrentHashMap<>(); + this.statusChangeLatchMap = new ConcurrentHashMap<>(); + } + + @Override + public void check( + HealthCheckRequest request, + StreamObserver responseObserver + ) + { + String serviceName = request.getService(); + ServingStatus status = getServiceStatus(serviceName); + HealthCheckResponse response = buildHealthCheckResponse(status); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + @Override + public void watch( + HealthCheckRequest request, + StreamObserver responseObserver + ) + { + String serviceName = request.getService(); + + Context.CancellableContext existingContext = cancellationContexts.get(serviceName); + if (existingContext != null) { + // Another request is already watching the same service + responseObserver.onError(Status.ALREADY_EXISTS.withDescription( + "Another watch request is already in progress for the same service").asRuntimeException()); + return; + } + + Context.CancellableContext cancellableContext = Context.current().withCancellation(); + cancellationContexts.put(serviceName, cancellableContext); + + // Attach a cancellation listener to the context + cancellableContext.addListener((context) -> { + // If the context is cancelled, remove the observer from the map + cancellationContexts.remove(serviceName); + }, MoreExecutors.directExecutor()); + + + // Send an initial response with the current serving status + ServingStatus servingStatus = getServiceStatus(serviceName); + HealthCheckResponse initialResponse = buildHealthCheckResponse(servingStatus); + responseObserver.onNext(initialResponse); + + // Continuously listen for service status changes + while (!cancellableContext.isCancelled()) { + // Wait for the service status to change + // Update the serving status and send a new response + servingStatus = waitForServiceStatusChange(serviceName); + HealthCheckResponse updatedResponse = buildHealthCheckResponse(servingStatus); + responseObserver.onNext(updatedResponse); + } + + cancellationContexts.remove(serviceName); + responseObserver.onCompleted(); + } + + private HealthCheckResponse buildHealthCheckResponse(ServingStatus status) + { + return HealthCheckResponse + .newBuilder() + .setStatus(status) + .build(); + } + + // Method to register a new service with its initial serving status + public void registerService(String serviceName, ServingStatus servingStatus) + { + setServiceStatus(serviceName, servingStatus); + } + + // Method to unregister a service + public void unregisterService(String serviceName) + { + setServiceStatus(serviceName, ServingStatus.NOT_SERVING); + } + + private void setServiceStatus(String serviceName, ServingStatus newStatus) + { + ServingStatus currentStatus = getServiceStatus(serviceName); + if (currentStatus != newStatus) { + serviceStatusMap.put(serviceName, newStatus); + + // Notify the waiting threads + CountDownLatch statusChangeLatch = statusChangeLatchMap.get(serviceName); + if (statusChangeLatch != null) { + statusChangeLatch.countDown(); + } + } + } + + public ServingStatus getServiceStatus(String serviceName) + { + return serviceStatusMap.getOrDefault(serviceName, ServingStatus.UNKNOWN); + } + + public ServingStatus waitForServiceStatusChange(String serviceName) + { + CountDownLatch statusChangeLatch = new CountDownLatch(1); + statusChangeLatchMap.put(serviceName, statusChangeLatch); + + // Wait for the status change or until the thread is interrupted + try { + statusChangeLatch.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + statusChangeLatchMap.remove(serviceName); + + return getServiceStatus(serviceName); + } +} diff --git a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/ProtobufTransformer.java b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/ProtobufTransformer.java new file mode 100644 index 00000000000..795ea2b40af --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/ProtobufTransformer.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc.server; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.SqlRowTransformer; +import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.sql.calcite.table.RowSignatures; +import org.joda.time.DateTime; +import org.joda.time.DateTimeUtils; +import org.joda.time.DateTimeZone; + +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.Optional; +import java.util.TimeZone; + +/** + * Transforms query result for protobuf format + */ +public class ProtobufTransformer +{ + + /** + * Transform a sql query result into protobuf result format. + * For complex or missing column type the object is converted into ByteString. + * date and time column types is converted into proto timestamp. + * Remaining column types are not converted. + * + * @param rowTransformer row signature for sql query result + * @param row result row + * @param i index in the result row + * @return transformed query result in protobuf result format + */ + @Nullable + public static Object transform(SqlRowTransformer rowTransformer, Object[] row, int i) + { + if (row[i] == null) { + return null; + } + final RelDataType rowType = rowTransformer.getRowType(); + final SqlTypeName sqlTypeName = rowType.getFieldList().get(i).getType().getSqlTypeName(); + final RowSignature signature = RowSignatures.fromRelDataType(rowType.getFieldNames(), rowType); + final Optional columnType = signature.getColumnType(i); + + if (sqlTypeName == SqlTypeName.TIMESTAMP + || sqlTypeName == SqlTypeName.DATE) { + if (sqlTypeName == SqlTypeName.TIMESTAMP) { + return convertEpochToProtoTimestamp((long) row[i]); + } + return convertDateToProtoTimestamp((int) row[i]); + } + + if (!columnType.isPresent()) { + return convertComplexType(row[i]); + } + + final ColumnType druidType = columnType.get(); + + if (druidType == ColumnType.STRING) { + return row[i]; + } else if (druidType == ColumnType.LONG) { + return row[i]; + } else if (druidType == ColumnType.FLOAT) { + return row[i]; + } else if (druidType == ColumnType.DOUBLE) { + return row[i]; + } else { + return convertComplexType(row[i]); + } + } + + /** + * Transform a native query result into protobuf result format. + * For complex or missing column type the object is converted into ByteString. + * date and time column types are converted into proto timestamp. + * Remaining column types are not converted. + * + * @param rowSignature type signature for a query result row + * @param row result row + * @param i index in the result + * @param convertToTimestamp if the result should be converted to proto timestamp + * @return transformed query result in protobuf result format + */ + @Nullable + public static Object transform(RowSignature rowSignature, Object[] row, int i, boolean convertToTimestamp) + { + if (row[i] == null) { + return null; + } + + final Optional columnType = rowSignature.getColumnType(i); + + if (convertToTimestamp) { + return convertEpochToProtoTimestamp((long) row[i]); + } + + if (!columnType.isPresent()) { + return convertComplexType(row[i]); + } + + final ColumnType druidType = columnType.get(); + + if (druidType == ColumnType.STRING) { + return row[i]; + } else if (druidType == ColumnType.LONG) { + return row[i]; + } else if (druidType == ColumnType.FLOAT) { + return row[i]; + } else if (druidType == ColumnType.DOUBLE) { + return row[i]; + } else { + return convertComplexType(row[i]); + } + } + + public static Timestamp convertEpochToProtoTimestamp(long value) + { + DateTime dateTime = Calcites.calciteTimestampToJoda(value, DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"))); + long seconds = DateTimeUtils.getInstantMillis(dateTime) / 1000; + return Timestamp.newBuilder().setSeconds(seconds).build(); + } + + public static Timestamp convertDateToProtoTimestamp(int value) + { + DateTime dateTime = Calcites.calciteDateToJoda(value, DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"))); + long seconds = DateTimeUtils.getInstantMillis(dateTime) / 1000; + return Timestamp.newBuilder().setSeconds(seconds).build(); + } + + private static ByteString convertComplexType(Object value) + { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos)) { + oos.writeObject(value); + oos.flush(); + return ByteString.copyFrom(bos.toByteArray()); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/ProtobufWriter.java b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/ProtobufWriter.java new file mode 100644 index 00000000000..bf88c33d08a --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/ProtobufWriter.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc.server; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.GeneratedMessageV3; +import com.google.protobuf.Message; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.http.ResultFormat; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +/** + * Implementation of {@code ResultFormat.Writer} for protobuf message. + */ +public class ProtobufWriter implements ResultFormat.Writer +{ + private final OutputStream outputStream; + private final GeneratedMessageV3 message; + private Message.Builder rowBuilder; + private final Map methods = new HashMap<>(); + + + public ProtobufWriter(OutputStream outputStream, Class clazz) + { + this.outputStream = outputStream; + this.message = get(clazz); + } + + private GeneratedMessageV3 get(Class clazz) + { + try { + final Method method = clazz.getMethod("getDefaultInstance", new Class[0]); + return clazz.cast(method.invoke(null)); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void writeResponseStart() + { + } + + @Override + public void writeHeader(RelDataType rowType, boolean includeTypes, boolean includeSqlTypes) + { + } + + @Override + public void writeHeaderFromRowSignature(RowSignature rowSignature, boolean b) + { + + } + + @Override + public void writeRowStart() + { + rowBuilder = message.getDefaultInstanceForType().newBuilderForType(); + } + + @Override + public void writeRowField(String name, @Nullable Object value) + { + if (value == null) { + return; + } + final Descriptors.FieldDescriptor fieldDescriptor = + message.getDescriptorForType().findFieldByName(name); + // we should throw an exception if fieldDescriptor is null + // this means the .proto fields don't match returned column names + if (fieldDescriptor == null) { + throw new QueryDriver.RequestError( + "Field [%s] not found in Protobuf [%s]", + name, + message.getClass() + ); + } + final Method method = methods.computeIfAbsent("setField", k -> { + try { + return rowBuilder + .getClass() + .getMethod( + "setField", new Class[]{Descriptors.FieldDescriptor.class, Object.class}); + } + catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + }); + try { + method.invoke(rowBuilder, fieldDescriptor, value); + } + catch (IllegalAccessException | InvocationTargetException e) { + throw new QueryDriver.RequestError( + "Could not write value [%s] to field [%s]", + value, + name + ); + } + } + + @Override + public void writeRowEnd() throws IOException + { + Message rowMessage = rowBuilder.build(); + rowMessage.writeDelimitedTo(outputStream); + } + + @Override + public void writeResponseEnd() + { + } + + @Override + public void close() throws IOException + { + outputStream.flush(); + outputStream.close(); + } +} diff --git a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/QueryDriver.java b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/QueryDriver.java new file mode 100644 index 00000000000..096a1439a4f --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/QueryDriver.java @@ -0,0 +1,720 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc.server; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.ByteString; +import com.google.protobuf.GeneratedMessageV3; +import org.apache.calcite.avatica.SqlType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.druid.grpc.proto.QueryOuterClass; +import org.apache.druid.grpc.proto.QueryOuterClass.ColumnSchema; +import org.apache.druid.grpc.proto.QueryOuterClass.DruidType; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryParameter; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryRequest; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryResponse; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryStatus; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Accumulator; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryToolChest; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.server.QueryLifecycle; +import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.security.Access; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.ForbiddenException; +import org.apache.druid.sql.DirectStatement; +import org.apache.druid.sql.DirectStatement.ResultSet; +import org.apache.druid.sql.SqlPlanningException; +import org.apache.druid.sql.SqlQueryPlus; +import org.apache.druid.sql.SqlRowTransformer; +import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.calcite.table.RowSignatures; +import org.apache.druid.sql.http.ResultFormat; +import org.apache.druid.sql.http.SqlParameter; +import org.joda.time.format.ISODateTimeFormat; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + + +/** + * "Driver" for the gRPC query endpoint. Handles translating the gRPC {@link QueryRequest} + * into Druid's internal formats, running the query, and translating the results into a + * gRPC {@link QueryResponse}. Allows for easier unit testing as we separate the machinery + * of running a query, given the request, from the gRPC server machinery. + */ +public class QueryDriver +{ + private static final Logger log = new Logger(QueryDriver.class); + + private static final String TIME_FIELD_KEY = "timeFieldKey"; + + /** + * Internal runtime exception to report request errors. + */ + protected static class RequestError extends RE + { + public RequestError(String msg, Object... args) + { + super(msg, args); + } + } + + private final ObjectMapper jsonMapper; + private final SqlStatementFactory sqlStatementFactory; + private final QueryLifecycleFactory queryLifecycleFactory; + + public QueryDriver( + final ObjectMapper jsonMapper, + final SqlStatementFactory sqlStatementFactory, + final QueryLifecycleFactory queryLifecycleFactory + ) + { + this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper"); + this.sqlStatementFactory = Preconditions.checkNotNull(sqlStatementFactory, "sqlStatementFactory"); + this.queryLifecycleFactory = queryLifecycleFactory; + } + + /** + * First-cut synchronous query handler. Druid prefers to stream results, in + * part to avoid overly-short network timeouts. However, for now, we simply run + * the query within this call and prepare the Protobuf response. Async handling + * can come later. + */ + public QueryResponse submitQuery(QueryRequest request, AuthenticationResult authResult) + { + if (request.getQueryType() == QueryOuterClass.QueryType.NATIVE) { + return runNativeQuery(request, authResult); + } else { + return runSqlQuery(request, authResult); + } + } + + private QueryResponse runNativeQuery(QueryRequest request, AuthenticationResult authResult) + { + Query query; + try { + query = jsonMapper.readValue(request.getQuery(), Query.class); + } + catch (JsonProcessingException e) { + return QueryResponse.newBuilder() + .setQueryId("") + .setStatus(QueryStatus.REQUEST_ERROR) + .setErrorMessage(e.getMessage()) + .build(); + } + if (Strings.isNullOrEmpty(query.getId())) { + query = query.withId(UUID.randomUUID().toString()); + } + + final QueryLifecycle queryLifecycle = queryLifecycleFactory.factorize(); + + final org.apache.druid.server.QueryResponse queryResponse; + final String currThreadName = Thread.currentThread().getName(); + try { + queryLifecycle.initialize(query); + Access authorizationResult = queryLifecycle.authorize(authResult); + if (!authorizationResult.isAllowed()) { + throw new ForbiddenException(Access.DEFAULT_ERROR_MESSAGE); + } + queryResponse = queryLifecycle.execute(); + + QueryToolChest queryToolChest = queryLifecycle.getToolChest(); + + Sequence sequence = queryToolChest.resultsAsArrays(query, queryResponse.getResults()); + RowSignature rowSignature = queryToolChest.resultArraySignature(query); + + Thread.currentThread().setName(StringUtils.format("grpc-native[%s]", query.getId())); + final ByteString results = encodeNativeResults(request, sequence, rowSignature); + return QueryResponse.newBuilder() + .setQueryId(query.getId()) + .setStatus(QueryStatus.OK) + .setData(results) + .clearErrorMessage() + .addAllColumns(encodeNativeColumns(rowSignature, request.getSkipColumnsList())) + .build(); + } + catch (IOException | RuntimeException e) { + return QueryResponse.newBuilder() + .setQueryId(query.getId()) + .setStatus(QueryStatus.RUNTIME_ERROR) + .setErrorMessage(e.getMessage()) + .build(); + } + finally { + Thread.currentThread().setName(currThreadName); + } + } + + private QueryResponse runSqlQuery(QueryRequest request, AuthenticationResult authResult) + { + final SqlQueryPlus queryPlus; + try { + queryPlus = translateQuery(request, authResult); + } + catch (RuntimeException e) { + return QueryResponse.newBuilder() + .setQueryId("") + .setStatus(QueryStatus.REQUEST_ERROR) + .setErrorMessage(e.getMessage()) + .build(); + } + final DirectStatement stmt = sqlStatementFactory.directStatement(queryPlus); + final String currThreadName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName(StringUtils.format("grpc-sql[%s]", stmt.sqlQueryId())); + final ResultSet thePlan = stmt.plan(); + final SqlRowTransformer rowTransformer = thePlan.createRowTransformer(); + final ByteString results = encodeSqlResults(request, thePlan.run().getResults(), rowTransformer); + stmt.reporter().succeeded(results.size()); + stmt.close(); + return QueryResponse.newBuilder() + .setQueryId(stmt.sqlQueryId()) + .setStatus(QueryStatus.OK) + .setData(results) + .clearErrorMessage() + .addAllColumns(encodeSqlColumns(rowTransformer)) + .build(); + } + catch (ForbiddenException e) { + stmt.reporter().failed(e); + stmt.close(); + throw e; + } + catch (RequestError e) { + stmt.reporter().failed(e); + stmt.close(); + return QueryResponse.newBuilder() + .setQueryId(stmt.sqlQueryId()) + .setStatus(QueryStatus.REQUEST_ERROR) + .setErrorMessage(e.getMessage()) + .build(); + } + catch (SqlPlanningException e) { + stmt.reporter().failed(e); + stmt.close(); + return QueryResponse.newBuilder() + .setQueryId(stmt.sqlQueryId()) + .setStatus(QueryStatus.INVALID_SQL) + .setErrorMessage(e.getMessage()) + .build(); + } + catch (IOException | RuntimeException e) { + stmt.reporter().failed(e); + stmt.close(); + return QueryResponse.newBuilder() + .setQueryId(stmt.sqlQueryId()) + .setStatus(QueryStatus.RUNTIME_ERROR) + .setErrorMessage(e.getMessage()) + .build(); + } + // There is a claim that Calcite sometimes throws a java.lang.AssertionError, but we do not have a test that can + // reproduce it checked into the code (the best we have is something that uses mocks to throw an Error, which is + // dubious at best). We keep this just in case, but it might be best to remove it and see where the + // AssertionErrors are coming from and do something to ensure that they don't actually make it out of Calcite + catch (AssertionError e) { + stmt.reporter().failed(e); + stmt.close(); + return QueryResponse.newBuilder() + .setQueryId(stmt.sqlQueryId()) + .setStatus(QueryStatus.RUNTIME_ERROR) + .setErrorMessage(e.getMessage()) + .build(); + } + finally { + Thread.currentThread().setName(currThreadName); + } + } + + /** + * Convert the rRPC query format to the internal {@link SqlQueryPlus} format. + */ + private SqlQueryPlus translateQuery(QueryRequest request, AuthenticationResult authResult) + { + return SqlQueryPlus.builder() + .sql(request.getQuery()) + .context(translateContext(request)) + .sqlParameters(translateParameters(request)) + .auth(authResult) + .build(); + } + + /** + * Translate the query context from the gRPC format to the internal format. When + * read from REST/JSON, the JSON translator will convert the type of each value + * into a number, Boolean, etc. gRPC has no similar feature. Rather than clutter up + * the gRPC request with typed context values, we rely on the existing code that can + * translate string values to the desired type on the fly. Thus, we build up a + * {@code Map}. + */ + private Map translateContext(QueryRequest request) + { + ImmutableMap.Builder builder = ImmutableMap.builder(); + if (request.getContextCount() > 0) { + for (Map.Entry entry : request.getContextMap().entrySet()) { + builder.put(entry.getKey(), entry.getValue()); + } + } + return builder.build(); + } + + /** + * Convert the gRPC parameter format to the internal Druid {@link SqlParameter} + * format. That format is then again translated by the {@link SqlQueryPlus} class. + */ + private List translateParameters(QueryRequest request) + { + if (request.getParametersCount() == 0) { + return null; + } + List params = new ArrayList<>(); + for (QueryParameter value : request.getParametersList()) { + params.add(translateParameter(value)); + } + return params; + } + + private SqlParameter translateParameter(QueryParameter value) + { + switch (value.getValueCase()) { + case DOUBLEVALUE: + return new SqlParameter(SqlType.DOUBLE, value.getDoubleValue()); + case LONGVALUE: + return new SqlParameter(SqlType.BIGINT, value.getLongValue()); + case STRINGVALUE: + return new SqlParameter(SqlType.VARCHAR, value.getStringValue()); + case NULLVALUE: + case VALUE_NOT_SET: + return null; + case ARRAYVALUE: + default: + throw new RequestError("Invalid parameter type: " + value.getValueCase().name()); + } + } + + /** + * Translate the column schema from the Druid internal form to the gRPC + * {@link ColumnSchema} form. Note that since the gRPC response returns the + * schema, none of the data formats include a header. This makes the data format + * simpler and cleaner. + */ + private Iterable encodeSqlColumns(SqlRowTransformer rowTransformer) + { + RelDataType rowType = rowTransformer.getRowType(); + final RowSignature signature = RowSignatures.fromRelDataType(rowType.getFieldNames(), rowType); + List cols = new ArrayList<>(); + for (int i = 0; i < rowType.getFieldCount(); i++) { + ColumnSchema col = ColumnSchema.newBuilder() + .setName(signature.getColumnName(i)) + .setSqlType(rowType.getFieldList().get(i).getType().getSqlTypeName().getName()) + .setDruidType(convertDruidType(signature.getColumnType(i))) + .build(); + cols.add(col); + } + return cols; + } + + private Iterable encodeNativeColumns(RowSignature rowSignature, List skipColumns) + { + List cols = new ArrayList<>(); + for (int i = 0; i < rowSignature.getColumnNames().size(); i++) { + if (skipColumns.contains(rowSignature.getColumnName(i))) { + continue; + } + ColumnSchema col = ColumnSchema.newBuilder() + .setName(rowSignature.getColumnName(i)) + .setDruidType(convertDruidType(rowSignature.getColumnType(i))) + .build(); + cols.add(col); + } + return cols; + } + + /** + * Convert from Druid's internal format of the Druid data type to the gRPC form. + */ + private DruidType convertDruidType(Optional colType) + { + if (!colType.isPresent()) { + return DruidType.UNKNOWN_TYPE; + } + ColumnType druidType = colType.get(); + if (druidType == ColumnType.STRING) { + return DruidType.STRING; + } + if (druidType == ColumnType.STRING_ARRAY) { + return DruidType.STRING_ARRAY; + } + if (druidType == ColumnType.LONG) { + return DruidType.LONG; + } + if (druidType == ColumnType.LONG_ARRAY) { + return DruidType.LONG_ARRAY; + } + if (druidType == ColumnType.FLOAT) { + return DruidType.FLOAT; + } + if (druidType == ColumnType.FLOAT_ARRAY) { + return DruidType.FLOAT_ARRAY; + } + if (druidType == ColumnType.DOUBLE) { + return DruidType.DOUBLE; + } + if (druidType == ColumnType.DOUBLE_ARRAY) { + return DruidType.DOUBLE_ARRAY; + } + if (druidType == ColumnType.UNKNOWN_COMPLEX) { + return DruidType.COMPLEX; + } + return DruidType.UNKNOWN_TYPE; + } + + /** + * Generic mechanism to write query results to one of the supported gRPC formats. + */ + public interface GrpcResultWriter + { + void start() throws IOException; + + void writeRow(Object[] row) throws IOException; + + void close() throws IOException; + } + + /** + * Writer for the SQL result formats. Reuses the SQL format writer implementations. + * Note: gRPC does not use the headers: schema information is available in the + * rRPC response. + */ + public static class GrpcSqlResultFormatWriter implements GrpcResultWriter + { + protected final ResultFormat.Writer formatWriter; + protected final SqlRowTransformer rowTransformer; + + public GrpcSqlResultFormatWriter( + final ResultFormat.Writer formatWriter, + final SqlRowTransformer rowTransformer + ) + { + this.formatWriter = formatWriter; + this.rowTransformer = rowTransformer; + } + + @Override + public void start() throws IOException + { + formatWriter.writeResponseStart(); + } + + @Override + public void writeRow(Object[] row) throws IOException + { + formatWriter.writeRowStart(); + for (int i = 0; i < rowTransformer.getFieldList().size(); i++) { + final Object value; + if (formatWriter instanceof ProtobufWriter) { + value = ProtobufTransformer.transform(rowTransformer, row, i); + } else { + value = rowTransformer.transform(row, i); + } + formatWriter.writeRowField(rowTransformer.getFieldList().get(i), value); + } + formatWriter.writeRowEnd(); + } + + @Override + public void close() throws IOException + { + formatWriter.writeResponseEnd(); + formatWriter.close(); + } + } + + public static class GrpcNativeResultFormatWriter implements GrpcResultWriter + { + protected final ResultFormat.Writer formatWriter; + protected final RowSignature rowSignature; + private final String timeFieldName; + private final List timeColumns; + private final List skipColumns; + + public GrpcNativeResultFormatWriter( + final ResultFormat.Writer formatWriter, + final RowSignature rowSignature, + final String timeFieldName, + final List timeColumns, + final List skipColumns + ) + { + this.formatWriter = formatWriter; + this.rowSignature = rowSignature; + this.timeFieldName = timeFieldName; + this.timeColumns = timeColumns; + this.skipColumns = skipColumns; + } + + @Override + public void start() throws IOException + { + formatWriter.writeResponseStart(); + } + + @Override + public void writeRow(Object[] row) throws IOException + { + formatWriter.writeRowStart(); + + for (int i = 0; i < rowSignature.getColumnNames().size(); i++) { + + final String columnName = rowSignature.getColumnName(i); + if (skipColumns.contains(columnName)) { + log.debug("Skipping column [%s] from the result.", columnName); + continue; + } + + boolean isDruidTimeColumn = columnName.equals(ColumnHolder.TIME_COLUMN_NAME); + boolean convertTime = timeColumns.contains(rowSignature.getColumnName(i)); + + final Object value; + if (formatWriter instanceof ProtobufWriter) { + value = ProtobufTransformer.transform(rowSignature, row, i, convertTime); + } else { + if (convertTime) { + value = ISODateTimeFormat.dateTime().print(((long) row[i])); + } else { + value = row[i]; + } + } + final String outputColumnName; + if (isDruidTimeColumn) { + outputColumnName = timeFieldName; + } else { + outputColumnName = rowSignature.getColumnName(i); + } + formatWriter.writeRowField(outputColumnName, value); + } + formatWriter.writeRowEnd(); + } + + + @Override + public void close() throws IOException + { + formatWriter.writeResponseEnd(); + formatWriter.close(); + } + } + + /** + * Internal runtime exception to pass {@link IOException}s though the + * {@link Sequence} {@link Accumulator} protocol. + */ + private static class ResponseError extends RuntimeException + { + public ResponseError(IOException e) + { + super(e); + } + } + + /** + * Druid query results use a complex {@link Sequence} mechanism. This class uses an + * {@link Accumulator} to walk the results and present each to the associated + * results writer. This is a very rough analogy of the {@code SqlResourceQueryResultPusher} + * in the REST {@code SqlResource} class. + */ + public static class GrpcResultsAccumulator implements Accumulator + { + private final GrpcResultWriter writer; + + public GrpcResultsAccumulator(final GrpcResultWriter writer) + { + this.writer = writer; + } + + public void push(Sequence results) throws IOException + { + writer.start(); + try { + results.accumulate(null, this); + } + catch (ResponseError e) { + throw (IOException) e.getCause(); + } + writer.close(); + } + + @Override + public Void accumulate(Void accumulated, Object[] in) + { + try { + writer.writeRow(in); + } + catch (IOException e) { + throw new ResponseError(e); + } + return null; + } + } + + /** + * Convert the query results to a set of bytes to be attached to the query response. + *

+ * This version is pretty basic: the results are materialized as a byte array. That's + * fine for small result sets, but should be rethought for larger result sets. + */ + private ByteString encodeSqlResults( + final QueryRequest request, + final Sequence result, + final SqlRowTransformer rowTransformer + ) throws IOException + { + // Accumulate the results as a byte array. + ByteArrayOutputStream out = new ByteArrayOutputStream(); + GrpcResultWriter writer; + + // For the SQL-supported formats, use the SQL-provided writers. + switch (request.getResultFormat()) { + case CSV: + writer = new GrpcSqlResultFormatWriter( + ResultFormat.CSV.createFormatter(out, jsonMapper), + rowTransformer + ); + break; + case JSON_ARRAY: + writer = new GrpcSqlResultFormatWriter( + ResultFormat.ARRAY.createFormatter(out, jsonMapper), + rowTransformer + ); + break; + case JSON_ARRAY_LINES: + writer = new GrpcSqlResultFormatWriter( + ResultFormat.ARRAYLINES.createFormatter(out, jsonMapper), + rowTransformer + ); + break; + case PROTOBUF_INLINE: + writer = new GrpcSqlResultFormatWriter( + new ProtobufWriter(out, getProtobufClass(request)), + rowTransformer + ); + break; + default: + throw new RequestError("Unsupported query result format: " + request.getResultFormat().name()); + } + GrpcResultsAccumulator accumulator = new GrpcResultsAccumulator(writer); + accumulator.push(result); + return ByteString.copyFrom(out.toByteArray()); + } + + private ByteString encodeNativeResults( + final QueryRequest request, + final Sequence result, + final RowSignature rowSignature + ) throws IOException + { + // Accumulate the results as a byte array. + ByteArrayOutputStream out = new ByteArrayOutputStream(); + GrpcResultWriter writer; + final String timeFieldName = request.getContextMap().getOrDefault(TIME_FIELD_KEY, "time"); + final List skipColumns = request.getSkipColumnsList(); + final List timeColumns = request.getTimeColumnsList(); + + switch (request.getResultFormat()) { + case CSV: + writer = new GrpcNativeResultFormatWriter( + ResultFormat.CSV.createFormatter(out, jsonMapper), + rowSignature, + timeFieldName, + timeColumns, + skipColumns + ); + break; + case JSON_ARRAY: + writer = new GrpcNativeResultFormatWriter( + ResultFormat.ARRAY.createFormatter(out, jsonMapper), + rowSignature, + timeFieldName, + timeColumns, + skipColumns + ); + break; + case JSON_ARRAY_LINES: + writer = new GrpcNativeResultFormatWriter( + ResultFormat.ARRAYLINES.createFormatter(out, jsonMapper), + rowSignature, + timeFieldName, + timeColumns, + skipColumns + ); + break; + case PROTOBUF_INLINE: + writer = new GrpcNativeResultFormatWriter( + new ProtobufWriter(out, getProtobufClass(request)), + rowSignature, + timeFieldName, + timeColumns, + skipColumns + ); + break; + default: + throw new RequestError("Unsupported query result format: " + request.getResultFormat()); + } + GrpcResultsAccumulator accumulator = new GrpcResultsAccumulator(writer); + accumulator.push(result); + return ByteString.copyFrom(out.toByteArray()); + } + + @SuppressWarnings("unchecked") + private Class getProtobufClass(final QueryRequest request) + { + try { + return (Class) Class.forName(request.getProtobufMessageName()); + } + catch (ClassNotFoundException e) { + throw new RequestError( + "The Protobuf class [%s] is not known. Is your protobuf jar on the class path?", + request.getProtobufMessageName() + ); + } + catch (ClassCastException e) { + throw new RequestError( + "The class [%s] is not a Protobuf", + request.getProtobufMessageName() + ); + } + } +} diff --git a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/QueryServer.java b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/QueryServer.java new file mode 100644 index 00000000000..569ed69ef3e --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/QueryServer.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc.server; + +import io.grpc.Context; +import io.grpc.Grpc; +import io.grpc.InsecureServerCredentials; +import io.grpc.Server; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import org.apache.druid.grpc.proto.HealthOuterClass.HealthCheckResponse.ServingStatus; +import org.apache.druid.java.util.common.UOE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.security.AllowAllAuthenticator; +import org.apache.druid.server.security.AnonymousAuthenticator; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.Authenticator; +import org.apache.druid.server.security.AuthenticatorMapper; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + + +/** + * Basic gRPC server adapted from the gRPC examples. Delegates to the + * {@link QueryDriver} class to do the actual work of running the query. + *

+ * This class is preliminary. It is good enough for unit tests, but a bit more work + * is needed to integrate this class into the Druid server. + *

+ * Also, how will authorization be handled in the gRPC path? + */ +public class QueryServer +{ + public static final Context.Key AUTH_KEY = Context.key("druid-auth"); + private static final Logger log = new Logger(QueryServer.class); + + private final AuthenticatorMapper authMapper; + private final int port; + private final QueryDriver driver; + private Server server; + + private final HealthService healthService; + + public QueryServer( + GrpcQueryConfig config, + QueryDriver driver, + AuthenticatorMapper authMapper + ) + { + this.port = config.getPort(); + this.driver = driver; + this.authMapper = authMapper; + this.healthService = new HealthService(); + } + + public void start() throws IOException + { + server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) + .addService(ServerInterceptors.intercept(new QueryService(driver), makeSecurityInterceptor())) + .addService(healthService) + .build() + .start(); + + healthService.registerService(QueryService.class.getSimpleName(), ServingStatus.SERVING); + healthService.registerService("", ServingStatus.SERVING); + + log.info("Grpc Server started, listening on " + port); + } + + + /** + * Map from a Druid authenticator to a gRPC server interceptor. This is a bit of a hack. + * Authenticators don't know about gRPC: we have to explicitly do the mapping. This means + * that auth extensions occur independently of gRPC and are not supported. Longer term, + * we need a way for the extension itself to do the required mapping. + */ + private ServerInterceptor makeSecurityInterceptor() + { + // First look for a Basic authenticator + for (Authenticator authenticator : authMapper.getAuthenticatorChain()) { + // Want authenticator instanceof BasicHTTPAuthenticator, but + // BasicHTTPAuthenticator is not visible here. + if ("BasicHTTPAuthenticator".equals(authenticator.getClass().getSimpleName())) { + log.info("Using Basic authentication"); + return new BasicAuthServerInterceptor(authenticator); + } + } + + // Otherwise, look for an Anonymous authenticator + for (Authenticator authenticator : authMapper.getAuthenticatorChain()) { + if (authenticator instanceof AnonymousAuthenticator || authenticator instanceof AllowAllAuthenticator) { + log.info("Using Anonymous authentication"); + return new AnonymousAuthServerInterceptor(authenticator); + } + } + + // gRPC does not support other forms of authenticators yet. + String msg = "The gRPC query server requires either a Basic or Anonymous authorizer: it does not work with others yet."; + log.error(msg); + throw new UOE(msg); + } + + public void stop() throws InterruptedException + { + if (server != null) { + log.info("Server stopping"); + healthService.unregisterService(QueryService.class.getSimpleName()); + healthService.unregisterService(""); + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + } + + /** + * Await termination on the main thread since the grpc library uses daemon threads. + */ + public void blockUntilShutdown() throws InterruptedException + { + if (server != null) { + log.info("Grpc Server stopping"); + healthService.unregisterService(QueryService.class.getSimpleName()); + healthService.unregisterService(""); + server.awaitTermination(); + } + } +} diff --git a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/QueryService.java b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/QueryService.java new file mode 100644 index 00000000000..ddd99c456e3 --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/QueryService.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc.server; + +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import org.apache.druid.grpc.proto.QueryGrpc; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryRequest; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryResponse; +import org.apache.druid.server.security.ForbiddenException; + +/** + * Implementation of the gRPC Query service. Provides a single method + * to run a query using the "driver" that holds the actual Druid SQL + * logic. + */ +class QueryService extends QueryGrpc.QueryImplBase +{ + private final QueryDriver driver; + + public QueryService(QueryDriver driver) + { + this.driver = driver; + } + + @Override + public void submitQuery(QueryRequest request, StreamObserver responseObserver) + { + try { + QueryResponse reply = driver.submitQuery(request, QueryServer.AUTH_KEY.get()); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + catch (ForbiddenException e) { + // This block mimics the Servlet pattern of throwing ForbiddenException for + // all access denied cases rather than handling permissions in each message + // handler. + responseObserver.onError(new StatusRuntimeException(Status.PERMISSION_DENIED)); + } + } +} diff --git a/extensions-contrib/grpc-query/src/main/proto/health.proto b/extensions-contrib/grpc-query/src/main/proto/health.proto new file mode 100644 index 00000000000..304a2e8bd21 --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/proto/health.proto @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +option java_package = "org.apache.druid.grpc.proto"; + +package druidGrpc; + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + SERVICE_UNKNOWN = 3; // Used only by the Watch method. + } + ServingStatus status = 1; +} + +service Health { + // returns health of the server + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); + + // streaming health check api. The server will immediately send back a message indicating + // the current serving status. It will then subsequently send a new message whenever the + // service's serving status changes. + rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); +} diff --git a/extensions-contrib/grpc-query/src/main/proto/query.proto b/extensions-contrib/grpc-query/src/main/proto/query.proto new file mode 100644 index 00000000000..bd14e222ac7 --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/proto/query.proto @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +option java_package = "org.apache.druid.grpc.proto"; + +import "google/protobuf/timestamp.proto"; + +package druidGrpc; + +service Query { + rpc SubmitQuery (QueryRequest) returns (QueryResponse) {} +} + +enum QueryType { + SQL = 0; + NATIVE = 1; +} + +// Defines the format of the query results. Must be one of the +// non-unknown formats. +enum QueryResultFormat { + UNKNOWN_FORMAT = 0; // Default value. An old server will see this value + // (and fail the request) if a new client passes a new format. + CSV = 1; + // JSON_OBJECT = 2; -- Not yet + JSON_ARRAY = 3; + // JSON_OBJECT_LINES = 4; -- Not yet + JSON_ARRAY_LINES = 5; + + // The protobuf formats also require that the protobufMessageName be set in + // the query request. + PROTOBUF_INLINE = 6; + PROTOBUF_RESPONSE = 7; +} + +// Value for a query parameter. The value is essentially a variant of the +// supported parameter types. The type chosen here must match (or be converable +// to) the type of the corresponding expression in the SQL statement. +message QueryParameter { + oneof value { + bool nullValue = 1; + string stringValue = 2; + sint64 longValue = 3; + double doubleValue = 4; + StringArray arrayValue = 5; + } +} + +// Query pararameter value for string array properties. At present, string +// arrays are primarily used by the MSQ engine, which is not yet available +// via the gRPC API. +message StringArray { + repeated string value = 1; +} + +message QueryRequest { + string query = 1; + QueryResultFormat resultFormat = 2; + map context = 3; + // Query parameters. If your query is SELECT * FROM foo WHERE x = ? AND y > ? + // Then you would include two parameters in the order in which the question + // marks lexically appear in the query. + repeated QueryParameter parameters = 4; + // The name of the Protobuf message to encode the response if the + // resultFormat is one of the PROTOBUF formats. + optional string protobufMessageName = 5; + + // used only for native query + // columns to skip writing in the result, for example, it can used to skip writing + // time field in the result for timeseries query + repeated string skipColumns = 6; + + // used only for native query + // columns which should be converted to Timestamp + repeated string timeColumns = 7; + + QueryType queryType = 8; +} + +// Unauthorized errors return as a StatusRuntimeException with +// getStatus().getCode() == Status.Code.PERMISSION_DENIED +enum QueryStatus { + UNKNOWN_STATUS = 0; // Default value which means "unknown failure". Older clients + // will see this value if a future version adds a new failure + // type. + OK = 1; + REQUEST_ERROR = 3; + INVALID_SQL = 4; + RUNTIME_ERROR = 5; +} + +enum DruidType { + UNKNOWN_TYPE = 0; // Default value for unknown, or for future new values + // as seen by old clients. + STRING = 1; + LONG = 2; + DOUBLE = 3; + FLOAT = 4; + STRING_ARRAY = 5; + LONG_ARRAY = 6; + DOUBLE_ARRAY = 7; + FLOAT_ARRAY = 8; + COMPLEX = 9; +} + +message ColumnSchema { + string name = 1; + string sqlType = 2; + DruidType druidType = 3; +} + +message QueryResponse { + string queryId = 1; + QueryStatus status = 2; + + // Error message if the query fails. Not set if the query succeeds. + optional string errorMessage = 3; + + // The schema of the returned results. This schema is redundant for the + // JSON and Protobuf formats. It can be used to generate column heads, + // and understand types, for the CSV result format. + repeated ColumnSchema columns = 4; + + // The query response, encoded using the requested response format. + // Note that the entire response is placed into a single messages. As + // a result, this RPC is intended ONLY for queries that return small + // result sets. It will perform poorly (and consume excess memory) if + // used for large result sets. + optional bytes data = 5; +} diff --git a/extensions-contrib/grpc-query/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/grpc-query/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule new file mode 100644 index 00000000000..e35e9a02699 --- /dev/null +++ b/extensions-contrib/grpc-query/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.druid.grpc.guice.GrpcQueryModule diff --git a/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/BasicAuthTest.java b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/BasicAuthTest.java new file mode 100644 index 00000000000..d7d01843234 --- /dev/null +++ b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/BasicAuthTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc; + +import com.google.common.collect.ImmutableMap; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import org.apache.druid.grpc.proto.QueryOuterClass; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryRequest; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryResponse; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryResultFormat; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryStatus; +import org.apache.druid.grpc.server.GrpcQueryConfig; +import org.apache.druid.grpc.server.QueryDriver; +import org.apache.druid.grpc.server.QueryServer; +import org.apache.druid.metadata.DefaultPasswordProvider; +import org.apache.druid.security.basic.authentication.BasicHTTPAuthenticator; +import org.apache.druid.security.basic.authentication.validator.CredentialsValidator; +import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthenticatorMapper; +import org.apache.druid.sql.calcite.BaseCalciteQueryTest; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.apache.druid.sql.calcite.util.SqlTestFramework; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +/** + * Simple test that runs the gRPC server, on top of a test SQL stack. + * Uses a simple client to send a query to the server. This is a basic + * sanity check of the gRPC stack. Uses allow-all security, which + * does a sanity check of the auth chain. + */ +public class BasicAuthTest extends BaseCalciteQueryTest +{ + private static QueryServer server; + + @BeforeEach + public void setup() throws IOException + { + SqlTestFramework sqlTestFramework = queryFramework(); + SqlTestFramework.PlannerFixture plannerFixture = sqlTestFramework.plannerFixture( + BaseCalciteQueryTest.PLANNER_CONFIG_DEFAULT, + new AuthConfig() + ); + QueryDriver driver = new QueryDriver( + sqlTestFramework.queryJsonMapper(), + plannerFixture.statementFactory(), + sqlTestFramework.queryLifecycleFactory() + ); + + CredentialsValidator validator = new CredentialsValidator() + { + @Override + public AuthenticationResult validateCredentials(String authenticatorName, String authorizerName, + String username, char[] password) + { + if (CalciteTests.TEST_SUPERUSER_NAME.equals(username)) { + if (!"secret".equals(new String(password))) { + return null; + } + return CalciteTests.SUPER_USER_AUTH_RESULT; + } + if ("regular".equals(username)) { + if (!"pwd".equals(new String(password))) { + return null; + } + return CalciteTests.REGULAR_USER_AUTH_RESULT; + } + return null; + } + }; + BasicHTTPAuthenticator basicAuth = new BasicHTTPAuthenticator( + null, + "test", + "test", + new DefaultPasswordProvider("druid"), + new DefaultPasswordProvider("druid"), + null, + null, + null, + false, + validator + ); + AuthenticatorMapper authMapper = new AuthenticatorMapper( + ImmutableMap.of( + "test", + basicAuth + ) + ); + GrpcQueryConfig config = new GrpcQueryConfig(50051); + server = new QueryServer(config, driver, authMapper); + try { + server.start(); + } + catch (IOException e) { + e.printStackTrace(); + throw e; + } + catch (RuntimeException e) { + e.printStackTrace(); + throw e; + } + } + + @AfterEach + public void tearDown() throws InterruptedException + { + if (server != null) { + server.stop(); + server.blockUntilShutdown(); + } + } + + @Test + public void testMissingAuth() + { + QueryRequest request = QueryRequest.newBuilder() + .setQuery("SELECT * FROM foo") + .setResultFormat(QueryResultFormat.CSV) + .setQueryType(QueryOuterClass.QueryType.SQL) + .build(); + + try (TestClient client = new TestClient(TestClient.DEFAULT_HOST)) { + StatusRuntimeException e = assertThrows(StatusRuntimeException.class, () -> client.getQueryClient().submitQuery(request)); + assertEquals(Status.PERMISSION_DENIED, e.getStatus()); + } + } + + @Test + public void testInvalidUser() + { + QueryRequest request = QueryRequest.newBuilder() + .setQuery("SELECT * FROM foo") + .setResultFormat(QueryResultFormat.CSV) + .setQueryType(QueryOuterClass.QueryType.SQL) + .build(); + + try (TestClient client = new TestClient(TestClient.DEFAULT_HOST, "invalid", "pwd")) { + StatusRuntimeException e = assertThrows(StatusRuntimeException.class, () -> client.getQueryClient().submitQuery(request)); + assertEquals(Status.PERMISSION_DENIED, e.getStatus()); + } + } + + @Test + public void testInvalidPassword() + { + QueryRequest request = QueryRequest.newBuilder() + .setQuery("SELECT * FROM foo") + .setResultFormat(QueryResultFormat.CSV) + .setQueryType(QueryOuterClass.QueryType.SQL) + .build(); + + try (TestClient client = new TestClient(TestClient.DEFAULT_HOST, CalciteTests.TEST_SUPERUSER_NAME, "invalid")) { + StatusRuntimeException e = assertThrows(StatusRuntimeException.class, () -> client.getQueryClient().submitQuery(request)); + assertEquals(Status.PERMISSION_DENIED, e.getStatus()); + } + } + + @Test + public void testValidUser() + { + QueryRequest request = QueryRequest.newBuilder() + .setQuery("SELECT * FROM foo") + .setResultFormat(QueryResultFormat.CSV) + .setQueryType(QueryOuterClass.QueryType.SQL) + .build(); + + try (TestClient client = new TestClient(TestClient.DEFAULT_HOST, CalciteTests.TEST_SUPERUSER_NAME, "secret")) { + QueryResponse response = client.getQueryClient().submitQuery(request); + assertEquals(QueryStatus.OK, response.getStatus()); + } + try (TestClient client = new TestClient(TestClient.DEFAULT_HOST, "regular", "pwd")) { + QueryResponse response = client.getQueryClient().submitQuery(request); + assertEquals(QueryStatus.OK, response.getStatus()); + } + } + + @Test + public void testUnauthorized() + { + QueryRequest request = QueryRequest.newBuilder() + .setQuery("SELECT * FROM forbiddenDatasource") + .setResultFormat(QueryResultFormat.CSV) + .setQueryType(QueryOuterClass.QueryType.SQL) + .build(); + + try (TestClient client = new TestClient(TestClient.DEFAULT_HOST, "regular", "pwd")) { + StatusRuntimeException e = assertThrows(StatusRuntimeException.class, () -> client.getQueryClient().submitQuery(request)); + assertEquals(Status.PERMISSION_DENIED, e.getStatus()); + } + } +} diff --git a/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/BasicCredentials.java b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/BasicCredentials.java new file mode 100644 index 00000000000..66f0501acbb --- /dev/null +++ b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/BasicCredentials.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc; + +import io.grpc.CallCredentials; +import io.grpc.Metadata; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.concurrent.Executor; + +/** + * Applies Basic Auth credentials to an outgoing request. Use this + * class to set the Basic Auth user name and password on a gRPC client: + *


+ * QueryBlockingStub client = QueryGrpc.newBlockingStub(channel)
+ *     .withCallCredentials( new BasicCredentials(user, password));
+ * 
+ */ +public class BasicCredentials extends CallCredentials +{ + public static final String AUTHORIZATION_HEADER = "Authorization"; + private static final Metadata.Key AUTH_KEY = Metadata.Key.of(AUTHORIZATION_HEADER, Metadata.ASCII_STRING_MARSHALLER); + + public final String user; + public final String password; + + public BasicCredentials(String user, String password) + { + this.user = user; + this.password = password; + } + + @Override + public void applyRequestMetadata(RequestInfo requestInfo, Executor exec, MetadataApplier applier) + { + Metadata metadata = new Metadata(); + metadata.put(AUTH_KEY, getBasicAuthenticationHeader(user, password)); + applier.apply(metadata); + } + + // Source: https://www.baeldung.com/java-httpclient-basic-auth#authenticate-using-http-headers + private static String getBasicAuthenticationHeader(String username, String password) + { + String valueToEncode = username + ":" + password; + return "Basic " + Base64.getEncoder().encodeToString(valueToEncode.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public void thisUsesUnstableApi() + { + // We've been warned. + } +} diff --git a/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/DriverTest.java b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/DriverTest.java new file mode 100644 index 00000000000..0c7a6506b51 --- /dev/null +++ b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/DriverTest.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc; + +import org.apache.druid.grpc.proto.QueryOuterClass; +import org.apache.druid.grpc.proto.QueryOuterClass.ColumnSchema; +import org.apache.druid.grpc.proto.QueryOuterClass.DruidType; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryRequest; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryResponse; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryResultFormat; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryStatus; +import org.apache.druid.grpc.server.QueryDriver; +import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.sql.calcite.BaseCalciteQueryTest; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.apache.druid.sql.calcite.util.SqlTestFramework; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class DriverTest extends BaseCalciteQueryTest +{ + private QueryDriver driver; + + @BeforeEach + public void setup() + { + SqlTestFramework sqlTestFramework = queryFramework(); + SqlTestFramework.PlannerFixture plannerFixture = sqlTestFramework.plannerFixture( + BaseCalciteQueryTest.PLANNER_CONFIG_DEFAULT, + new AuthConfig() + ); + driver = new QueryDriver( + sqlTestFramework.queryJsonMapper(), + plannerFixture.statementFactory(), + sqlTestFramework.queryLifecycleFactory() + ); + } + + @Test + public void testBasics_sql() + { + String sql = "SELECT __time, dim2 FROM foo"; + QueryRequest request = QueryRequest.newBuilder() + .setQuery(sql) + .setResultFormat(QueryResultFormat.CSV) + .setQueryType(QueryOuterClass.QueryType.SQL) + .build(); + QueryResponse response = driver.submitQuery(request, CalciteTests.REGULAR_USER_AUTH_RESULT); + + assertEquals(QueryStatus.OK, response.getStatus()); + assertFalse(response.hasErrorMessage()); + assertTrue(response.getQueryId().length() > 5); + List columns = response.getColumnsList(); + assertEquals(2, columns.size()); + ColumnSchema col = columns.get(0); + assertEquals("__time", col.getName()); + assertEquals("TIMESTAMP", col.getSqlType()); + assertEquals(DruidType.LONG, col.getDruidType()); + col = columns.get(1); + assertEquals("dim2", col.getName()); + assertEquals("VARCHAR", col.getSqlType()); + assertEquals(DruidType.STRING, col.getDruidType()); + + List expectedResults = Arrays.asList( + "2000-01-01T00:00:00.000Z,a", + "2000-01-02T00:00:00.000Z,", + "2000-01-03T00:00:00.000Z,", + "2001-01-01T00:00:00.000Z,a", + "2001-01-02T00:00:00.000Z,abc", + "2001-01-03T00:00:00.000Z," + ); + String results = response.getData().toStringUtf8(); + assertEquals(expectedResults, Arrays.asList(results.split("\n"))); + } + + @Test + public void testBasics_native_groupby() + { + String query = "{\n" + + " \"queryType\": \"groupBy\",\n" + + " \"dataSource\": \"wikipedia\",\n" + + " \"dimensions\": [\n" + + " {\n" + + " \"type\": \"default\",\n" + + " \"dimension\": \"countryName\",\n" + + " \"outputType\": \"STRING\"\n" + + " }\n" + + " ],\n" + + " \"aggregations\": [\n" + + " {\n" + + " \"type\": \"longSum\",\n" + + " \"name\": \"sum\",\n" + + " \"fieldName\": \"added\"\n" + + " }\n" + + " ],\n" + + " \"filter\": {\n" + + " \"type\": \"in\",\n" + + " \"dimension\": \"countryName\",\n" + + " \"values\": [\n" + + " \"Albania\",\n" + + " \"Angola\"\n" + + " ]\n" + + " }," + + " \"intervals\": [\n" + + " \"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"\n" + + " ],\n" + + " \"granularity\": \"day\"\n" + + "}"; + + QueryRequest request = QueryRequest.newBuilder() + .setQuery(query) + .setResultFormat(QueryResultFormat.CSV) + .addTimeColumns("__time") + .setQueryType(QueryOuterClass.QueryType.NATIVE) + .build(); + QueryResponse response = driver.submitQuery(request, CalciteTests.REGULAR_USER_AUTH_RESULT); + + assertEquals(QueryStatus.OK, response.getStatus()); + assertFalse(response.hasErrorMessage()); + assertTrue(response.getQueryId().length() > 5); + List columns = response.getColumnsList(); + assertEquals(3, columns.size()); + ColumnSchema col = columns.get(0); + assertEquals("__time", col.getName()); + assertEquals("", col.getSqlType()); + assertEquals(DruidType.LONG, col.getDruidType()); + col = columns.get(1); + assertEquals("countryName", col.getName()); + assertEquals("", col.getSqlType()); + assertEquals(DruidType.STRING, col.getDruidType()); + + col = columns.get(2); + assertEquals("sum", col.getName()); + assertEquals("", col.getSqlType()); + assertEquals(DruidType.LONG, col.getDruidType()); + + List expectedResults = Arrays.asList( + "2015-09-12T00:00:00.000Z,Albania,80", + "2015-09-12T00:00:00.000Z,Angola,784" + ); + String results = response.getData().toStringUtf8(); + assertEquals(expectedResults, Arrays.asList(results.split("\n"))); + } + + @Test + public void testBasics_native_timeseries() + { + String query = "{\n" + + " \"queryType\": \"timeseries\",\n" + + " \"dataSource\": \"foo\",\n" + + " \"granularity\": \"day\",\n" + + " \"descending\": \"true\",\n" + + " \"aggregations\": [\n" + + " {\n" + + " \"type\": \"longSum\",\n" + + " \"name\": \"timeseries\",\n" + + " \"fieldName\": \"m2\"\n" + + " }\n" + + " ],\n" + + " \"intervals\": [\n" + + " \"2000-01-01T00:00:00.000/2000-01-04T00:00:00.000\"\n" + + " ]\n" + + "}"; + + QueryRequest request = QueryRequest.newBuilder() + .setQuery(query) + .setResultFormat(QueryResultFormat.CSV) + .addTimeColumns("__time") + .setQueryType(QueryOuterClass.QueryType.NATIVE) + .build(); + QueryResponse response = driver.submitQuery(request, CalciteTests.REGULAR_USER_AUTH_RESULT); + + assertEquals(QueryStatus.OK, response.getStatus()); + + assertEquals(QueryStatus.OK, response.getStatus()); + assertFalse(response.hasErrorMessage()); + assertTrue(response.getQueryId().length() > 5); + List columns = response.getColumnsList(); + assertEquals(2, columns.size()); + ColumnSchema col = columns.get(0); + assertEquals("__time", col.getName()); + assertEquals("", col.getSqlType()); + assertEquals(DruidType.LONG, col.getDruidType()); + col = columns.get(1); + assertEquals("timeseries", col.getName()); + assertEquals("", col.getSqlType()); + assertEquals(DruidType.LONG, col.getDruidType()); + + List expectedResults = Arrays.asList( + "2000-01-03T00:00:00.000Z,3", + "2000-01-02T00:00:00.000Z,2", + "2000-01-01T00:00:00.000Z,1" + ); + String results = response.getData().toStringUtf8(); + assertEquals(expectedResults, Arrays.asList(results.split("\n"))); + } +} diff --git a/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/GrpcQueryTest.java b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/GrpcQueryTest.java new file mode 100644 index 00000000000..8b0129b0e42 --- /dev/null +++ b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/GrpcQueryTest.java @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc; + +import com.google.common.collect.ImmutableMap; +import io.grpc.StatusRuntimeException; +import org.apache.druid.grpc.client.GrpcResponseHandler; +import org.apache.druid.grpc.proto.HealthOuterClass.HealthCheckRequest; +import org.apache.druid.grpc.proto.HealthOuterClass.HealthCheckResponse; +import org.apache.druid.grpc.proto.QueryOuterClass; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryRequest; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryResponse; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryResultFormat; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryStatus; +import org.apache.druid.grpc.proto.TestResults; +import org.apache.druid.grpc.proto.TestResults.QueryResult; +import org.apache.druid.grpc.server.GrpcQueryConfig; +import org.apache.druid.grpc.server.QueryDriver; +import org.apache.druid.grpc.server.QueryServer; +import org.apache.druid.server.security.AllowAllAuthenticator; +import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.server.security.AuthenticatorMapper; +import org.apache.druid.sql.calcite.BaseCalciteQueryTest; +import org.apache.druid.sql.calcite.util.SqlTestFramework; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +/** + * Simple test that runs the gRPC server, on top of a test SQL stack. + * Uses a simple client to send a query to the server. This is a basic + * sanity check of the gRPC stack. Uses allow-all security, which + * does a sanity check of the auth chain. + */ +public class GrpcQueryTest extends BaseCalciteQueryTest +{ + private static QueryServer server; + private static TestClient client; + + @BeforeEach + public void setup() throws IOException + { + SqlTestFramework sqlTestFramework = queryFramework(); + SqlTestFramework.PlannerFixture plannerFixture = sqlTestFramework.plannerFixture( + BaseCalciteQueryTest.PLANNER_CONFIG_DEFAULT, + new AuthConfig() + ); + + QueryDriver driver = new QueryDriver( + sqlTestFramework.queryJsonMapper(), + plannerFixture.statementFactory(), + sqlTestFramework.queryLifecycleFactory() + ); + AuthenticatorMapper authMapper = new AuthenticatorMapper( + ImmutableMap.of( + "test", + new AllowAllAuthenticator() + ) + ); + GrpcQueryConfig config = new GrpcQueryConfig(50051); + server = new QueryServer(config, driver, authMapper); + try { + server.start(); + } + catch (IOException | RuntimeException e) { + e.printStackTrace(); + throw e; + } + client = new TestClient(TestClient.DEFAULT_HOST); + } + + @AfterEach + public void tearDown() throws InterruptedException + { + if (client != null) { + client.close(); + } + if (server != null) { + server.stop(); + server.blockUntilShutdown(); + } + } + + /** + * Do a very basic query. + */ + @Test + public void testBasics_sql() + { + QueryRequest request = QueryRequest.newBuilder() + .setQuery("SELECT * FROM foo") + .setResultFormat(QueryResultFormat.CSV) + .setQueryType(QueryOuterClass.QueryType.SQL) + .build(); + + QueryResponse response = client.getQueryClient().submitQuery(request); + assertEquals(QueryStatus.OK, response.getStatus()); + } + + /** + * Do a very basic query that outputs protobuf. + */ + @Test + public void testGrpcBasics_sql() + { + QueryRequest request = QueryRequest.newBuilder() + .setQuery("SELECT dim1, dim2, dim3, cnt, m1, m2, unique_dim1, __time AS \"date\" FROM foo") + .setQueryType(QueryOuterClass.QueryType.SQL) + .setProtobufMessageName(QueryResult.class.getName()) + .setResultFormat(QueryResultFormat.PROTOBUF_INLINE) + .build(); + + QueryResponse response = client.getQueryClient().submitQuery(request); + GrpcResponseHandler handler = GrpcResponseHandler.of(QueryResult.class); + List queryResults = handler.get(response.getData()); + assertEquals(QueryStatus.OK, response.getStatus()); + assertEquals(6, queryResults.size()); + } + + @Test + public void testGrpcBasics_native_timeseries() + { + String query = "{\n" + + " \"queryType\": \"timeseries\",\n" + + " \"dataSource\": \"foo\",\n" + + " \"granularity\": \"day\",\n" + + " \"descending\": \"true\",\n" + + " \"aggregations\": [\n" + + " {\n" + + " \"type\": \"longSum\",\n" + + " \"name\": \"timeseries\",\n" + + " \"fieldName\": \"m2\"\n" + + " }\n" + + " ],\n" + + " \"intervals\": [\n" + + " \"2000-01-01T00:00:00.000/2000-01-05T00:00:00.000\"\n" + + " ]\n" + + "}"; + + QueryRequest request = QueryRequest.newBuilder() + .setQuery(query) + .setQueryType(QueryOuterClass.QueryType.NATIVE) + .addTimeColumns("__time") + .setProtobufMessageName(TestResults.NativeQueryResultTimeSeries.class.getName()) + .setResultFormat(QueryResultFormat.PROTOBUF_INLINE) + .build(); + + QueryResponse response = client.getQueryClient().submitQuery(request); + GrpcResponseHandler handler = GrpcResponseHandler.of(TestResults.NativeQueryResultTimeSeries.class); + List queryResults = handler.get(response.getData()); + assertEquals(QueryStatus.OK, response.getStatus()); + assertEquals(4, queryResults.size()); + + QueryRequest requestSkipTime = QueryRequest.newBuilder() + .setQuery(query) + .setQueryType(QueryOuterClass.QueryType.NATIVE) + .addSkipColumns("__time") + .setProtobufMessageName(TestResults.NativeQueryResultTimeSeriesSkipTime.class.getName()) + .setResultFormat(QueryResultFormat.PROTOBUF_INLINE) + .build(); + + QueryResponse responseSkipTime = client.getQueryClient().submitQuery(requestSkipTime); + GrpcResponseHandler handlerSkipTime = GrpcResponseHandler.of(TestResults.NativeQueryResultTimeSeriesSkipTime.class); + List queryResultsSkipTime = handlerSkipTime.get(responseSkipTime.getData()); + assertEquals(QueryStatus.OK, response.getStatus()); + assertEquals(4, queryResultsSkipTime.size()); + } + + @Test + public void testGrpcBasics_native_groupby_day_granularity() + { + String query = "{\n" + + " \"queryType\": \"groupBy\",\n" + + " \"dataSource\": \"wikipedia\",\n" + + " \"dimensions\": [\n" + + " {\n" + + " \"type\": \"default\",\n" + + " \"dimension\": \"countryName\",\n" + + " \"outputType\": \"STRING\"\n" + + " }\n" + + " ],\n" + + " \"aggregations\": [\n" + + " {\n" + + " \"type\": \"longSum\",\n" + + " \"name\": \"aggregate\",\n" + + " \"fieldName\": \"added\"\n" + + " }\n" + + " ],\n" + + " \"filter\": {\n" + + " \"type\": \"in\",\n" + + " \"dimension\": \"countryName\",\n" + + " \"values\": [\n" + + " \"Albania\",\n" + + " \"Angola\"\n" + + " ]\n" + + " }," + + " \"intervals\": [\n" + + " \"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"\n" + + " ],\n" + + " \"granularity\": \"day\"\n" + + "}"; + + QueryRequest request = QueryRequest.newBuilder() + .setQuery(query) + .setQueryType(QueryOuterClass.QueryType.NATIVE) + .putContext("timeFieldKey", "date") + .addTimeColumns("__time") + .setProtobufMessageName(TestResults.NativeQueryResultGroupby.class.getName()) + .setResultFormat(QueryResultFormat.PROTOBUF_INLINE) + .build(); + + QueryResponse response = client.getQueryClient().submitQuery(request); + GrpcResponseHandler handler = GrpcResponseHandler.of(TestResults.NativeQueryResultGroupby.class); + List queryResults = handler.get(response.getData()); + assertEquals(QueryStatus.OK, response.getStatus()); + assertEquals(2, queryResults.size()); + + } + + @Test + public void testGrpcBasics_native_groupby() + { + String query = "{\n" + + " \"queryType\": \"groupBy\",\n" + + " \"dataSource\": \"wikipedia\",\n" + + " \"dimensions\": [\n" + + " {\n" + + " \"type\": \"default\",\n" + + " \"dimension\": \"countryName\",\n" + + " \"outputType\": \"STRING\"\n" + + " }\n" + + " ],\n" + + " \"aggregations\": [\n" + + " {\n" + + " \"type\": \"longSum\",\n" + + " \"name\": \"aggregate\",\n" + + " \"fieldName\": \"added\"\n" + + " }\n" + + " ],\n" + + " \"filter\": {\n" + + " \"type\": \"in\",\n" + + " \"dimension\": \"countryName\",\n" + + " \"values\": [\n" + + " \"Albania\",\n" + + " \"Angola\"\n" + + " ]\n" + + " }," + + " \"intervals\": [\n" + + " \"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"\n" + + " ],\n" + + " \"granularity\": \"all\"\n" + + "}"; + + QueryRequest request = QueryRequest.newBuilder() + .setQuery(query) + .setQueryType(QueryOuterClass.QueryType.NATIVE) + .setProtobufMessageName(TestResults.NativeQueryResultGroupbyWithoutTime.class.getName()) + .setResultFormat(QueryResultFormat.PROTOBUF_INLINE) + .build(); + + QueryResponse response = client.getQueryClient().submitQuery(request); + GrpcResponseHandler handler = GrpcResponseHandler.of(TestResults.NativeQueryResultGroupbyWithoutTime.class); + List queryResults = handler.get(response.getData()); + assertEquals(QueryStatus.OK, response.getStatus()); + assertEquals(2, queryResults.size()); + } + + @Test + public void testGrpcBasics_native_groupby_timeaggregate() + { + String query = "{\n" + + " \"queryType\": \"groupBy\",\n" + + " \"dataSource\": \"wikipedia\",\n" + + " \"dimensions\": [\n" + + " {\n" + + " \"type\": \"default\",\n" + + " \"dimension\": \"countryName\",\n" + + " \"outputType\": \"STRING\"\n" + + " },\n" + + " {\n" + + " \"type\":\"default\",\n" + + " \"dimension\": \"__time\",\n" + + " \"outputName\": \"timeCol\",\n" + + " \"outputType\": \"LONG\"\n" + + " }" + + " ],\n" + + " \"aggregations\": [\n" + + " {\n" + + " \"type\": \"longSum\",\n" + + " \"name\": \"aggregate\",\n" + + " \"fieldName\": \"added\"\n" + + " }\n" + + " ],\n" + + " \"filter\": {\n" + + " \"type\": \"in\",\n" + + " \"dimension\": \"countryName\",\n" + + " \"values\": [\n" + + " \"Albania\",\n" + + " \"Angola\"\n" + + " ]\n" + + " }," + + " \"intervals\": [\n" + + " \"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"\n" + + " ],\n" + + " \"granularity\": \"day\"\n" + + "}"; + + QueryRequest request = QueryRequest.newBuilder() + .setQuery(query) + .setQueryType(QueryOuterClass.QueryType.NATIVE) + .addSkipColumns("__time") + .addTimeColumns("timeCol") + .setProtobufMessageName(TestResults.NativeQueryResultGroupbyTimeRenamed.class.getName()) + .setResultFormat(QueryResultFormat.PROTOBUF_INLINE) + .build(); + + QueryResponse response = client.getQueryClient().submitQuery(request); + GrpcResponseHandler handler = GrpcResponseHandler.of(TestResults.NativeQueryResultGroupbyTimeRenamed.class); + List queryResults = handler.get(response.getData()); + assertEquals(QueryStatus.OK, response.getStatus()); + assertEquals(6, queryResults.size()); + } + + @Test + public void testGrpcEmptyResponse_sql() + { + QueryRequest request = QueryRequest.newBuilder() + .setQuery("SELECT dim1, dim2, dim3, cnt, m1, m2, unique_dim1, __time AS \"date\" FROM foo where cnt = 100000") + .setProtobufMessageName(QueryResult.class.getName()) + .setResultFormat(QueryResultFormat.PROTOBUF_INLINE) + .setQueryType(QueryOuterClass.QueryType.SQL) + .build(); + + QueryResponse response = client.getQueryClient().submitQuery(request); + GrpcResponseHandler handler = GrpcResponseHandler.of(QueryResult.class); + List queryResults = handler.get(response.getData()); + assertEquals(QueryStatus.OK, response.getStatus()); + assertEquals(0, queryResults.size()); + } + + @Test + public void test_health_check() + { + HealthCheckRequest healthCheckRequest = HealthCheckRequest + .newBuilder() + .setService("QueryService") + .build(); + + HealthCheckResponse healthCheckResponse = client + .getHealthCheckClient() + .check(healthCheckRequest); + + assertEquals(HealthCheckResponse.ServingStatus.SERVING, healthCheckResponse.getStatus()); + + healthCheckRequest = HealthCheckRequest + .newBuilder() + .setService("") + .build(); + + healthCheckResponse = client + .getHealthCheckClient() + .check(healthCheckRequest); + + assertEquals(HealthCheckResponse.ServingStatus.SERVING, healthCheckResponse.getStatus()); + } + + @Test + public void test_health_watch() + { + HealthCheckRequest healthCheckRequest = HealthCheckRequest + .newBuilder() + .setService("QueryService") + .build(); + + Iterator streamingHealthCheckResponse = client + .getHealthCheckClient() + .watch(healthCheckRequest); + + assertTrue(streamingHealthCheckResponse.hasNext()); + assertEquals(HealthCheckResponse.ServingStatus.SERVING, streamingHealthCheckResponse.next().getStatus()); + + Executors.newSingleThreadExecutor().submit(() -> { + Iterator secondRequest = client + .getHealthCheckClient() + .watch(healthCheckRequest); + + assertThrows(StatusRuntimeException.class, secondRequest::hasNext); + }); + + // stop the service from another thread + Executors.newSingleThreadExecutor().submit(() -> { + try { + Thread.sleep(10_000); + server.stop(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + // hasNext call would block until the status has changed + // as soon as the server is stopped the status would change to NOT_SERVING + if (streamingHealthCheckResponse.hasNext()) { + assertEquals(HealthCheckResponse.ServingStatus.NOT_SERVING, streamingHealthCheckResponse.next().getStatus()); + } + } +} diff --git a/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/GrpcQueryTestClient.java b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/GrpcQueryTestClient.java new file mode 100644 index 00000000000..aecded9f4b6 --- /dev/null +++ b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/GrpcQueryTestClient.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc; + +import org.apache.druid.grpc.proto.QueryOuterClass.ColumnSchema; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryRequest; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryResponse; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryResultFormat; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryStatus; +import org.apache.druid.java.util.common.StringUtils; + +import java.nio.charset.StandardCharsets; + +/** + * Super-simple command-line (or IDE launched) client which makes a + * single query request and prints + * the response. Useful because Druid provides no other rRPC client + * to use to test the rRPC endpoint. Pass the desired query as the + * one and only command line parameter. Does not (yet) support the + * query context or query parameters. + */ +public class GrpcQueryTestClient +{ + public static void main(String[] args) + { + if (args.length != 1) { + System.err.println("Usage: sql-query"); + System.exit(1); + } + TestClient client = new TestClient(TestClient.DEFAULT_HOST); + QueryRequest request = QueryRequest.newBuilder() + .setQuery(args[0]) + .setResultFormat(QueryResultFormat.CSV) + .build(); + QueryResponse response = client.getQueryClient().submitQuery(request); + + + if (response.getStatus() != QueryStatus.OK) { + System.err.println("Failed: " + response.getStatus().name()); + System.err.println(response.getErrorMessage()); + System.exit(1); + } + System.out.println("Columns:"); + for (ColumnSchema col : response.getColumnsList()) { + System.out.println(StringUtils.format("%s %s (%s)", col.getName(), col.getSqlType(), col.getDruidType().name())); + } + System.out.println("Data:"); + System.out.println(response.getData().toString(StandardCharsets.UTF_8)); + client.close(); + } +} diff --git a/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/TestClient.java b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/TestClient.java new file mode 100644 index 00000000000..96b9cf104ed --- /dev/null +++ b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/TestClient.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc; + +import io.grpc.CallCredentials; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.ManagedChannel; +import org.apache.druid.grpc.proto.HealthGrpc; +import org.apache.druid.grpc.proto.QueryGrpc; +import org.apache.druid.grpc.proto.QueryGrpc.QueryBlockingStub; + +import java.util.concurrent.TimeUnit; + +/** + * Super-simple test client that connects to a gRPC query endpoint + * and allows submitting a rRPC query request and returns the response. + * The server can be in the same or another process. + */ +public class TestClient implements AutoCloseable +{ + public static final String DEFAULT_HOST = "localhost:50051"; + private final ManagedChannel channel; + private QueryBlockingStub queryClient; + private HealthGrpc.HealthBlockingStub healthCheckClient; + + public TestClient(String target) + { + // Access a service running on the local machine on port 50051 + this(target, null); + } + + public TestClient(String target, String user, String password) + { + this(target, new BasicCredentials(user, password)); + } + + public TestClient(String target, CallCredentials callCreds) + { + // Create a communication channel to the server, known as a Channel. Channels are thread-safe + // and reusable. It is common to create channels at the beginning of your application and reuse + // them until the application shuts down. + // + // For the example we use plaintext insecure credentials to avoid needing TLS certificates. To + // use TLS, use TlsChannelCredentials instead. + channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) + .build(); + queryClient = QueryGrpc.newBlockingStub(channel); + healthCheckClient = HealthGrpc.newBlockingStub(channel); + if (callCreds != null) { + queryClient = queryClient.withCallCredentials(callCreds); + } + } + + public QueryBlockingStub getQueryClient() + { + return queryClient; + } + + public HealthGrpc.HealthBlockingStub getHealthCheckClient() + { + return healthCheckClient; + } + + public QueryBlockingStub client() + { + return queryClient; + } + + @Override + public void close() + { + // ManagedChannels use resources like threads and TCP connections. To prevent leaking these + // resources the channel should be shut down when it will no longer be used. If it may be used + // again leave it running. + try { + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + // Ignore + } + } +} diff --git a/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/TestServer.java b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/TestServer.java new file mode 100644 index 00000000000..b25d2e2ea53 --- /dev/null +++ b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/TestServer.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.grpc.server.GrpcEndpointInitializer; +import org.apache.druid.grpc.server.GrpcQueryConfig; +import org.apache.druid.server.security.AllowAllAuthenticator; +import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.server.security.AuthenticatorMapper; +import org.apache.druid.sql.calcite.BaseCalciteQueryTest; +import org.apache.druid.sql.calcite.util.SqlTestFramework; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +/** + * Super-simple test server that uses the CalciteTests setup. + */ +public class TestServer extends BaseCalciteQueryTest +{ + private GrpcEndpointInitializer serverInit; + + @Test + @Disabled + public void run() + { + SqlTestFramework sqlTestFramework = queryFramework(); + SqlTestFramework.PlannerFixture plannerFixture = sqlTestFramework.plannerFixture( + BaseCalciteQueryTest.PLANNER_CONFIG_DEFAULT, + new AuthConfig() + ); + GrpcQueryConfig config = new GrpcQueryConfig(50051); + AuthenticatorMapper authMapper = new AuthenticatorMapper( + ImmutableMap.of( + "test", + new AllowAllAuthenticator() + ) + ); + serverInit = new GrpcEndpointInitializer( + config, + sqlTestFramework.queryJsonMapper(), + plannerFixture.statementFactory(), + null, + authMapper + ); + serverInit.start(); + Runtime.getRuntime().addShutdownHook(new Thread() + { + @Override + public void run() + { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + serverInit.stop(); + } + }); + } +} diff --git a/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/client/GrpcResponseHandlerTest.java b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/client/GrpcResponseHandlerTest.java new file mode 100644 index 00000000000..1f3ba79814c --- /dev/null +++ b/extensions-contrib/grpc-query/src/test/java/org/apache/druid/grpc/client/GrpcResponseHandlerTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.grpc.client; + +import com.google.protobuf.ByteString; +import org.apache.druid.grpc.proto.QueryOuterClass.QueryResponse; +import org.apache.druid.grpc.proto.TestResults.QueryResult; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class GrpcResponseHandlerTest +{ + private static List EXPECTED_RESULTS = Arrays.asList( + QueryResult.newBuilder().setDim1("test").setCnt(100).build(), + QueryResult.newBuilder().setDim2("test2").setCnt(100).setM2(200.10).build() + ); + + @Test + public void testEmptyResponse() + { + GrpcResponseHandler handler = GrpcResponseHandler.of(QueryResult.class); + List queryResults = handler.get(ByteString.EMPTY); + assertTrue(queryResults.isEmpty()); + } + + @Test + public void testNonEmptyResponse() + { + GrpcResponseHandler handler = GrpcResponseHandler.of(QueryResult.class); + QueryResponse queryResponse = getQueryResponse(); + List queryResults = handler.get(queryResponse.getData()); + assertEquals(2, queryResults.size()); + assertEquals(EXPECTED_RESULTS, queryResults); + } + + private static QueryResponse getQueryResponse() + { + try { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + for (QueryResult queryResult : EXPECTED_RESULTS) { + queryResult.writeDelimitedTo(out); + } + return QueryResponse.newBuilder() + .setData(ByteString.copyFrom(out.toByteArray())) + .build(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/extensions-contrib/grpc-query/src/test/proto/all_types.proto b/extensions-contrib/grpc-query/src/test/proto/all_types.proto new file mode 100644 index 00000000000..e5438b06c2e --- /dev/null +++ b/extensions-contrib/grpc-query/src/test/proto/all_types.proto @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +package org.apache.druid.grpc.results; +option java_package = "org.apache.druid.grpc.proto"; + +message AllTypesQueryResult { + google.protobuf.Timestamp time_value = 1; + string string_value = 2; + int64 long_value = 3; + // Use of a 'float' type will cause a runtime error. SQL 'FLOAT' + // types are actually stored as 'double' internally. + double float_value = 4; + double double_value = 5; +} diff --git a/extensions-contrib/grpc-query/src/test/proto/test_results.proto b/extensions-contrib/grpc-query/src/test/proto/test_results.proto new file mode 100644 index 00000000000..5f3ed16198c --- /dev/null +++ b/extensions-contrib/grpc-query/src/test/proto/test_results.proto @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +package org.apache.druid.grpc.results; +option java_package = "org.apache.druid.grpc.proto"; + +message QueryResult { + string dim1 = 1; + string dim2 = 2; + string dim3 = 3; + int64 cnt = 4; + float m1 = 5; + double m2 = 6; + bytes unique_dim1 = 7; + google.protobuf.Timestamp date = 8; +} + +message NativeQueryResultTimeSeries { + int64 timeseries = 1; + google.protobuf.Timestamp time = 2; +} + +message NativeQueryResultTimeSeriesSkipTime { + int64 timeseries = 1; +} + +message NativeQueryResultGroupby { + int64 aggregate = 1; + string countryName = 2; + google.protobuf.Timestamp date = 3; +} + +message NativeQueryResultGroupbyTimeRenamed { + int64 aggregate = 1; + string countryName = 2; + google.protobuf.Timestamp timeCol = 3; +} + +message NativeQueryResultGroupbyWithoutTime { + int64 aggregate = 1; + string countryName = 2; +} diff --git a/pom.xml b/pom.xml index 42f7333ea2c..2a5d6d90212 100644 --- a/pom.xml +++ b/pom.xml @@ -231,11 +231,11 @@ extensions-contrib/prometheus-emitter extensions-contrib/opentelemetry-emitter extensions-contrib/kubernetes-overlord-extensions + extensions-contrib/grpc-query extensions-contrib/druid-iceberg-extensions extensions-contrib/druid-deltalake-extensions extensions-contrib/spectator-histogram extensions-contrib/rabbit-stream-indexing-service - distribution