From 7aa8d7f9875cbe950696b5c94e375ac616dd2d04 Mon Sep 17 00:00:00 2001 From: Rohan Garg <7731512+rohangarg@users.noreply.github.com> Date: Wed, 7 Sep 2022 13:54:46 +0530 Subject: [PATCH] Add query/time metric for SQL queries from router (#12867) * Add query/time metric for SQL queries from router * Fix query cancel bug when user has overriden native query-id in a SQL query --- docs/operations/metrics.md | 5 + .../DefaultGenericQueryMetricsFactory.java | 5 + .../druid/query/DefaultQueryMetrics.java | 12 + .../query/GenericQueryMetricsFactory.java | 7 + .../org/apache/druid/query/QueryMetrics.java | 12 + .../search/DefaultSearchQueryMetrics.java | 12 + .../druid/query/DefaultQueryMetricsTest.java | 6 +- .../search/DefaultSearchQueryMetricsTest.java | 3 + .../apache/druid/server/QueryResource.java | 3 +- .../server/AsyncQueryForwardingServlet.java | 210 +++++++++++++++--- .../AsyncQueryForwardingServletTest.java | 102 ++++++++- .../sql/calcite/run/NativeQueryMaker.java | 2 + .../org/apache/druid/sql/http/SqlQuery.java | 13 ++ 13 files changed, 348 insertions(+), 44 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 8bc302e5982..6cc013664e2 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -39,6 +39,11 @@ Metrics may have additional dimensions beyond those listed above. ## Query metrics +### Router +|Metric|Description|Dimensions|Normal Value| +|------|-----------|----------|------------| +|`query/time`|Milliseconds taken to complete a query.|Native Query: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id.|< 1s| + ### Broker |Metric|Description|Dimensions|Normal Value| diff --git a/processing/src/main/java/org/apache/druid/query/DefaultGenericQueryMetricsFactory.java b/processing/src/main/java/org/apache/druid/query/DefaultGenericQueryMetricsFactory.java index e0c5b6017b9..ec2662bad4f 100644 --- a/processing/src/main/java/org/apache/druid/query/DefaultGenericQueryMetricsFactory.java +++ b/processing/src/main/java/org/apache/druid/query/DefaultGenericQueryMetricsFactory.java @@ -46,4 +46,9 @@ public class DefaultGenericQueryMetricsFactory implements GenericQueryMetricsFac return queryMetrics; } + @Override + public QueryMetrics> makeMetrics() + { + return new DefaultQueryMetrics<>(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java index ac20cc248de..441b36d5bb5 100644 --- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java @@ -132,6 +132,12 @@ public class DefaultQueryMetrics> implements QueryMet setDimension(DruidMetrics.ID, StringUtils.nullToEmptyNonDruidDataString(query.getId())); } + @Override + public void queryId(String queryId) + { + setDimension(DruidMetrics.ID, StringUtils.nullToEmptyNonDruidDataString(queryId)); + } + @Override public void subQueryId(QueryType query) { @@ -144,6 +150,12 @@ public class DefaultQueryMetrics> implements QueryMet // Emit nothing by default. } + @Override + public void sqlQueryId(String sqlQueryId) + { + // Emit nothing by default. + } + @Override public void context(QueryType query) { diff --git a/processing/src/main/java/org/apache/druid/query/GenericQueryMetricsFactory.java b/processing/src/main/java/org/apache/druid/query/GenericQueryMetricsFactory.java index ce9c4e19c48..91e1e29ce0d 100644 --- a/processing/src/main/java/org/apache/druid/query/GenericQueryMetricsFactory.java +++ b/processing/src/main/java/org/apache/druid/query/GenericQueryMetricsFactory.java @@ -46,4 +46,11 @@ public interface GenericQueryMetricsFactory * call {@link QueryMetrics#query(Query)} with the given query on the created QueryMetrics object before returning. */ QueryMetrics> makeMetrics(Query query); + + /** + * Creates a {@link QueryMetrics} which doesn't have predefined QueryMetrics subclass. This method is used + * by the router to build a {@link QueryMetrics} for SQL queries. It is needed since at router, there is no native + * query linked to a SQL query. + */ + QueryMetrics> makeMetrics(); } diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java index 4d2406c697b..f4d71060bf7 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java @@ -202,6 +202,12 @@ public interface QueryMetrics> @PublicApi void queryId(QueryType query); + /** + * Sets id of the given query as dimension. + */ + @PublicApi + void queryId(@SuppressWarnings("UnusedParameters") String queryId); + /** * Sets {@link Query#getSubQueryId()} of the given query as dimension. */ @@ -214,6 +220,12 @@ public interface QueryMetrics> @PublicApi void sqlQueryId(QueryType query); + /** + * Sets sqlQueryId as a dimension + */ + @PublicApi + void sqlQueryId(@SuppressWarnings("UnusedParameters") String sqlQueryId); + /** * Sets {@link Query#getContext()} of the given query as dimension. */ diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java index 108518a7948..2481eeb35f2 100644 --- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java @@ -87,6 +87,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics throw new ISE("Unsupported method in default query metrics implementation."); } + @Override + public void queryId(String queryId) + { + throw new ISE("Unsupported method in default query metrics implementation."); + } + @Override public void subQueryId(SearchQuery query) { @@ -99,6 +105,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics throw new ISE("Unsupported method in default query metrics implementation."); } + @Override + public void sqlQueryId(String sqlQueryId) + { + throw new ISE("Unsupported method in default query metrics implementation."); + } + @Override public void granularity(SearchQuery query) { diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java index 7658ffd8416..d512a294ef8 100644 --- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java @@ -67,6 +67,10 @@ public class DefaultQueryMetricsTest .build(); queryMetrics.query(query); queryMetrics.reportQueryTime(0).emit(serviceEmitter); + // No way to verify this right now since DefaultQueryMetrics implements a no-op for sqlQueryId(String) and queryId(String) + // This change is done to keep the code coverage tool happy by exercising the implementation + queryMetrics.sqlQueryId("dummy"); + queryMetrics.queryId("dummy"); Map actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); Assert.assertEquals(13, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); @@ -81,7 +85,7 @@ public class DefaultQueryMetricsTest Assert.assertEquals(expectedStringIntervals, actualEvent.get(DruidMetrics.INTERVAL)); Assert.assertEquals("true", actualEvent.get("hasFilters")); Assert.assertEquals(expectedIntervals.get(0).toDuration().toString(), actualEvent.get("duration")); - Assert.assertEquals("", actualEvent.get(DruidMetrics.ID)); + Assert.assertEquals("dummy", actualEvent.get(DruidMetrics.ID)); Assert.assertEquals("query/time", actualEvent.get("metric")); Assert.assertEquals(0L, actualEvent.get("value")); Assert.assertEquals(ImmutableMap.of("testKey", "testValue"), actualEvent.get("context")); diff --git a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java index ae666e39022..a03781566b5 100644 --- a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java @@ -21,6 +21,7 @@ package org.apache.druid.query.search; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.CachingEmitter; import org.apache.druid.query.DefaultQueryMetricsTest; @@ -86,6 +87,8 @@ public class DefaultSearchQueryMetricsTest // Metric Assert.assertEquals("query/time", actualEvent.get("metric")); Assert.assertEquals(0L, actualEvent.get("value")); + + Assert.assertThrows(ISE.class, () -> queryMetrics.sqlQueryId("dummy")); } @Test diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index f2a55242ea2..590347735b8 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -98,6 +98,7 @@ public class QueryResource implements QueryCountStatsProvider */ public static final String HEADER_RESPONSE_CONTEXT = "X-Druid-Response-Context"; public static final String HEADER_IF_NONE_MATCH = "If-None-Match"; + public static final String QUERY_ID_RESPONSE_HEADER = "X-Druid-Query-Id"; public static final String HEADER_ETAG = "ETag"; protected final QueryLifecycleFactory queryLifecycleFactory; @@ -252,7 +253,7 @@ public class QueryResource implements QueryCountStatsProvider }, ioReaderWriter.getResponseWriter().getResponseType() ) - .header("X-Druid-Query-Id", queryId); + .header(QUERY_ID_RESPONSE_HEADER, queryId); transferEntityTag(responseContext, responseBuilder); diff --git a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java index 3668543bf49..fa3b52669b6 100644 --- a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java +++ b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java @@ -39,6 +39,7 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.BaseQuery; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.GenericQueryMetricsFactory; import org.apache.druid.query.Query; @@ -56,6 +57,7 @@ import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.Authenticator; import org.apache.druid.server.security.AuthenticatorMapper; import org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.sql.http.SqlResource; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; @@ -65,12 +67,14 @@ import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.proxy.AsyncProxyServlet; +import javax.annotation.Nullable; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response.Status; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -266,11 +270,16 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu handleException(response, objectMapper, e); return; } - } else if (routeSqlByStrategy && isSqlQueryEndpoint && HttpMethod.POST.is(method)) { + } else if (isSqlQueryEndpoint && HttpMethod.POST.is(method)) { try { SqlQuery inputSqlQuery = objectMapper.readValue(request.getInputStream(), SqlQuery.class); + inputSqlQuery = buildSqlQueryWithId(inputSqlQuery); request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery); - targetServer = hostFinder.findServerSql(inputSqlQuery); + if (routeSqlByStrategy) { + targetServer = hostFinder.findServerSql(inputSqlQuery); + } else { + targetServer = hostFinder.pickDefaultServer(); + } LOG.debug("Forwarding SQL query to broker [%s]", targetServer.getHost()); } catch (IOException e) { @@ -292,6 +301,22 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu doService(request, response); } + /** + * Rebuilds the {@link SqlQuery} object with sqlQueryId and queryId context parameters if not present + * @param sqlQuery the original SqlQuery + * @return an updated sqlQuery object with sqlQueryId and queryId context parameters + */ + private SqlQuery buildSqlQueryWithId(SqlQuery sqlQuery) + { + Map context = new HashMap<>(sqlQuery.getContext()); + String sqlQueryId = (String) context.getOrDefault(BaseQuery.SQL_QUERY_ID, UUID.randomUUID().toString()); + // set queryId to sqlQueryId if not overridden + String queryId = (String) context.getOrDefault(BaseQuery.QUERY_ID, sqlQueryId); + context.put(BaseQuery.SQL_QUERY_ID, sqlQueryId); + context.put(BaseQuery.QUERY_ID, queryId); + return sqlQuery.withOverridenContext(context); + } + /** * Issues async query cancellation requests to all Brokers (except the given * targetServer). Query cancellation on the targetServer is handled by the @@ -449,12 +474,15 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu @Override protected Response.Listener newProxyResponseListener(HttpServletRequest request, HttpServletResponse response) { - final Query query = (Query) request.getAttribute(QUERY_ATTRIBUTE); - if (query != null) { - return newMetricsEmittingProxyResponseListener(request, response, query, System.nanoTime()); - } else { - return super.newProxyResponseListener(request, response); - } + boolean isJDBC = request.getAttribute(AVATICA_QUERY_ATTRIBUTE) != null; + return newMetricsEmittingProxyResponseListener( + request, + response, + (Query) request.getAttribute(QUERY_ATTRIBUTE), + (SqlQuery) request.getAttribute(SQL_QUERY_ATTRIBUTE), + isJDBC, + System.nanoTime() + ); } @Override @@ -500,11 +528,13 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu private Response.Listener newMetricsEmittingProxyResponseListener( HttpServletRequest request, HttpServletResponse response, - Query query, + @Nullable Query query, + @Nullable SqlQuery sqlQuery, + boolean isJDBC, long startNs ) { - return new MetricsEmittingProxyResponseListener(request, response, query, startNs); + return new MetricsEmittingProxyResponseListener(request, response, query, sqlQuery, isJDBC, startNs); } @Override @@ -660,22 +690,28 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu private class MetricsEmittingProxyResponseListener extends ProxyResponseListener { private final HttpServletRequest req; - private final HttpServletResponse res; + @Nullable private final Query query; + @Nullable + private final SqlQuery sqlQuery; + private final boolean isJDBC; private final long startNs; public MetricsEmittingProxyResponseListener( HttpServletRequest request, HttpServletResponse response, - Query query, + @Nullable Query query, + @Nullable SqlQuery sqlQuery, + boolean isJDBC, long startNs ) { super(request, response); this.req = request; - this.res = response; this.query = query; + this.sqlQuery = sqlQuery; + this.isJDBC = isJDBC; this.startNs = startNs; } @@ -683,14 +719,63 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu public void onComplete(Result result) { final long requestTimeNs = System.nanoTime() - startNs; - try { - boolean success = result.isSucceeded(); - if (success) { - successfulQueryCount.incrementAndGet(); - } else { - failedQueryCount.incrementAndGet(); + String queryId = null; + String sqlQueryId = null; + if (isJDBC) { + sqlQueryId = result.getResponse().getHeaders().get(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER); + } else if (sqlQuery != null) { + sqlQueryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.SQL_QUERY_ID, null); + queryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.QUERY_ID, null); + } else if (query != null) { + queryId = query.getId(); + } + + // not a native or SQL query, no need to emit metrics and logs + if (queryId == null && sqlQueryId == null) { + super.onComplete(result); + return; + } + + boolean success = result.isSucceeded(); + if (success) { + successfulQueryCount.incrementAndGet(); + } else { + failedQueryCount.incrementAndGet(); + } + emitQueryTime(requestTimeNs, success, sqlQueryId, queryId); + + //noinspection VariableNotUsedInsideIf + if (sqlQueryId != null) { + // SQL query doesn't have a native query translation in router. Hence, not logging the native query. + if (sqlQuery != null) { + try { + requestLogger.logSqlQuery( + RequestLogLine.forSql( + sqlQuery.getQuery(), + sqlQuery.getContext(), + DateTimes.nowUtc(), + req.getRemoteAddr(), + new QueryStats( + ImmutableMap.of( + "query/time", + TimeUnit.NANOSECONDS.toMillis(requestTimeNs), + "success", + success + && result.getResponse().getStatus() == Status.OK.getStatusCode() + ) + ) + ) + ); + } + catch (IOException e) { + LOG.error(e, "Unable to log SQL query [%s]!", sqlQuery); + } } - emitQueryTime(requestTimeNs, success); + super.onComplete(result); + return; + } + + try { requestLogger.logNativeQuery( RequestLogLine.forNative( query, @@ -718,10 +803,64 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu @Override public void onFailure(Response response, Throwable failure) { + final long requestTimeNs = System.nanoTime() - startNs; + final String errorMessage = failure.getMessage(); + String queryId = null; + String sqlQueryId = null; + if (isJDBC) { + sqlQueryId = response.getHeaders().get(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER); + } else if (sqlQuery != null) { + sqlQueryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.SQL_QUERY_ID, null); + queryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.QUERY_ID, null); + } else if (query != null) { + queryId = query.getId(); + } + + // not a native or SQL query, no need to emit metrics and logs + if (queryId == null && sqlQueryId == null) { + super.onFailure(response, failure); + return; + } + + failedQueryCount.incrementAndGet(); + emitQueryTime(requestTimeNs, false, sqlQueryId, queryId); + + //noinspection VariableNotUsedInsideIf + if (sqlQueryId != null) { + // SQL query doesn't have a native query translation in router. Hence, not logging the native query. + if (sqlQuery != null) { + try { + requestLogger.logSqlQuery( + RequestLogLine.forSql( + sqlQuery.getQuery(), + sqlQuery.getContext(), + DateTimes.nowUtc(), + req.getRemoteAddr(), + new QueryStats( + ImmutableMap.of( + "success", + false, + "exception", + errorMessage == null ? "no message" : errorMessage + ) + ) + ) + ); + } + catch (IOException e) { + LOG.error(e, "Unable to log SQL query [%s]!", sqlQuery); + } + LOG.makeAlert(failure, "Exception handling request") + .addData("exception", failure.toString()) + .addData("sqlQuery", sqlQuery) + .addData("peer", req.getRemoteAddr()) + .emit(); + } + super.onFailure(response, failure); + return; + } + try { - final String errorMessage = failure.getMessage(); - failedQueryCount.incrementAndGet(); - emitQueryTime(System.nanoTime() - startNs, false); requestLogger.logNativeQuery( RequestLogLine.forNative( query, @@ -751,14 +890,25 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu super.onFailure(response, failure); } - private void emitQueryTime(long requestTimeNs, boolean success) + private void emitQueryTime(long requestTimeNs, boolean success, @Nullable String sqlQueryId, @Nullable String queryId) { - QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics( - queryMetricsFactory, - warehouse.getToolChest(query), - query, - req.getRemoteAddr() - ); + QueryMetrics queryMetrics; + if (sqlQueryId != null) { + queryMetrics = queryMetricsFactory.makeMetrics(); + queryMetrics.remoteAddress(req.getRemoteAddr()); + // Setting sqlQueryId and queryId dimensions to the metric + queryMetrics.sqlQueryId(sqlQueryId); + if (queryId != null) { // query id is null for JDBC SQL + queryMetrics.queryId(queryId); + } + } else { + queryMetrics = DruidMetrics.makeRequestMetrics( + queryMetricsFactory, + warehouse.getToolChest(query), + query, + req.getRemoteAddr() + ); + } queryMetrics.success(success); queryMetrics.reportQueryTime(requestTimeNs).emit(emitter); } diff --git a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java index 526a9340087..8540a4ca4ca 100644 --- a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java +++ b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java @@ -48,6 +48,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.Druids; import org.apache.druid.query.MapQueryToolChestWarehouse; @@ -72,6 +73,11 @@ import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.sql.http.SqlQuery; import org.easymock.EasyMock; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.HandlerList; @@ -205,15 +211,24 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest @Test public void testSqlQueryProxy() throws Exception { - final SqlQuery query = new SqlQuery("SELECT * FROM foo", ResultFormat.ARRAY, false, false, false, null, null); + final SqlQuery query = new SqlQuery( + "SELECT * FROM foo", + ResultFormat.ARRAY, + false, + false, + false, + ImmutableMap.of("sqlQueryId", "dummy"), + null + ); final QueryHostFinder hostFinder = EasyMock.createMock(QueryHostFinder.class); - EasyMock.expect(hostFinder.findServerSql(query)) - .andReturn(new TestServer("http", "1.2.3.4", 9999)).once(); + EasyMock.expect(hostFinder.findServerSql( + query.withOverridenContext(ImmutableMap.of("sqlQueryId", "dummy", "queryId", "dummy"))) + ).andReturn(new TestServer("http", "1.2.3.4", 9999)).once(); EasyMock.replay(hostFinder); Properties properties = new Properties(); properties.setProperty("druid.router.sql.enable", "true"); - verifyServletCallsForQuery(query, true, hostFinder, properties); + verifyServletCallsForQuery(query, true, false, hostFinder, properties); } @Test @@ -230,7 +245,21 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest EasyMock.expect(hostFinder.pickServer(query)).andReturn(new TestServer("http", "1.2.3.4", 9999)).once(); EasyMock.replay(hostFinder); - verifyServletCallsForQuery(query, false, hostFinder, new Properties()); + verifyServletCallsForQuery(query, false, false, hostFinder, new Properties()); + } + + @Test + public void testJDBCSqlProxy() throws Exception + { + final ImmutableMap jdbcRequest = ImmutableMap.of("connectionId", "dummy"); + + final QueryHostFinder hostFinder = EasyMock.createMock(QueryHostFinder.class); + EasyMock.expect(hostFinder.findServerAvatica("dummy")) + .andReturn(new TestServer("http", "1.2.3.4", 9999)) + .once(); + EasyMock.replay(hostFinder); + + verifyServletCallsForQuery(jdbcRequest, false, true, hostFinder, new Properties()); } @Test @@ -485,13 +514,13 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest */ private void verifyServletCallsForQuery( Object query, - boolean isSql, + boolean isNativeSql, + boolean isJDBCSql, QueryHostFinder hostFinder, Properties properties ) throws Exception { final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - final HttpServletRequest requestMock = EasyMock.createMock(HttpServletRequest.class); final ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonMapper.writeValueAsBytes(query)); final ServletInputStream servletInputStream = new ServletInputStream() { @@ -525,22 +554,58 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest return b; } }; + final HttpServletRequest requestMock = EasyMock.createMock(HttpServletRequest.class); EasyMock.expect(requestMock.getContentType()).andReturn("application/json").times(2); requestMock.setAttribute("org.apache.druid.proxy.objectMapper", jsonMapper); EasyMock.expectLastCall(); - EasyMock.expect(requestMock.getRequestURI()).andReturn(isSql ? "/druid/v2/sql" : "/druid/v2/"); + EasyMock.expect(requestMock.getRequestURI()) + .andReturn(isNativeSql ? "/druid/v2/sql" : (isJDBCSql ? "/druid/v2/sql/avatica" : "/druid/v2/")); EasyMock.expect(requestMock.getMethod()).andReturn("POST"); - EasyMock.expect(requestMock.getInputStream()).andReturn(servletInputStream); + if (isNativeSql) { + SqlQuery sqlQuery = (SqlQuery) query; + query = sqlQuery.withOverridenContext(ImmutableMap.of("sqlQueryId", "dummy", "queryId", "dummy")); + } requestMock.setAttribute( - isSql ? "org.apache.druid.proxy.sqlQuery" : "org.apache.druid.proxy.query", - query + "org.apache.druid.proxy." + (isNativeSql ? "sqlQuery" : (isJDBCSql ? "avaticaQuery" : "query")), + isJDBCSql ? jsonMapper.writeValueAsBytes(query) : query ); + EasyMock.expectLastCall(); + EasyMock.expect(requestMock.getInputStream()).andReturn(servletInputStream); + + // metrics related mocking + EasyMock.expect(requestMock.getAttribute("org.apache.druid.proxy.avaticaQuery")) + .andReturn(isJDBCSql ? query : null); + EasyMock.expect(requestMock.getAttribute("org.apache.druid.proxy.query")) + .andReturn(isJDBCSql ? null : (isNativeSql ? null : query)); + EasyMock.expect(requestMock.getAttribute("org.apache.druid.proxy.sqlQuery")) + .andReturn(isJDBCSql ? null : (isNativeSql ? query : null)); + EasyMock.expect(requestMock.getRemoteAddr()).andReturn("0.0.0.0:0").times(isJDBCSql ? 1 : 2); requestMock.setAttribute("org.apache.druid.proxy.to.host", "1.2.3.4:9999"); + EasyMock.expectLastCall(); requestMock.setAttribute("org.apache.druid.proxy.to.host.scheme", "http"); EasyMock.expectLastCall(); EasyMock.replay(requestMock); final AtomicLong didService = new AtomicLong(); + final Request proxyRequestMock = Mockito.spy(Request.class); + final Result result = new Result( + proxyRequestMock, + new HttpResponse(proxyRequestMock, ImmutableList.of()) + { + @Override + public HttpFields getHeaders() + { + HttpFields httpFields = new HttpFields(); + if (isJDBCSql) { + httpFields.add(new HttpField("X-Druid-SQL-Query-Id", "jdbcDummy")); + } else if (isNativeSql) { + httpFields.add(new HttpField("X-Druid-SQL-Query-Id", "dummy")); + } + return httpFields; + } + } + ); + final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", ""); final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet( new MapQueryToolChestWarehouse(ImmutableMap.of()), jsonMapper, @@ -548,7 +613,7 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest hostFinder, null, null, - new NoopServiceEmitter(), + stubServiceEmitter, new NoopRequestLogger(), new DefaultGenericQueryMetricsFactory(), new AuthenticatorMapper(ImmutableMap.of()), @@ -568,6 +633,19 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest servlet.service(requestMock, null); + // NPE is expected since the listener's onComplete calls the parent class' onComplete which fails due to + // partial state of the servlet. Hence, only catching the exact exception to avoid possible errors. + // Further, the metric assertions are also done to ensure that the metrics have emitted. + try { + servlet.newProxyResponseListener(requestMock, null).onComplete(result); + } + catch (NullPointerException ignored) { + } + Assert.assertEquals("query/time", stubServiceEmitter.getEvents().get(0).toMap().get("metric")); + if (!isJDBCSql) { + Assert.assertEquals("dummy", stubServiceEmitter.getEvents().get(0).toMap().get("id")); + } + // This test is mostly about verifying that the servlet calls the right methods the right number of times. EasyMock.verify(hostFinder, requestMock); Assert.assertEquals(1, didService.get()); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java index b033bf4ba9d..1a78705ab7b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java @@ -180,6 +180,8 @@ public class NativeQueryMaker implements QueryMaker final String queryId = UUID.randomUUID().toString(); plannerContext.addNativeQueryId(queryId); query = query.withId(queryId); + } else { + plannerContext.addNativeQueryId(query.getId()); } query = query.withSqlQueryId(plannerContext.getSqlQueryId()); diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java b/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java index 460a8be697f..242df5c68b0 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java @@ -81,6 +81,19 @@ public class SqlQuery } } + public SqlQuery withOverridenContext(Map overridenContext) + { + return new SqlQuery( + getQuery(), + getResultFormat(), + includeHeader(), + includeTypesHeader(), + includeSqlTypesHeader(), + overridenContext, + getParameters() + ); + } + @JsonProperty public String getQuery() {