Add IT for MSQ task engine using the new IT framework (#12992)

* first test, serde causing problems

* serde working

* insert and select check

* Add cluster annotations for MSQ test cases

* Add cluster config for MSQ

* Add MSQ config to the pom.xml

* cleanup unnecessary changes

* Remove model classes

* Comments, checkstyle, check queries from file

* fixup test case name

* build failure fix

* review changes

* build failure fix

* Trigger Build

* Log the mismatch in QueryResultsVerifier

* Trigger Build

* Change the signature of the results verifier

* review changes

* LGTM fix

* build, change pom

* Trigger Build

* Trigger Build

* trigger build with minimal pom changes

* guice fix in tests

* travis.yml
This commit is contained in:
Laksh Singla 2022-09-22 16:09:47 +05:30 committed by GitHub
parent 044cab5094
commit 728745a1d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 815 additions and 20 deletions

View File

@ -688,6 +688,17 @@ jobs:
# the Druid services that did not exit normally.
script: ./it.sh travis HighAvailability
- &integration_tests_ex
name: "(Compile=openjdk8, Run=openjdk8) multi stage query tests"
stage: Tests - phase 2
jdk: openjdk8
services: *integration_test_services
env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
# Uses the installation defined above. Then, builds the test tools and docker image,
# and runs one IT. If tests fail, echos log lines of any of
# the Druid services that did not exit normally.
script: ./it.sh travis MultiStageQuery
# Disabling BatchIndex test as it is failing with due to timeout, fixing it will be taken in a separate PR.
#- <<: *integration_tests_ex
# name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer (new)"

View File

@ -49,7 +49,7 @@ DRUID_INSTANCE=
# variables: druid_standard_loadList defined here, and druid_test_loadList, defined
# in a docker-compose.yaml file, for any test-specific extensions.
# See compose.md for more details.
druid_standard_loadList=mysql-metadata-storage,druid-it-tools,druid-lookups-cached-global,druid-histogram,druid-datasketches,druid-parquet-extensions,druid-avro-extensions,druid-protobuf-extensions,druid-orc-extensions,druid-kafka-indexing-service,druid-s3-extensions
druid_standard_loadList=mysql-metadata-storage,druid-it-tools,druid-lookups-cached-global,druid-histogram,druid-datasketches,druid-parquet-extensions,druid-avro-extensions,druid-protobuf-extensions,druid-orc-extensions,druid-kafka-indexing-service,druid-s3-extensions,druid-multi-stage-query
# Location of Hadoop dependencies provided at runtime in the shared directory.
druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies

View File

@ -0,0 +1,95 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
networks:
druid-it-net:
name: druid-it-net
ipam:
config:
- subnet: 172.172.172.0/24
services:
zookeeper:
extends:
file: ../Common/dependencies.yaml
service: zookeeper
metadata:
extends:
file: ../Common/dependencies.yaml
service: metadata
coordinator:
extends:
file: ../Common/druid.yaml
service: coordinator
container_name: coordinator
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- druid_manager_segments_pollDuration=PT5S
- druid_coordinator_period=PT10S
depends_on:
- zookeeper
- metadata
overlord:
extends:
file: ../Common/druid.yaml
service: overlord
container_name: overlord
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- zookeeper
- metadata
broker:
extends:
file: ../Common/druid.yaml
service: broker
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- zookeeper
router:
extends:
file: ../Common/druid.yaml
service: router
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- zookeeper
historical:
extends:
file: ../Common/druid.yaml
service: historical
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- zookeeper
indexer:
extends:
file: ../Common/druid.yaml
service: indexer
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
volumes:
# Test data
- ../../resources:/resources
depends_on:
- zookeeper

View File

@ -182,6 +182,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-multi-stage-query</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@ -288,6 +293,15 @@
<it.category>AzureDeepStorage</it.category>
</properties>
</profile>
<profile>
<id>IT-MultiStageQuery</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<it.category>MultiStageQuery</it.category>
</properties>
</profile>
<profile>
<id>docker-tests</id>
<activation>

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.categories;
public class MultiStageQuery
{
}

View File

@ -22,15 +22,18 @@ package org.apache.druid.testsEx.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import org.apache.druid.cli.GuiceRunnable;
import org.apache.druid.curator.CuratorModule;
import org.apache.druid.curator.discovery.DiscoveryModule;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.AnnouncerModule;
import org.apache.druid.guice.DruidProcessingConfigModule;
import org.apache.druid.guice.JsonConfigProvider;
@ -82,6 +85,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
/**
* The magic needed to piece together enough of Druid to allow clients to
@ -141,11 +145,16 @@ public class Initializer
.in(LazySingleton.class);
// Dummy DruidNode instance to make Guice happy. This instance is unused.
DruidNode dummy = new DruidNode("integration-tests", "localhost", false, 9191, null, null, true, false);
binder
.bind(DruidNode.class)
.annotatedWith(Self.class)
.toInstance(
new DruidNode("integration-tests", "localhost", false, 9191, null, null, true, false));
.toInstance(dummy);
// Required for MSQIndexingModule
binder.bind(new TypeLiteral<Set<NodeRole>>()
{
}).annotatedWith(Self.class).toInstance(ImmutableSet.of(NodeRole.PEON));
// Reduced form of SQLMetadataStorageDruidModule
String prop = SQLMetadataStorageDruidModule.PROPERTY;

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.msq;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.sql.SqlTaskStatus;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.SqlResourceTestClient;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.MsqTestQueryHelper;
import org.apache.druid.testsEx.categories.MultiStageQuery;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@RunWith(DruidTestRunner.class)
@Category(MultiStageQuery.class)
public class ITMultiStageQuery
{
@Inject
private MsqTestQueryHelper msqHelper;
@Inject
private SqlResourceTestClient msqClient;
@Inject
private IntegrationTestingConfig config;
@Inject
private ObjectMapper jsonMapper;
@Inject
private DataLoaderHelper dataLoaderHelper;
@Inject
private CoordinatorResourceTestClient coordinatorClient;
private static final String QUERY_FILE = "/multi-stage-query/wikipedia_msq_select_query1.json";
@Test
public void testMsqIngestionAndQuerying() throws Exception
{
String datasource = "dst";
// Clear up the datasource from the previous runs
coordinatorClient.unloadSegmentsForDataSource(datasource);
String queryLocal =
StringUtils.format(
"INSERT INTO %s\n"
+ "SELECT\n"
+ " TIME_PARSE(\"timestamp\") AS __time,\n"
+ " isRobot,\n"
+ " diffUrl,\n"
+ " added,\n"
+ " countryIsoCode,\n"
+ " regionName,\n"
+ " channel,\n"
+ " flags,\n"
+ " delta,\n"
+ " isUnpatrolled,\n"
+ " isNew,\n"
+ " deltaBucket,\n"
+ " isMinor,\n"
+ " isAnonymous,\n"
+ " deleted,\n"
+ " cityName,\n"
+ " metroCode,\n"
+ " namespace,\n"
+ " comment,\n"
+ " page,\n"
+ " commentLength,\n"
+ " countryName,\n"
+ " user,\n"
+ " regionIsoCode\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+ " '{\"type\":\"json\"}',\n"
+ " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+ " )\n"
+ ")\n"
+ "PARTITIONED BY DAY\n"
+ "CLUSTERED BY \"__time\"",
datasource
);
// Submit the task and wait for the datasource to get loaded
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(queryLocal);
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
));
}
msqHelper.pollTaskIdForCompletion(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
}
}

View File

@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#-------------------------------------------------------------------------
# Definition of the multi stage query test cluster.
# See https://yaml.org/spec/1.2.2 for more about YAML
include:
- /cluster/Common/zk-metastore.yaml
druid:
coordinator:
instances:
- port: 8081
overlord:
instances:
- port: 8090
broker:
instances:
- port: 8082
router:
instances:
- port: 8888
historical:
instances:
- port: 8083
indexer:
instances:
- port: 8091

View File

@ -0,0 +1,31 @@
[
{
"query": "SELECT __time, isRobot, added, delta, deleted, namespace FROM %%DATASOURCE%%",
"expectedResults": [
{
"__time": 1377910953000,
"isRobot": null,
"added": 57,
"delta": -143,
"deleted": 200,
"namespace": "article"
},
{
"__time": 1377919965000,
"isRobot": null,
"added": 459,
"delta": 330,
"deleted": 129,
"namespace": "wikipedia"
},
{
"__time": 1377933081000,
"isRobot": null,
"added": 123,
"delta": 111,
"deleted": 12,
"namespace":"article"
}
]
}
]

View File

@ -239,6 +239,11 @@
<artifactId>simple-client-sslcontext</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-multi-stage-query</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-services</artifactId>

View File

@ -61,7 +61,7 @@ public class OverlordResourceTestClient
private final String indexer;
@Inject
OverlordResourceTestClient(
protected OverlordResourceTestClient(
ObjectMapper jsonMapper,
@TestClient HttpClient httpClient,
IntegrationTestingConfig config
@ -72,7 +72,7 @@ public class OverlordResourceTestClient
this.indexer = config.getOverlordUrl();
}
private String getIndexerURL()
protected String getIndexerURL()
{
return StringUtils.format(
"%s/druid/indexer/v1/",
@ -720,7 +720,7 @@ public class OverlordResourceTestClient
}
}
private StatusResponseHolder makeRequest(HttpMethod method, String url)
protected StatusResponseHolder makeRequest(HttpMethod method, String url)
{
try {
StatusResponseHolder response = this.httpClient

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.clients.msq;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
import org.apache.druid.testing.guice.TestClient;
import org.jboss.netty.handler.codec.http.HttpMethod;
import java.util.Map;
/**
* Overlord resource client for MSQ Tasks
*/
public class MsqOverlordResourceTestClient extends OverlordResourceTestClient
{
private ObjectMapper jsonMapper;
@Inject
MsqOverlordResourceTestClient(
@Json ObjectMapper jsonMapper,
@TestClient HttpClient httpClient,
IntegrationTestingConfig config
)
{
super(jsonMapper, httpClient, config);
this.jsonMapper = jsonMapper;
this.jsonMapper.registerModules(new MSQIndexingModule().getJacksonModules());
}
public Map<String, MSQTaskReport> getMsqTaskReport(String taskId)
{
try {
StatusResponseHolder response = makeRequest(
HttpMethod.GET,
StringUtils.format(
"%s%s",
getIndexerURL(),
StringUtils.format("task/%s/reports", StringUtils.urlEncode(taskId))
)
);
return jsonMapper.readValue(response.getContent(), new TypeReference<Map<String, MSQTaskReport>>()
{
});
}
catch (RuntimeException e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -21,10 +21,13 @@ package org.apache.druid.testing.guice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import org.apache.druid.curator.CuratorConfig;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.EscalatedClient;
@ -41,6 +44,8 @@ import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.IntegrationTestingConfigProvider;
import org.apache.druid.testing.IntegrationTestingCuratorConfig;
import java.util.Set;
/**
*/
public class DruidTestModule implements Module
@ -59,6 +64,11 @@ public class DruidTestModule implements Module
binder.bind(DruidNode.class).annotatedWith(Self.class).toInstance(
new DruidNode("integration-tests", "localhost", false, 9191, null, null, true, false)
);
// Required for MSQIndexingModule
binder.bind(new TypeLiteral<Set<NodeRole>>()
{
}).annotatedWith(Self.class).toInstance(ImmutableSet.of(NodeRole.PEON));
}
@Provides

View File

@ -146,24 +146,28 @@ public abstract class AbstractTestQueryHelper<QueryResultType extends AbstractQu
for (QueryResultType queryWithResult : queries) {
LOG.info("Running Query %s", queryWithResult.getQuery());
List<Map<String, Object>> result = queryClient.query(url, queryWithResult.getQuery());
if (!QueryResultVerifier.compareResults(result, queryWithResult.getExpectedResults(),
queryWithResult.getFieldsToTest()
)) {
QueryResultVerifier.ResultVerificationObject resultsComparison = QueryResultVerifier.compareResults(
result,
queryWithResult.getExpectedResults(),
queryWithResult.getFieldsToTest()
);
if (!resultsComparison.isSuccess()) {
LOG.error(
"Failed while executing query %s \n expectedResults: %s \n actualResults : %s",
queryWithResult.getQuery(),
jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
jsonMapper.writeValueAsString(result)
);
failed = true;
throw new ISE(
"Results mismatch while executing the query %s.\n"
+ "Mismatch error: %s\n",
queryWithResult.getQuery(),
resultsComparison.getErrorMessage()
);
} else {
LOG.info("Results Verified for Query %s", queryWithResult.getQuery());
}
}
if (failed) {
throw new ISE("one or more queries failed");
}
}
@SuppressWarnings("unchecked")

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class MsqQueryWithResults extends AbstractQueryWithResults<String>
{
@JsonCreator
public MsqQueryWithResults(
@JsonProperty("query") String query,
@JsonProperty("expectedResults") List<Map<String, Object>> expectedResults
)
{
super(query, expectedResults, Collections.emptyList());
}
}

View File

@ -0,0 +1,257 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.sql.SqlTaskStatus;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.SqlResourceTestClient;
import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Helper class to aid out ITs for MSQ.
* This takes all the clients required to make the necessary calls to the client APIs for MSQ and performs the boilerplate
* functions for the tests
*/
public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults>
{
private final ObjectMapper jsonMapper;
private final IntegrationTestingConfig config;
private final MsqOverlordResourceTestClient overlordClient;
private final SqlResourceTestClient msqClient;
@Inject
MsqTestQueryHelper(
final ObjectMapper jsonMapper,
final SqlResourceTestClient queryClient,
final IntegrationTestingConfig config,
final MsqOverlordResourceTestClient overlordClient,
final SqlResourceTestClient msqClient
)
{
super(jsonMapper, queryClient, config);
this.jsonMapper = jsonMapper;
this.config = config;
this.overlordClient = overlordClient;
this.msqClient = msqClient;
}
@Override
public String getQueryURL(String schemeAndHost)
{
return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
}
/**
* Submits a task to the MSQ API with the given query string, and default headers and parameters
*/
public SqlTaskStatus submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
{
return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null));
}
// Run the task, wait for it to complete, fetch the reports, verify the results,
/**
* Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster
* and returns the status associated with the submitted task
*/
public SqlTaskStatus submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException
{
String queryUrl = getQueryURL(config.getBrokerUrl());
Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery);
// It is okay to block here for the result because MSQ tasks return the task id associated with it, which shouldn't
// consume a lot of time
StatusResponseHolder statusResponseHolder;
try {
statusResponseHolder = responseHolderFuture.get(5, TimeUnit.MINUTES);
}
catch (TimeoutException e) {
throw new ISE(e, "Unable to fetch the task id for the submitted task in time.");
}
// Check if the task has been accepted successfully
HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
throw new ISE(
"Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]",
httpResponseStatus,
statusResponseHolder.getContent()
);
}
String content = statusResponseHolder.getContent();
SqlTaskStatus sqlTaskStatus;
try {
sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
}
catch (JsonProcessingException e) {
throw new ISE("Unable to parse the response");
}
return sqlTaskStatus;
}
/**
* The method retries till the task with taskId gets completed i.e. {@link TaskState#isComplete()}} returns true and
* returns the last fetched state {@link TaskState} of the task
*/
public TaskState pollTaskIdForCompletion(String taskId) throws Exception
{
return RetryUtils.retry(
() -> {
TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId);
TaskState statusCode = taskStatusPlus.getStatusCode();
if (statusCode != null && statusCode.isComplete()) {
return taskStatusPlus.getStatusCode();
}
throw new TaskStillRunningException();
},
(Throwable t) -> t instanceof TaskStillRunningException,
100
);
}
/**
* Fetches status reports for a given task
*/
public Map<String, MSQTaskReport> fetchStatusReports(String taskId)
{
return overlordClient.getMsqTaskReport(taskId);
}
/**
* Compares the results for a given taskId. It is required that the task has produced some results that can be verified
*/
private void compareResults(String taskId, MsqQueryWithResults expectedQueryWithResults)
{
Map<String, MSQTaskReport> statusReport = fetchStatusReports(taskId);
MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY);
if (taskReport == null) {
throw new ISE("Unable to fetch the status report for the task [%]", taskId);
}
MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
taskReport.getPayload(),
"payload"
);
MSQResultsReport resultsReport = Preconditions.checkNotNull(
taskReportPayload.getResults(),
"Results report for the task id is empty"
);
List<Map<String, Object>> actualResults = new ArrayList<>();
Yielder<Object[]> yielder = resultsReport.getResultYielder();
RowSignature rowSignature = resultsReport.getSignature();
while (!yielder.isDone()) {
Object[] row = yielder.get();
Map<String, Object> rowWithFieldNames = new LinkedHashMap<>();
for (int i = 0; i < row.length; ++i) {
rowWithFieldNames.put(rowSignature.getColumnName(i), row[i]);
}
actualResults.add(rowWithFieldNames);
yielder = yielder.next(null);
}
QueryResultVerifier.ResultVerificationObject resultsComparison = QueryResultVerifier.compareResults(
actualResults,
expectedQueryWithResults.getExpectedResults(),
Collections.emptyList()
);
if (!resultsComparison.isSuccess()) {
throw new IAE(
"Expected query result is different from the actual result.\n"
+ "Query: %s\n"
+ "Actual Result: %s\n"
+ "Expected Result: %s\n"
+ "Mismatch Error: %s\n",
expectedQueryWithResults.getQuery(),
actualResults,
expectedQueryWithResults.getExpectedResults(),
resultsComparison.getErrorMessage()
);
}
}
/**
* Runs queries from files using MSQ and compares the results with the ones provided
*/
@Override
public void testQueriesFromFile(String filePath, String fullDatasourcePath) throws Exception
{
LOG.info("Starting query tests for [%s]", filePath);
List<MsqQueryWithResults> queries =
jsonMapper.readValue(
TestQueryHelper.class.getResourceAsStream(filePath),
new TypeReference<List<MsqQueryWithResults>>()
{
}
);
for (MsqQueryWithResults queryWithResults : queries) {
String queryString = queryWithResults.getQuery();
String queryWithDatasource = StringUtils.replace(queryString, "%%DATASOURCE%%", fullDatasourcePath);
SqlTaskStatus sqlTaskStatus = submitMsqTask(queryWithDatasource);
if (sqlTaskStatus.getState().isFailure()) {
throw new ISE(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
);
}
String taskId = sqlTaskStatus.getTaskId();
pollTaskIdForCompletion(taskId);
compareResults(taskId, queryWithResults);
}
}
private static class TaskStillRunningException extends Exception
{
}
}

