mirror of https://github.com/apache/druid.git
Add query/time metric for SQL queries from router (#12867)
* Add query/time metric for SQL queries from router * Fix query cancel bug when user has overriden native query-id in a SQL query
This commit is contained in:
parent
ee22663dd3
commit
7aa8d7f987
|
@ -39,6 +39,11 @@ Metrics may have additional dimensions beyond those listed above.
|
|||
|
||||
## Query metrics
|
||||
|
||||
### Router
|
||||
|Metric|Description|Dimensions|Normal Value|
|
||||
|------|-----------|----------|------------|
|
||||
|`query/time`|Milliseconds taken to complete a query.|Native Query: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id.|< 1s|
|
||||
|
||||
### Broker
|
||||
|
||||
|Metric|Description|Dimensions|Normal Value|
|
||||
|
|
|
@ -46,4 +46,9 @@ public class DefaultGenericQueryMetricsFactory implements GenericQueryMetricsFac
|
|||
return queryMetrics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryMetrics<Query<?>> makeMetrics()
|
||||
{
|
||||
return new DefaultQueryMetrics<>();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -132,6 +132,12 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
|
|||
setDimension(DruidMetrics.ID, StringUtils.nullToEmptyNonDruidDataString(query.getId()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void queryId(String queryId)
|
||||
{
|
||||
setDimension(DruidMetrics.ID, StringUtils.nullToEmptyNonDruidDataString(queryId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subQueryId(QueryType query)
|
||||
{
|
||||
|
@ -144,6 +150,12 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
|
|||
// Emit nothing by default.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sqlQueryId(String sqlQueryId)
|
||||
{
|
||||
// Emit nothing by default.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void context(QueryType query)
|
||||
{
|
||||
|
|
|
@ -46,4 +46,11 @@ public interface GenericQueryMetricsFactory
|
|||
* call {@link QueryMetrics#query(Query)} with the given query on the created QueryMetrics object before returning.
|
||||
*/
|
||||
QueryMetrics<Query<?>> makeMetrics(Query<?> query);
|
||||
|
||||
/**
|
||||
* Creates a {@link QueryMetrics} which doesn't have predefined QueryMetrics subclass. This method is used
|
||||
* by the router to build a {@link QueryMetrics} for SQL queries. It is needed since at router, there is no native
|
||||
* query linked to a SQL query.
|
||||
*/
|
||||
QueryMetrics<Query<?>> makeMetrics();
|
||||
}
|
||||
|
|
|
@ -202,6 +202,12 @@ public interface QueryMetrics<QueryType extends Query<?>>
|
|||
@PublicApi
|
||||
void queryId(QueryType query);
|
||||
|
||||
/**
|
||||
* Sets id of the given query as dimension.
|
||||
*/
|
||||
@PublicApi
|
||||
void queryId(@SuppressWarnings("UnusedParameters") String queryId);
|
||||
|
||||
/**
|
||||
* Sets {@link Query#getSubQueryId()} of the given query as dimension.
|
||||
*/
|
||||
|
@ -214,6 +220,12 @@ public interface QueryMetrics<QueryType extends Query<?>>
|
|||
@PublicApi
|
||||
void sqlQueryId(QueryType query);
|
||||
|
||||
/**
|
||||
* Sets sqlQueryId as a dimension
|
||||
*/
|
||||
@PublicApi
|
||||
void sqlQueryId(@SuppressWarnings("UnusedParameters") String sqlQueryId);
|
||||
|
||||
/**
|
||||
* Sets {@link Query#getContext()} of the given query as dimension.
|
||||
*/
|
||||
|
|
|
@ -87,6 +87,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
|
|||
throw new ISE("Unsupported method in default query metrics implementation.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void queryId(String queryId)
|
||||
{
|
||||
throw new ISE("Unsupported method in default query metrics implementation.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subQueryId(SearchQuery query)
|
||||
{
|
||||
|
@ -99,6 +105,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
|
|||
throw new ISE("Unsupported method in default query metrics implementation.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sqlQueryId(String sqlQueryId)
|
||||
{
|
||||
throw new ISE("Unsupported method in default query metrics implementation.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void granularity(SearchQuery query)
|
||||
{
|
||||
|
|
|
@ -67,6 +67,10 @@ public class DefaultQueryMetricsTest
|
|||
.build();
|
||||
queryMetrics.query(query);
|
||||
queryMetrics.reportQueryTime(0).emit(serviceEmitter);
|
||||
// No way to verify this right now since DefaultQueryMetrics implements a no-op for sqlQueryId(String) and queryId(String)
|
||||
// This change is done to keep the code coverage tool happy by exercising the implementation
|
||||
queryMetrics.sqlQueryId("dummy");
|
||||
queryMetrics.queryId("dummy");
|
||||
Map<String, Object> actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
|
||||
Assert.assertEquals(13, actualEvent.size());
|
||||
Assert.assertTrue(actualEvent.containsKey("feed"));
|
||||
|
@ -81,7 +85,7 @@ public class DefaultQueryMetricsTest
|
|||
Assert.assertEquals(expectedStringIntervals, actualEvent.get(DruidMetrics.INTERVAL));
|
||||
Assert.assertEquals("true", actualEvent.get("hasFilters"));
|
||||
Assert.assertEquals(expectedIntervals.get(0).toDuration().toString(), actualEvent.get("duration"));
|
||||
Assert.assertEquals("", actualEvent.get(DruidMetrics.ID));
|
||||
Assert.assertEquals("dummy", actualEvent.get(DruidMetrics.ID));
|
||||
Assert.assertEquals("query/time", actualEvent.get("metric"));
|
||||
Assert.assertEquals(0L, actualEvent.get("value"));
|
||||
Assert.assertEquals(ImmutableMap.of("testKey", "testValue"), actualEvent.get("context"));
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.druid.query.search;
|
|||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.query.CachingEmitter;
|
||||
import org.apache.druid.query.DefaultQueryMetricsTest;
|
||||
|
@ -86,6 +87,8 @@ public class DefaultSearchQueryMetricsTest
|
|||
// Metric
|
||||
Assert.assertEquals("query/time", actualEvent.get("metric"));
|
||||
Assert.assertEquals(0L, actualEvent.get("value"));
|
||||
|
||||
Assert.assertThrows(ISE.class, () -> queryMetrics.sqlQueryId("dummy"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -98,6 +98,7 @@ public class QueryResource implements QueryCountStatsProvider
|
|||
*/
|
||||
public static final String HEADER_RESPONSE_CONTEXT = "X-Druid-Response-Context";
|
||||
public static final String HEADER_IF_NONE_MATCH = "If-None-Match";
|
||||
public static final String QUERY_ID_RESPONSE_HEADER = "X-Druid-Query-Id";
|
||||
public static final String HEADER_ETAG = "ETag";
|
||||
|
||||
protected final QueryLifecycleFactory queryLifecycleFactory;
|
||||
|
@ -252,7 +253,7 @@ public class QueryResource implements QueryCountStatsProvider
|
|||
},
|
||||
ioReaderWriter.getResponseWriter().getResponseType()
|
||||
)
|
||||
.header("X-Druid-Query-Id", queryId);
|
||||
.header(QUERY_ID_RESPONSE_HEADER, queryId);
|
||||
|
||||
transferEntityTag(responseContext, responseBuilder);
|
||||
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.druid.java.util.common.IAE;
|
|||
import org.apache.druid.java.util.common.jackson.JacksonUtils;
|
||||
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.query.BaseQuery;
|
||||
import org.apache.druid.query.DruidMetrics;
|
||||
import org.apache.druid.query.GenericQueryMetricsFactory;
|
||||
import org.apache.druid.query.Query;
|
||||
|
@ -56,6 +57,7 @@ import org.apache.druid.server.security.AuthenticationResult;
|
|||
import org.apache.druid.server.security.Authenticator;
|
||||
import org.apache.druid.server.security.AuthenticatorMapper;
|
||||
import org.apache.druid.sql.http.SqlQuery;
|
||||
import org.apache.druid.sql.http.SqlResource;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
|
@ -65,12 +67,14 @@ import org.eclipse.jetty.http.HttpHeader;
|
|||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.proxy.AsyncProxyServlet;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
|
@ -266,11 +270,16 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
|||
handleException(response, objectMapper, e);
|
||||
return;
|
||||
}
|
||||
} else if (routeSqlByStrategy && isSqlQueryEndpoint && HttpMethod.POST.is(method)) {
|
||||
} else if (isSqlQueryEndpoint && HttpMethod.POST.is(method)) {
|
||||
try {
|
||||
SqlQuery inputSqlQuery = objectMapper.readValue(request.getInputStream(), SqlQuery.class);
|
||||
inputSqlQuery = buildSqlQueryWithId(inputSqlQuery);
|
||||
request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery);
|
||||
if (routeSqlByStrategy) {
|
||||
targetServer = hostFinder.findServerSql(inputSqlQuery);
|
||||
} else {
|
||||
targetServer = hostFinder.pickDefaultServer();
|
||||
}
|
||||
LOG.debug("Forwarding SQL query to broker [%s]", targetServer.getHost());
|
||||
}
|
||||
catch (IOException e) {
|
||||
|
@ -292,6 +301,22 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
|||
doService(request, response);
|
||||
}
|
||||
|
||||
/**
|
||||
* Rebuilds the {@link SqlQuery} object with sqlQueryId and queryId context parameters if not present
|
||||
* @param sqlQuery the original SqlQuery
|
||||
* @return an updated sqlQuery object with sqlQueryId and queryId context parameters
|
||||
*/
|
||||
private SqlQuery buildSqlQueryWithId(SqlQuery sqlQuery)
|
||||
{
|
||||
Map<String, Object> context = new HashMap<>(sqlQuery.getContext());
|
||||
String sqlQueryId = (String) context.getOrDefault(BaseQuery.SQL_QUERY_ID, UUID.randomUUID().toString());
|
||||
// set queryId to sqlQueryId if not overridden
|
||||
String queryId = (String) context.getOrDefault(BaseQuery.QUERY_ID, sqlQueryId);
|
||||
context.put(BaseQuery.SQL_QUERY_ID, sqlQueryId);
|
||||
context.put(BaseQuery.QUERY_ID, queryId);
|
||||
return sqlQuery.withOverridenContext(context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Issues async query cancellation requests to all Brokers (except the given
|
||||
* targetServer). Query cancellation on the targetServer is handled by the
|
||||
|
@ -449,12 +474,15 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
|||
@Override
|
||||
protected Response.Listener newProxyResponseListener(HttpServletRequest request, HttpServletResponse response)
|
||||
{
|
||||
final Query query = (Query) request.getAttribute(QUERY_ATTRIBUTE);
|
||||
if (query != null) {
|
||||
return newMetricsEmittingProxyResponseListener(request, response, query, System.nanoTime());
|
||||
} else {
|
||||
return super.newProxyResponseListener(request, response);
|
||||
}
|
||||
boolean isJDBC = request.getAttribute(AVATICA_QUERY_ATTRIBUTE) != null;
|
||||
return newMetricsEmittingProxyResponseListener(
|
||||
request,
|
||||
response,
|
||||
(Query) request.getAttribute(QUERY_ATTRIBUTE),
|
||||
(SqlQuery) request.getAttribute(SQL_QUERY_ATTRIBUTE),
|
||||
isJDBC,
|
||||
System.nanoTime()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -500,11 +528,13 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
|||
private Response.Listener newMetricsEmittingProxyResponseListener(
|
||||
HttpServletRequest request,
|
||||
HttpServletResponse response,
|
||||
Query query,
|
||||
@Nullable Query query,
|
||||
@Nullable SqlQuery sqlQuery,
|
||||
boolean isJDBC,
|
||||
long startNs
|
||||
)
|
||||
{
|
||||
return new MetricsEmittingProxyResponseListener(request, response, query, startNs);
|
||||
return new MetricsEmittingProxyResponseListener(request, response, query, sqlQuery, isJDBC, startNs);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -660,22 +690,28 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
|||
private class MetricsEmittingProxyResponseListener<T> extends ProxyResponseListener
|
||||
{
|
||||
private final HttpServletRequest req;
|
||||
private final HttpServletResponse res;
|
||||
@Nullable
|
||||
private final Query<T> query;
|
||||
@Nullable
|
||||
private final SqlQuery sqlQuery;
|
||||
private final boolean isJDBC;
|
||||
private final long startNs;
|
||||
|
||||
public MetricsEmittingProxyResponseListener(
|
||||
HttpServletRequest request,
|
||||
HttpServletResponse response,
|
||||
Query<T> query,
|
||||
@Nullable Query<T> query,
|
||||
@Nullable SqlQuery sqlQuery,
|
||||
boolean isJDBC,
|
||||
long startNs
|
||||
)
|
||||
{
|
||||
super(request, response);
|
||||
|
||||
this.req = request;
|
||||
this.res = response;
|
||||
this.query = query;
|
||||
this.sqlQuery = sqlQuery;
|
||||
this.isJDBC = isJDBC;
|
||||
this.startNs = startNs;
|
||||
}
|
||||
|
||||
|
@ -683,14 +719,63 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
|||
public void onComplete(Result result)
|
||||
{
|
||||
final long requestTimeNs = System.nanoTime() - startNs;
|
||||
try {
|
||||
String queryId = null;
|
||||
String sqlQueryId = null;
|
||||
if (isJDBC) {
|
||||
sqlQueryId = result.getResponse().getHeaders().get(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER);
|
||||
} else if (sqlQuery != null) {
|
||||
sqlQueryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.SQL_QUERY_ID, null);
|
||||
queryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.QUERY_ID, null);
|
||||
} else if (query != null) {
|
||||
queryId = query.getId();
|
||||
}
|
||||
|
||||
// not a native or SQL query, no need to emit metrics and logs
|
||||
if (queryId == null && sqlQueryId == null) {
|
||||
super.onComplete(result);
|
||||
return;
|
||||
}
|
||||
|
||||
boolean success = result.isSucceeded();
|
||||
if (success) {
|
||||
successfulQueryCount.incrementAndGet();
|
||||
} else {
|
||||
failedQueryCount.incrementAndGet();
|
||||
}
|
||||
emitQueryTime(requestTimeNs, success);
|
||||
emitQueryTime(requestTimeNs, success, sqlQueryId, queryId);
|
||||
|
||||
//noinspection VariableNotUsedInsideIf
|
||||
if (sqlQueryId != null) {
|
||||
// SQL query doesn't have a native query translation in router. Hence, not logging the native query.
|
||||
if (sqlQuery != null) {
|
||||
try {
|
||||
requestLogger.logSqlQuery(
|
||||
RequestLogLine.forSql(
|
||||
sqlQuery.getQuery(),
|
||||
sqlQuery.getContext(),
|
||||
DateTimes.nowUtc(),
|
||||
req.getRemoteAddr(),
|
||||
new QueryStats(
|
||||
ImmutableMap.of(
|
||||
"query/time",
|
||||
TimeUnit.NANOSECONDS.toMillis(requestTimeNs),
|
||||
"success",
|
||||
success
|
||||
&& result.getResponse().getStatus() == Status.OK.getStatusCode()
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
LOG.error(e, "Unable to log SQL query [%s]!", sqlQuery);
|
||||
}
|
||||
}
|
||||
super.onComplete(result);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
requestLogger.logNativeQuery(
|
||||
RequestLogLine.forNative(
|
||||
query,
|
||||
|
@ -718,10 +803,64 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
|||
@Override
|
||||
public void onFailure(Response response, Throwable failure)
|
||||
{
|
||||
try {
|
||||
final long requestTimeNs = System.nanoTime() - startNs;
|
||||
final String errorMessage = failure.getMessage();
|
||||
String queryId = null;
|
||||
String sqlQueryId = null;
|
||||
if (isJDBC) {
|
||||
sqlQueryId = response.getHeaders().get(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER);
|
||||
} else if (sqlQuery != null) {
|
||||
sqlQueryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.SQL_QUERY_ID, null);
|
||||
queryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.QUERY_ID, null);
|
||||
} else if (query != null) {
|
||||
queryId = query.getId();
|
||||
}
|
||||
|
||||
// not a native or SQL query, no need to emit metrics and logs
|
||||
if (queryId == null && sqlQueryId == null) {
|
||||
super.onFailure(response, failure);
|
||||
return;
|
||||
}
|
||||
|
||||
failedQueryCount.incrementAndGet();
|
||||
emitQueryTime(System.nanoTime() - startNs, false);
|
||||
emitQueryTime(requestTimeNs, false, sqlQueryId, queryId);
|
||||
|
||||
//noinspection VariableNotUsedInsideIf
|
||||
if (sqlQueryId != null) {
|
||||
// SQL query doesn't have a native query translation in router. Hence, not logging the native query.
|
||||
if (sqlQuery != null) {
|
||||
try {
|
||||
requestLogger.logSqlQuery(
|
||||
RequestLogLine.forSql(
|
||||
sqlQuery.getQuery(),
|
||||
sqlQuery.getContext(),
|
||||
DateTimes.nowUtc(),
|
||||
req.getRemoteAddr(),
|
||||
new QueryStats(
|
||||
ImmutableMap.of(
|
||||
"success",
|
||||
false,
|
||||
"exception",
|
||||
errorMessage == null ? "no message" : errorMessage
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
LOG.error(e, "Unable to log SQL query [%s]!", sqlQuery);
|
||||
}
|
||||
LOG.makeAlert(failure, "Exception handling request")
|
||||
.addData("exception", failure.toString())
|
||||
.addData("sqlQuery", sqlQuery)
|
||||
.addData("peer", req.getRemoteAddr())
|
||||
.emit();
|
||||
}
|
||||
super.onFailure(response, failure);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
requestLogger.logNativeQuery(
|
||||
RequestLogLine.forNative(
|
||||
query,
|
||||
|
@ -751,14 +890,25 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
|||
super.onFailure(response, failure);
|
||||
}
|
||||
|
||||
private void emitQueryTime(long requestTimeNs, boolean success)
|
||||
private void emitQueryTime(long requestTimeNs, boolean success, @Nullable String sqlQueryId, @Nullable String queryId)
|
||||
{
|
||||
QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
|
||||
QueryMetrics queryMetrics;
|
||||
if (sqlQueryId != null) {
|
||||
queryMetrics = queryMetricsFactory.makeMetrics();
|
||||
queryMetrics.remoteAddress(req.getRemoteAddr());
|
||||
// Setting sqlQueryId and queryId dimensions to the metric
|
||||
queryMetrics.sqlQueryId(sqlQueryId);
|
||||
if (queryId != null) { // query id is null for JDBC SQL
|
||||
queryMetrics.queryId(queryId);
|
||||
}
|
||||
} else {
|
||||
queryMetrics = DruidMetrics.makeRequestMetrics(
|
||||
queryMetricsFactory,
|
||||
warehouse.getToolChest(query),
|
||||
query,
|
||||
req.getRemoteAddr()
|
||||
);
|
||||
}
|
||||
queryMetrics.success(success);
|
||||
queryMetrics.reportQueryTime(requestTimeNs).emit(emitter);
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.druid.java.util.common.StringUtils;
|
|||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.jackson.JacksonUtils;
|
||||
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
|
||||
import org.apache.druid.java.util.metrics.StubServiceEmitter;
|
||||
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
|
||||
import org.apache.druid.query.Druids;
|
||||
import org.apache.druid.query.MapQueryToolChestWarehouse;
|
||||
|
@ -72,6 +73,11 @@ import org.apache.druid.sql.http.ResultFormat;
|
|||
import org.apache.druid.sql.http.SqlQuery;
|
||||
import org.easymock.EasyMock;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.HttpResponse;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.handler.HandlerList;
|
||||
|
@ -205,15 +211,24 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
|
|||
@Test
|
||||
public void testSqlQueryProxy() throws Exception
|
||||
{
|
||||
final SqlQuery query = new SqlQuery("SELECT * FROM foo", ResultFormat.ARRAY, false, false, false, null, null);
|
||||
final SqlQuery query = new SqlQuery(
|
||||
"SELECT * FROM foo",
|
||||
ResultFormat.ARRAY,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
ImmutableMap.of("sqlQueryId", "dummy"),
|
||||
null
|
||||
);
|
||||
final QueryHostFinder hostFinder = EasyMock.createMock(QueryHostFinder.class);
|
||||
EasyMock.expect(hostFinder.findServerSql(query))
|
||||
.andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
|
||||
EasyMock.expect(hostFinder.findServerSql(
|
||||
query.withOverridenContext(ImmutableMap.of("sqlQueryId", "dummy", "queryId", "dummy")))
|
||||
).andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
|
||||
EasyMock.replay(hostFinder);
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("druid.router.sql.enable", "true");
|
||||
verifyServletCallsForQuery(query, true, hostFinder, properties);
|
||||
verifyServletCallsForQuery(query, true, false, hostFinder, properties);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -230,7 +245,21 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
|
|||
EasyMock.expect(hostFinder.pickServer(query)).andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
|
||||
EasyMock.replay(hostFinder);
|
||||
|
||||
verifyServletCallsForQuery(query, false, hostFinder, new Properties());
|
||||
verifyServletCallsForQuery(query, false, false, hostFinder, new Properties());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJDBCSqlProxy() throws Exception
|
||||
{
|
||||
final ImmutableMap<String, Object> jdbcRequest = ImmutableMap.of("connectionId", "dummy");
|
||||
|
||||
final QueryHostFinder hostFinder = EasyMock.createMock(QueryHostFinder.class);
|
||||
EasyMock.expect(hostFinder.findServerAvatica("dummy"))
|
||||
.andReturn(new TestServer("http", "1.2.3.4", 9999))
|
||||
.once();
|
||||
EasyMock.replay(hostFinder);
|
||||
|
||||
verifyServletCallsForQuery(jdbcRequest, false, true, hostFinder, new Properties());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -485,13 +514,13 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
|
|||
*/
|
||||
private void verifyServletCallsForQuery(
|
||||
Object query,
|
||||
boolean isSql,
|
||||
boolean isNativeSql,
|
||||
boolean isJDBCSql,
|
||||
QueryHostFinder hostFinder,
|
||||
Properties properties
|
||||
) throws Exception
|
||||
{
|
||||
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
|
||||
final HttpServletRequest requestMock = EasyMock.createMock(HttpServletRequest.class);
|
||||
final ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonMapper.writeValueAsBytes(query));
|
||||
final ServletInputStream servletInputStream = new ServletInputStream()
|
||||
{
|
||||
|
@ -525,22 +554,58 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
|
|||
return b;
|
||||
}
|
||||
};
|
||||
final HttpServletRequest requestMock = EasyMock.createMock(HttpServletRequest.class);
|
||||
EasyMock.expect(requestMock.getContentType()).andReturn("application/json").times(2);
|
||||
requestMock.setAttribute("org.apache.druid.proxy.objectMapper", jsonMapper);
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.expect(requestMock.getRequestURI()).andReturn(isSql ? "/druid/v2/sql" : "/druid/v2/");
|
||||
EasyMock.expect(requestMock.getRequestURI())
|
||||
.andReturn(isNativeSql ? "/druid/v2/sql" : (isJDBCSql ? "/druid/v2/sql/avatica" : "/druid/v2/"));
|
||||
EasyMock.expect(requestMock.getMethod()).andReturn("POST");
|
||||
EasyMock.expect(requestMock.getInputStream()).andReturn(servletInputStream);
|
||||
if (isNativeSql) {
|
||||
SqlQuery sqlQuery = (SqlQuery) query;
|
||||
query = sqlQuery.withOverridenContext(ImmutableMap.of("sqlQueryId", "dummy", "queryId", "dummy"));
|
||||
}
|
||||
requestMock.setAttribute(
|
||||
isSql ? "org.apache.druid.proxy.sqlQuery" : "org.apache.druid.proxy.query",
|
||||
query
|
||||
"org.apache.druid.proxy." + (isNativeSql ? "sqlQuery" : (isJDBCSql ? "avaticaQuery" : "query")),
|
||||
isJDBCSql ? jsonMapper.writeValueAsBytes(query) : query
|
||||
);
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.expect(requestMock.getInputStream()).andReturn(servletInputStream);
|
||||
|
||||
// metrics related mocking
|
||||
EasyMock.expect(requestMock.getAttribute("org.apache.druid.proxy.avaticaQuery"))
|
||||
.andReturn(isJDBCSql ? query : null);
|
||||
EasyMock.expect(requestMock.getAttribute("org.apache.druid.proxy.query"))
|
||||
.andReturn(isJDBCSql ? null : (isNativeSql ? null : query));
|
||||
EasyMock.expect(requestMock.getAttribute("org.apache.druid.proxy.sqlQuery"))
|
||||
.andReturn(isJDBCSql ? null : (isNativeSql ? query : null));
|
||||
EasyMock.expect(requestMock.getRemoteAddr()).andReturn("0.0.0.0:0").times(isJDBCSql ? 1 : 2);
|
||||
requestMock.setAttribute("org.apache.druid.proxy.to.host", "1.2.3.4:9999");
|
||||
EasyMock.expectLastCall();
|
||||
requestMock.setAttribute("org.apache.druid.proxy.to.host.scheme", "http");
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.replay(requestMock);
|
||||
|
||||
final AtomicLong didService = new AtomicLong();
|
||||
final Request proxyRequestMock = Mockito.spy(Request.class);
|
||||
final Result result = new Result(
|
||||
proxyRequestMock,
|
||||
new HttpResponse(proxyRequestMock, ImmutableList.of())
|
||||
{
|
||||
@Override
|
||||
public HttpFields getHeaders()
|
||||
{
|
||||
HttpFields httpFields = new HttpFields();
|
||||
if (isJDBCSql) {
|
||||
httpFields.add(new HttpField("X-Druid-SQL-Query-Id", "jdbcDummy"));
|
||||
} else if (isNativeSql) {
|
||||
httpFields.add(new HttpField("X-Druid-SQL-Query-Id", "dummy"));
|
||||
}
|
||||
return httpFields;
|
||||
}
|
||||
}
|
||||
);
|
||||
final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", "");
|
||||
final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet(
|
||||
new MapQueryToolChestWarehouse(ImmutableMap.of()),
|
||||
jsonMapper,
|
||||
|
@ -548,7 +613,7 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
|
|||
hostFinder,
|
||||
null,
|
||||
null,
|
||||
new NoopServiceEmitter(),
|
||||
stubServiceEmitter,
|
||||
new NoopRequestLogger(),
|
||||
new DefaultGenericQueryMetricsFactory(),
|
||||
new AuthenticatorMapper(ImmutableMap.of()),
|
||||
|
@ -568,6 +633,19 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
|
|||
|
||||
servlet.service(requestMock, null);
|
||||
|
||||
// NPE is expected since the listener's onComplete calls the parent class' onComplete which fails due to
|
||||
// partial state of the servlet. Hence, only catching the exact exception to avoid possible errors.
|
||||
// Further, the metric assertions are also done to ensure that the metrics have emitted.
|
||||
try {
|
||||
servlet.newProxyResponseListener(requestMock, null).onComplete(result);
|
||||
}
|
||||
catch (NullPointerException ignored) {
|
||||
}
|
||||
Assert.assertEquals("query/time", stubServiceEmitter.getEvents().get(0).toMap().get("metric"));
|
||||
if (!isJDBCSql) {
|
||||
Assert.assertEquals("dummy", stubServiceEmitter.getEvents().get(0).toMap().get("id"));
|
||||
}
|
||||
|
||||
// This test is mostly about verifying that the servlet calls the right methods the right number of times.
|
||||
EasyMock.verify(hostFinder, requestMock);
|
||||
Assert.assertEquals(1, didService.get());
|
||||
|
|
|
@ -180,6 +180,8 @@ public class NativeQueryMaker implements QueryMaker
|
|||
final String queryId = UUID.randomUUID().toString();
|
||||
plannerContext.addNativeQueryId(queryId);
|
||||
query = query.withId(queryId);
|
||||
} else {
|
||||
plannerContext.addNativeQueryId(query.getId());
|
||||
}
|
||||
|
||||
query = query.withSqlQueryId(plannerContext.getSqlQueryId());
|
||||
|
|
|
@ -81,6 +81,19 @@ public class SqlQuery
|
|||
}
|
||||
}
|
||||
|
||||
public SqlQuery withOverridenContext(Map<String, Object> overridenContext)
|
||||
{
|
||||
return new SqlQuery(
|
||||
getQuery(),
|
||||
getResultFormat(),
|
||||
includeHeader(),
|
||||
includeTypesHeader(),
|
||||
includeSqlTypesHeader(),
|
||||
overridenContext,
|
||||
getParameters()
|
||||
);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getQuery()
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue