mirror of https://github.com/apache/druid.git
Integration test for sys tables (#6792)
* Add integration test for sys tables * Add test for batch index sys table queries * Address PR comments * remove unused import * Address PR comments * fix teamcity
This commit is contained in:
parent
e497141e92
commit
4300491b81
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.testing.clients;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.http.client.HttpClient;
|
||||
import org.apache.druid.java.util.http.client.Request;
|
||||
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
|
||||
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.guice.TestClient;
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public abstract class AbstractQueryResourceTestClient<QueryType>
|
||||
{
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final HttpClient httpClient;
|
||||
protected final String routerUrl;
|
||||
private final StatusResponseHandler responseHandler;
|
||||
|
||||
@Inject
|
||||
AbstractQueryResourceTestClient(
|
||||
ObjectMapper jsonMapper,
|
||||
@TestClient HttpClient httpClient,
|
||||
IntegrationTestingConfig config
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.httpClient = httpClient;
|
||||
this.routerUrl = config.getRouterUrl();
|
||||
this.responseHandler = new StatusResponseHandler(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
public abstract String getBrokerURL();
|
||||
|
||||
public List<Map<String, Object>> query(String url, QueryType query)
|
||||
{
|
||||
try {
|
||||
StatusResponseHolder response = httpClient.go(
|
||||
new Request(HttpMethod.POST, new URL(url)).setContent(
|
||||
"application/json",
|
||||
jsonMapper.writeValueAsBytes(query)
|
||||
), responseHandler
|
||||
|
||||
).get();
|
||||
|
||||
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
|
||||
throw new ISE(
|
||||
"Error while querying[%s] status[%s] content[%s]",
|
||||
getBrokerURL(),
|
||||
response.getStatus(),
|
||||
response.getContent()
|
||||
);
|
||||
}
|
||||
|
||||
return jsonMapper.readValue(
|
||||
response.getContent(), new TypeReference<List<Map<String, Object>>>()
|
||||
{
|
||||
}
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -20,33 +20,16 @@
|
|||
package org.apache.druid.testing.clients;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.http.client.HttpClient;
|
||||
import org.apache.druid.java.util.http.client.Request;
|
||||
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
|
||||
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
|
||||
import org.apache.druid.query.Query;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.guice.TestClient;
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class QueryResourceTestClient
|
||||
public class QueryResourceTestClient extends AbstractQueryResourceTestClient<Query>
|
||||
{
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final HttpClient httpClient;
|
||||
private final String routerUrl;
|
||||
private final StatusResponseHandler responseHandler;
|
||||
|
||||
@Inject
|
||||
QueryResourceTestClient(
|
||||
|
@ -55,13 +38,11 @@ public class QueryResourceTestClient
|
|||
IntegrationTestingConfig config
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.httpClient = httpClient;
|
||||
this.routerUrl = config.getRouterUrl();
|
||||
this.responseHandler = new StatusResponseHandler(StandardCharsets.UTF_8);
|
||||
super(jsonMapper, httpClient, config);
|
||||
}
|
||||
|
||||
private String getBrokerURL()
|
||||
@Override
|
||||
public String getBrokerURL()
|
||||
{
|
||||
return StringUtils.format(
|
||||
"%s/druid/v2/",
|
||||
|
@ -69,34 +50,4 @@ public class QueryResourceTestClient
|
|||
);
|
||||
}
|
||||
|
||||
public List<Map<String, Object>> query(String url, Query query)
|
||||
{
|
||||
try {
|
||||
StatusResponseHolder response = httpClient.go(
|
||||
new Request(HttpMethod.POST, new URL(url)).setContent(
|
||||
"application/json",
|
||||
jsonMapper.writeValueAsBytes(query)
|
||||
), responseHandler
|
||||
|
||||
).get();
|
||||
|
||||
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
|
||||
throw new ISE(
|
||||
"Error while querying[%s] status[%s] content[%s]",
|
||||
getBrokerURL(),
|
||||
response.getStatus(),
|
||||
response.getContent()
|
||||
);
|
||||
}
|
||||
|
||||
return jsonMapper.readValue(
|
||||
response.getContent(), new TypeReference<List<Map<String, Object>>>()
|
||||
{
|
||||
}
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.testing.clients;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.http.client.HttpClient;
|
||||
import org.apache.druid.sql.http.SqlQuery;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.guice.TestClient;
|
||||
|
||||
public class SqlResourceTestClient extends AbstractQueryResourceTestClient<SqlQuery>
|
||||
{
|
||||
|
||||
@Inject
|
||||
SqlResourceTestClient(
|
||||
ObjectMapper jsonMapper,
|
||||
@TestClient HttpClient httpClient,
|
||||
IntegrationTestingConfig config
|
||||
)
|
||||
{
|
||||
super(jsonMapper, httpClient, config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getBrokerURL()
|
||||
{
|
||||
return StringUtils.format(
|
||||
"%s/druid/v2/sql/",
|
||||
routerUrl
|
||||
);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.testing.utils;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class AbstractQueryWithResults<QueryType>
|
||||
{
|
||||
private final QueryType query;
|
||||
private final List<Map<String, Object>> expectedResults;
|
||||
|
||||
@JsonCreator
|
||||
public AbstractQueryWithResults(
|
||||
@JsonProperty("query") QueryType query,
|
||||
@JsonProperty("expectedResults") List<Map<String, Object>> expectedResults
|
||||
)
|
||||
{
|
||||
this.query = query;
|
||||
this.expectedResults = expectedResults;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public QueryType getQuery()
|
||||
{
|
||||
return query;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public List<Map<String, Object>> getExpectedResults()
|
||||
{
|
||||
return expectedResults;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "QueryWithResults{" +
|
||||
"query=" + query +
|
||||
", expectedResults=" + expectedResults +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -0,0 +1,149 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.testing.utils;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.query.Druids;
|
||||
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import org.apache.druid.query.timeseries.TimeseriesQuery;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.clients.AbstractQueryResourceTestClient;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public abstract class AbstractTestQueryHelper<QueryResultType extends AbstractQueryWithResults>
|
||||
{
|
||||
|
||||
public static Logger LOG = new Logger(TestQueryHelper.class);
|
||||
private final AbstractQueryResourceTestClient queryClient;
|
||||
private final ObjectMapper jsonMapper;
|
||||
protected final String broker;
|
||||
protected final String brokerTLS;
|
||||
protected final String router;
|
||||
protected final String routerTLS;
|
||||
|
||||
|
||||
@Inject
|
||||
AbstractTestQueryHelper(
|
||||
ObjectMapper jsonMapper,
|
||||
AbstractQueryResourceTestClient queryClient,
|
||||
IntegrationTestingConfig config
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.queryClient = queryClient;
|
||||
this.broker = config.getBrokerUrl();
|
||||
this.brokerTLS = config.getBrokerTLSUrl();
|
||||
this.router = config.getRouterUrl();
|
||||
this.routerTLS = config.getRouterTLSUrl();
|
||||
}
|
||||
|
||||
public abstract void testQueriesFromFile(String filePath, int timesToRun) throws Exception;
|
||||
|
||||
protected abstract String getQueryURL(String schemeAndHost);
|
||||
|
||||
public void testQueriesFromFile(String url, String filePath, int timesToRun) throws Exception
|
||||
{
|
||||
LOG.info("Starting query tests for [%s]", filePath);
|
||||
List<QueryResultType> queries =
|
||||
jsonMapper.readValue(
|
||||
TestQueryHelper.class.getResourceAsStream(filePath),
|
||||
new TypeReference<List<QueryResultType>>()
|
||||
{
|
||||
}
|
||||
);
|
||||
|
||||
testQueries(url, queries, timesToRun);
|
||||
}
|
||||
|
||||
public void testQueriesFromString(String url, String str, int timesToRun) throws Exception
|
||||
{
|
||||
LOG.info("Starting query tests using\n%s", str);
|
||||
List<QueryResultType> queries =
|
||||
jsonMapper.readValue(
|
||||
str,
|
||||
new TypeReference<List<QueryResultType>>()
|
||||
{
|
||||
}
|
||||
);
|
||||
testQueries(url, queries, timesToRun);
|
||||
}
|
||||
|
||||
private void testQueries(String url, List<QueryResultType> queries, int timesToRun) throws Exception
|
||||
{
|
||||
LOG.info("Running queries, url [%s]", url);
|
||||
for (int i = 0; i < timesToRun; i++) {
|
||||
LOG.info("Starting Iteration %d", i);
|
||||
|
||||
boolean failed = false;
|
||||
for (QueryResultType queryWithResult : queries) {
|
||||
LOG.info("Running Query %s", queryWithResult.getQuery());
|
||||
List<Map<String, Object>> result = queryClient.query(url, queryWithResult.getQuery());
|
||||
if (!QueryResultVerifier.compareResults(result, queryWithResult.getExpectedResults())) {
|
||||
LOG.error(
|
||||
"Failed while executing query %s \n expectedResults: %s \n actualResults : %s",
|
||||
queryWithResult.getQuery(),
|
||||
jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
|
||||
jsonMapper.writeValueAsString(result)
|
||||
);
|
||||
failed = true;
|
||||
} else {
|
||||
LOG.info("Results Verified for Query %s", queryWithResult.getQuery());
|
||||
}
|
||||
}
|
||||
|
||||
if (failed) {
|
||||
throw new ISE("one or more queries failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public int countRows(String dataSource, String interval)
|
||||
{
|
||||
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(dataSource)
|
||||
.aggregators(
|
||||
ImmutableList.of(
|
||||
new LongSumAggregatorFactory("rows", "count")
|
||||
)
|
||||
)
|
||||
.granularity(Granularities.ALL)
|
||||
.intervals(interval)
|
||||
.build();
|
||||
|
||||
List<Map<String, Object>> results = queryClient.query(getQueryURL(broker), query);
|
||||
if (results.isEmpty()) {
|
||||
return 0;
|
||||
} else {
|
||||
Map<String, Object> map = (Map<String, Object>) results.get(0).get("result");
|
||||
|
||||
return (Integer) map.get("rows");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -20,45 +20,21 @@
|
|||
package org.apache.druid.testing.utils;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.apache.druid.query.Query;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class QueryWithResults
|
||||
public class QueryWithResults extends AbstractQueryWithResults<Query>
|
||||
{
|
||||
private final Query query;
|
||||
private final List<Map<String, Object>> expectedResults;
|
||||
|
||||
@JsonCreator
|
||||
public QueryWithResults(
|
||||
@JsonProperty("query") Query query,
|
||||
@JsonProperty("expectedResults") List<Map<String, Object>> expectedResults
|
||||
Query query,
|
||||
List<Map<String, Object>> expectedResults
|
||||
)
|
||||
{
|
||||
this.query = query;
|
||||
this.expectedResults = expectedResults;
|
||||
super(query, expectedResults);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Query getQuery()
|
||||
{
|
||||
return query;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public List<Map<String, Object>> getExpectedResults()
|
||||
{
|
||||
return expectedResults;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "QueryWithResults{" +
|
||||
"query=" + query +
|
||||
", expectedResults=" + expectedResults +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.testing.utils;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import org.apache.druid.sql.http.SqlQuery;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class SqlQueryWithResults extends AbstractQueryWithResults<SqlQuery>
|
||||
{
|
||||
|
||||
@JsonCreator
|
||||
public SqlQueryWithResults(
|
||||
SqlQuery query,
|
||||
List<Map<String, Object>> expectedResults
|
||||
)
|
||||
{
|
||||
super(query, expectedResults);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.testing.utils;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.clients.SqlResourceTestClient;
|
||||
|
||||
public class SqlTestQueryHelper extends AbstractTestQueryHelper<SqlQueryWithResults>
|
||||
{
|
||||
|
||||
@Inject
|
||||
public SqlTestQueryHelper(
|
||||
ObjectMapper jsonMapper,
|
||||
SqlResourceTestClient sqlClient,
|
||||
IntegrationTestingConfig config
|
||||
)
|
||||
{
|
||||
super(jsonMapper, sqlClient, config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testQueriesFromFile(String filePath, int timesToRun) throws Exception
|
||||
{
|
||||
testQueriesFromFile(getQueryURL(broker), filePath, timesToRun);
|
||||
testQueriesFromFile(getQueryURL(brokerTLS), filePath, timesToRun);
|
||||
testQueriesFromFile(getQueryURL(router), filePath, timesToRun);
|
||||
testQueriesFromFile(getQueryURL(routerTLS), filePath, timesToRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getQueryURL(String schemeAndHost)
|
||||
{
|
||||
return StringUtils.format("%s/druid/v2/sql", schemeAndHost);
|
||||
}
|
||||
}
|
|
@ -19,33 +19,14 @@
|
|||
|
||||
package org.apache.druid.testing.utils;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.query.Druids;
|
||||
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import org.apache.druid.query.timeseries.TimeseriesQuery;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.clients.QueryResourceTestClient;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class TestQueryHelper
|
||||
public class TestQueryHelper extends AbstractTestQueryHelper<QueryWithResults>
|
||||
{
|
||||
public static Logger LOG = new Logger(TestQueryHelper.class);
|
||||
private final QueryResourceTestClient queryClient;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final String broker;
|
||||
private final String brokerTLS;
|
||||
private final String router;
|
||||
private final String routerTLS;
|
||||
private final IntegrationTestingConfig config;
|
||||
|
||||
@Inject
|
||||
TestQueryHelper(
|
||||
|
@ -54,15 +35,10 @@ public class TestQueryHelper
|
|||
IntegrationTestingConfig config
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.queryClient = queryClient;
|
||||
this.broker = config.getBrokerUrl();
|
||||
this.brokerTLS = config.getBrokerTLSUrl();
|
||||
this.router = config.getRouterUrl();
|
||||
this.routerTLS = config.getRouterTLSUrl();
|
||||
this.config = config;
|
||||
super(jsonMapper, queryClient, config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testQueriesFromFile(String filePath, int timesToRun) throws Exception
|
||||
{
|
||||
testQueriesFromFile(getQueryURL(broker), filePath, timesToRun);
|
||||
|
@ -71,19 +47,6 @@ public class TestQueryHelper
|
|||
testQueriesFromFile(getQueryURL(routerTLS), filePath, timesToRun);
|
||||
}
|
||||
|
||||
public void testQueriesFromFile(String url, String filePath, int timesToRun) throws Exception
|
||||
{
|
||||
LOG.info("Starting query tests for [%s]", filePath);
|
||||
List<QueryWithResults> queries =
|
||||
jsonMapper.readValue(
|
||||
TestQueryHelper.class.getResourceAsStream(filePath),
|
||||
new TypeReference<List<QueryWithResults>>()
|
||||
{
|
||||
}
|
||||
);
|
||||
testQueries(url, queries, timesToRun);
|
||||
}
|
||||
|
||||
public void testQueriesFromString(String str, int timesToRun) throws Exception
|
||||
{
|
||||
testQueriesFromString(getQueryURL(broker), str, timesToRun);
|
||||
|
@ -92,74 +55,11 @@ public class TestQueryHelper
|
|||
testQueriesFromString(getQueryURL(routerTLS), str, timesToRun);
|
||||
}
|
||||
|
||||
public void testQueriesFromString(String url, String str, int timesToRun) throws Exception
|
||||
{
|
||||
LOG.info("Starting query tests using\n%s", str);
|
||||
List<QueryWithResults> queries =
|
||||
jsonMapper.readValue(
|
||||
str,
|
||||
new TypeReference<List<QueryWithResults>>()
|
||||
{
|
||||
}
|
||||
);
|
||||
testQueries(url, queries, timesToRun);
|
||||
}
|
||||
|
||||
private void testQueries(String url, List<QueryWithResults> queries, int timesToRun) throws Exception
|
||||
{
|
||||
LOG.info("Running queries, url [%s]", url);
|
||||
for (int i = 0; i < timesToRun; i++) {
|
||||
LOG.info("Starting Iteration %d", i);
|
||||
|
||||
boolean failed = false;
|
||||
for (QueryWithResults queryWithResult : queries) {
|
||||
LOG.info("Running Query %s", queryWithResult.getQuery().getType());
|
||||
List<Map<String, Object>> result = queryClient.query(url, queryWithResult.getQuery());
|
||||
if (!QueryResultVerifier.compareResults(result, queryWithResult.getExpectedResults())) {
|
||||
LOG.error(
|
||||
"Failed while executing query %s \n expectedResults: %s \n actualResults : %s",
|
||||
queryWithResult.getQuery(),
|
||||
jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
|
||||
jsonMapper.writeValueAsString(result)
|
||||
);
|
||||
failed = true;
|
||||
} else {
|
||||
LOG.info("Results Verified for Query %s", queryWithResult.getQuery().getType());
|
||||
}
|
||||
}
|
||||
|
||||
if (failed) {
|
||||
throw new ISE("one or more queries failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String getQueryURL(String schemeAndHost)
|
||||
@Override
|
||||
protected String getQueryURL(String schemeAndHost)
|
||||
{
|
||||
return StringUtils.format("%s/druid/v2?pretty", schemeAndHost);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public int countRows(String dataSource, String interval)
|
||||
{
|
||||
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(dataSource)
|
||||
.aggregators(
|
||||
ImmutableList.of(
|
||||
new LongSumAggregatorFactory("rows", "count")
|
||||
)
|
||||
)
|
||||
.granularity(Granularities.ALL)
|
||||
.intervals(interval)
|
||||
.build();
|
||||
|
||||
List<Map<String, Object>> results = queryClient.query(getQueryURL(broker), query);
|
||||
if (results.isEmpty()) {
|
||||
return 0;
|
||||
} else {
|
||||
Map<String, Object> map = (Map<String, Object>) results.get(0).get("result");
|
||||
|
||||
return (Integer) map.get("rows");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.logger.Logger;
|
|||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.clients.ClientInfoResourceTestClient;
|
||||
import org.apache.druid.testing.utils.RetryUtil;
|
||||
import org.apache.druid.testing.utils.SqlTestQueryHelper;
|
||||
import org.junit.Assert;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -39,6 +40,8 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
|
|||
|
||||
@Inject
|
||||
IntegrationTestingConfig config;
|
||||
@Inject
|
||||
protected SqlTestQueryHelper sqlQueryHelper;
|
||||
|
||||
@Inject
|
||||
ClientInfoResourceTestClient clientInfoResourceTestClient;
|
||||
|
@ -135,6 +138,22 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
|
|||
}
|
||||
}
|
||||
|
||||
void doIndexTestSqlTest(
|
||||
String dataSource,
|
||||
String indexTaskFilePath,
|
||||
String queryFilePath
|
||||
)
|
||||
{
|
||||
submitTaskAndWait(indexTaskFilePath, dataSource);
|
||||
try {
|
||||
sqlQueryHelper.testQueriesFromFile(queryFilePath, 2);
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error(e, "Error while testing");
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void submitTaskAndWait(String taskSpec, String dataSourceName)
|
||||
{
|
||||
final String taskID = indexer.submitTask(taskSpec);
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.indexer;
|
||||
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITSystemTableBatchIndexTaskTest extends AbstractITBatchIndexTest
|
||||
{
|
||||
|
||||
private static final Logger LOG = new Logger(ITCompactionTaskTest.class);
|
||||
private static String INDEX_TASK = "/indexer/wikipedia_index_task.json";
|
||||
private static String SYSTEM_QUERIES_RESOURCE = "/indexer/sys_segment_batch_index_queries.json";
|
||||
private static String INDEX_DATASOURCE = "wikipedia_index_test";
|
||||
|
||||
@Test
|
||||
public void testIndexData() throws Exception
|
||||
{
|
||||
LOG.info("Starting batch index sys table queries");
|
||||
try (
|
||||
final Closeable indexCloseable = unloader(INDEX_DATASOURCE)
|
||||
) {
|
||||
doIndexTestSqlTest(
|
||||
INDEX_DATASOURCE,
|
||||
INDEX_TASK,
|
||||
SYSTEM_QUERIES_RESOURCE
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.query;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.testing.utils.RetryUtil;
|
||||
import org.apache.druid.testing.utils.SqlTestQueryHelper;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITSystemTableQueryTest
|
||||
{
|
||||
private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
|
||||
private static final String TWITTER_DATA_SOURCE = "twitterstream";
|
||||
private static final String SYSTEM_QUERIES_RESOURCE = "/queries/sys_segment_queries.json";
|
||||
|
||||
@Inject
|
||||
CoordinatorResourceTestClient coordinatorClient;
|
||||
@Inject
|
||||
private SqlTestQueryHelper queryHelper;
|
||||
@Inject
|
||||
IntegrationTestingConfig config;
|
||||
|
||||
@BeforeMethod
|
||||
public void before()
|
||||
{
|
||||
// ensure that wikipedia segments are loaded completely
|
||||
RetryUtil.retryUntilTrue(
|
||||
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
|
||||
);
|
||||
|
||||
// ensure that the twitter segments are loaded completely
|
||||
RetryUtil.retryUntilTrue(
|
||||
() -> coordinatorClient.areSegmentsLoaded(TWITTER_DATA_SOURCE), "twitter segment load"
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSystemTableQueries()
|
||||
{
|
||||
try {
|
||||
this.queryHelper.testQueriesFromFile(SYSTEM_QUERIES_RESOURCE, 2);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
[
|
||||
{
|
||||
"query": {
|
||||
"query": "SELECT count(*) FROM sys.segments WHERE datasource='wikipedia_index_test'"
|
||||
},
|
||||
"expectedResults": [
|
||||
{
|
||||
"EXPR$0": 4
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"query": {
|
||||
"query": "SELECT server_type FROM sys.servers"
|
||||
},
|
||||
"expectedResults": [
|
||||
{
|
||||
"server_type": "historical"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"query": {
|
||||
"query": "SELECT status FROM sys.tasks"
|
||||
},
|
||||
"expectedResults": [
|
||||
{
|
||||
"status": "SUCCESS"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
|
@ -0,0 +1,31 @@
|
|||
[
|
||||
{
|
||||
"query": {
|
||||
"query": "SELECT datasource, count(*) FROM sys.segments GROUP BY 1"
|
||||
},
|
||||
"expectedResults": [
|
||||
{
|
||||
"datasource": "wikipedia_editstream",
|
||||
"EXPR$1": 1
|
||||
},
|
||||
{
|
||||
"datasource": "wikipedia",
|
||||
"EXPR$1": 1
|
||||
},
|
||||
{
|
||||
"datasource": "twitterstream",
|
||||
"EXPR$1": 3
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"query": {
|
||||
"query": "SELECT server_type FROM sys.servers"
|
||||
},
|
||||
"expectedResults": [
|
||||
{
|
||||
"server_type": "historical"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
Loading…
Reference in New Issue