View File

@ -19,13 +19,21 @@
package org.apache.druid.testing.utils;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class QueryResultVerifier
{
public static boolean compareResults(
/**
* Tests the actual vs the expected results for equality and returns a {@link ResultVerificationObject} containing the
* result of the check. If fieldsToTest is not null and non empty, only the supplied fields would be tested for
* equality. Else, the whole row is compared
*/
public static ResultVerificationObject compareResults(
Iterable<Map<String, Object>> actual,
Iterable<Map<String, Object>> expected,
List<String> fieldsToTest
@ -34,6 +42,7 @@ public class QueryResultVerifier
Iterator<Map<String, Object>> actualIter = actual.iterator();
Iterator<Map<String, Object>> expectedIter = expected.iterator();
int rowNumber = 1;
while (actualIter.hasNext() && expectedIter.hasNext()) {
Map<String, Object> actualRes = actualIter.next();
Map<String, Object> expRes = expectedIter.next();
@ -41,19 +50,60 @@ public class QueryResultVerifier
if (fieldsToTest != null && !fieldsToTest.isEmpty()) {
for (String field : fieldsToTest) {
if (!actualRes.get(field).equals(expRes.get(field))) {
return false;
String mismatchMessage = StringUtils.format(
"Mismatch in row no. [%d], column [%s]. Expected: [%s], Actual: [%s]",
rowNumber,
field,
expRes,
actualRes
);
return new ResultVerificationObject(mismatchMessage);
}
}
} else {
if (!actualRes.equals(expRes)) {
return false;
String mismatchMessage = StringUtils.format(
"Mismatch in row no. [%d]. Expected: [%s], Actual: [%s]",
rowNumber,
expRes,
actualRes
);
return new ResultVerificationObject(mismatchMessage);
}
}
++rowNumber;
}
if (actualIter.hasNext() || expectedIter.hasNext()) {
return false;
String mismatchMessage =
StringUtils.format(
"Results size mismatch. The actual result contain %s rows than the expected result.",
actualIter.hasNext() ? "more" : "less"
);
return new ResultVerificationObject(mismatchMessage);
}
return new ResultVerificationObject(null);
}
public static class ResultVerificationObject
{
@Nullable
private final String errorMessage;
ResultVerificationObject(@Nullable final String errorMessage)
{
this.errorMessage = errorMessage;
}
public boolean isSuccess()
{
return getErrorMessage() == null;
}
@Nullable
public String getErrorMessage()
{
return errorMessage;
}
return true;
}
}

View File

@ -164,7 +164,7 @@ public class ITQueryRetryTestOnMissingSegments
result,
queryWithResult.getExpectedResults(),
queryWithResult.getFieldsToTest()
)) {
).isSuccess()) {
if (expectation != Expectation.INCORRECT_RESULT) {
throw new ISE(
"Incorrect query results for query %s \n expectedResults: %s \n actualResults : %s",