Revised IT to detect backward incompatible change (#16779)

Added a new revised IT group, BackwardCompatibilityMain, to catch potential backward-compatibility issues that may arise during a rolling upgrade.

This test group runs a docker-compose cluster in which the Overlord and Coordinator services run on the previous Druid version while the remaining services run on the current build.

The following environment variables are required in the GHA workflow file .github/workflows/unit-and-integration-tests-unified.yml to run this test:

DRUID_PREVIOUS_VERSION -> Previous Druid version to test backward compatibility against.
DRUID_PREVIOUS_VERSION_DOWNLOAD_URL -> URL from which to fetch the previous version tarball.
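
For illustration only (not part of this commit), a local invocation might look like the sketch below. The it.sh subcommands and the image-name format come from the workflow changes in this diff; the version number and download URL are placeholder assumptions.

    # Hedged sketch: assumes the test images were built first via ./it.sh ci and ./it.sh image.
    export BACKWARD_COMPATIBILITY_IT_ENABLED=true
    export DRUID_PREVIOUS_VERSION=30.0.0                                   # assumed example version
    export DRUID_PREVIOUS_VERSION_DOWNLOAD_URL="https://archive.apache.org/dist/druid/30.0.0/apache-druid-30.0.0-bin.tar.gz"  # assumed URL
    export DRUID_PREVIOUS_IT_IMAGE_NAME="org.apache.druid.integration-tests/test:${DRUID_PREVIOUS_VERSION}"
    # Run the new revised IT group against the docker-compose cluster.
    ./it.sh github BackwardCompatibilityMain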
Rishabh Singh authored on 2024-08-07 11:13:35 +05:30; committed by GitHub
parent 83cf4dc554
commit 99313e9996
19 changed files with 1498 additions and 786 deletions

@@ -57,6 +57,19 @@ on:
AWS_SECRET_ACCESS_KEY:
required: false
type: string
BACKWARD_COMPATIBILITY_IT_ENABLED:
required: false
type: string
default: false
DRUID_PREVIOUS_VERSION:
required: false
type: string
DRUID_PREVIOUS_VERSION_DOWNLOAD_URL:
required: false
type: string
DRUID_PREVIOUS_IT_IMAGE_NAME:
required: false
type: string
env:
MYSQL_DRIVER_CLASSNAME: ${{ inputs.mysql_driver }} # Used by tests to connect to metadata store directly.
@@ -106,6 +119,15 @@ jobs:
./druid-container-jdk${{ inputs.build_jdk }}.tar.gz
./integration-tests-ex/image/target/env.sh
- name: Retrieve previous version cached docker image
id: docker-restore-previous-version
if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' }}
uses: actions/cache/restore@v4
with:
key: druid-container-jdk${{ inputs.build_jdk }}-version${{ inputs.DRUID_PREVIOUS_VERSION }}.tar.gz-${{ github.sha }}
path: |
./druid-container-jdk${{ inputs.build_jdk }}-version${{ inputs.DRUID_PREVIOUS_VERSION }}.tar.gz
- name: Maven build
if: steps.maven-restore.outputs.cache-hit != 'true' || ( steps.docker-restore.outputs.cache-hit != 'true' && steps.targets-restore.outputs.cache-hit != 'true' )
run: |
@@ -115,6 +137,10 @@ jobs:
if: steps.docker-restore.outputs.cache-hit != 'true' || steps.maven-restore.outputs.cache-hit != 'true'
env:
docker-restore: ${{ toJson(steps.docker-restore.outputs) }}
BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
DRUID_PREVIOUS_VERSION: ${{ inputs.DRUID_PREVIOUS_VERSION }}
DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ inputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
run: |
./it.sh image
source ./integration-tests-ex/image/target/env.sh
@@ -122,6 +148,15 @@ jobs:
echo $DRUID_IT_IMAGE_NAME
docker save "$DRUID_IT_IMAGE_NAME" | gzip > druid-container-jdk${{ inputs.build_jdk }}.tar.gz
- name: Save previous version docker image
if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' && (steps.docker-restore.outputs.cache-hit != 'true' || steps.maven-restore.outputs.cache-hit != 'true') }}
env:
docker-restore: ${{ toJson(steps.docker-restore.outputs) }}
run: |
docker tag ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }} ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}-jdk${{ inputs.build_jdk }}-version${{ inputs.DRUID_PREVIOUS_VERSION }}
echo ${DRUID_PREVIOUS_IT_IMAGE_NAME}
docker save "${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}" | gzip > druid-container-jdk${{ inputs.build_jdk }}-version${{ inputs.DRUID_PREVIOUS_VERSION }}.tar.gz
- name: Stop and remove docker containers
run: |
echo "Force stopping all containers and pruning"
@@ -133,9 +168,21 @@ jobs:
docker load --input druid-container-jdk${{ inputs.build_jdk }}.tar.gz
docker images
- name: Load previous version docker image
if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' }}
run: |
docker load --input druid-container-jdk${{ inputs.build_jdk }}-version${{ inputs.DRUID_PREVIOUS_VERSION }}.tar.gz
docker images
- name: Run IT
id: run-it
run: ${{ inputs.script }}
env:
BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
DRUID_PREVIOUS_VERSION: ${{ inputs.DRUID_PREVIOUS_VERSION }}
DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ inputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
run: |
${{ inputs.script }}
- name: Collect docker logs on failure
if: ${{ failure() && steps.run-it.conclusion == 'failure' }}

@@ -18,6 +18,24 @@
name: "Revised ITs workflow"
on:
workflow_call:
inputs:
BACKWARD_COMPATIBILITY_IT_ENABLED:
description: "Flag for backward compatibility IT"
required: false
default: false
type: string
DRUID_PREVIOUS_VERSION:
description: "Previous druid versions to run the test against."
required: false
type: string
DRUID_PREVIOUS_VERSION_DOWNLOAD_URL:
description: "URL to download the previous druid version."
required: false
type: string
DRUID_PREVIOUS_IT_IMAGE_NAME:
description: "Druid previous version image name."
required: false
type: string
workflow_dispatch:
jobs:
@@ -79,3 +97,24 @@ jobs:
AWS_REGION: us-east-1
AWS_ACCESS_KEY_ID: admin
AWS_SECRET_ACCESS_KEY: miniopassword
backward-compatibility-it:
needs: changes
uses: ./.github/workflows/reusable-revised-its.yml
if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' && (needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true') }}
with:
build_jdk: 8
runtime_jdk: 8
use_indexer: middleManager
script: ./it.sh github BackwardCompatibilityMain
it: BackwardCompatibilityMain
mysql_driver: com.mysql.jdbc.Driver
BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
DRUID_PREVIOUS_VERSION: ${{ inputs.DRUID_PREVIOUS_VERSION }}
DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ inputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
DRUID_CLOUD_BUCKET: druid-qa
DRUID_CLOUD_PATH: aws-${{ github.run_id }}-${{ github.run_attempt }}
AWS_REGION: us-east-1
AWS_ACCESS_KEY_ID: admin
AWS_SECRET_ACCESS_KEY: miniopassword

@@ -49,7 +49,30 @@ env:
SEGMENT_DOWNLOAD_TIMEOUT_MINS: 5
jobs:
set-env-var:
name: Set env var
runs-on: ubuntu-latest
outputs:
DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ steps.image_name.outputs.image_name }}
BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ steps.it_enabled.outputs.enabled }}
DRUID_PREVIOUS_VERSION: ${{ env.DRUID_PREVIOUS_VERSION }}
DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ env.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
steps:
- name: Set image name env var
id: image_name
run: |
echo "::set-output name=image_name::org.apache.druid.integration-tests/test:${{ env.DRUID_PREVIOUS_VERSION }}"
- name: Set env for enabling backward compatibility it
id: it_enabled
run: |
if [ -n "${{ env.DRUID_PREVIOUS_VERSION }}" ]; then
echo "::set-output name=enabled::true"
else
echo "::set-output name=enabled::false"
fi
build:
needs: set-env-var
name: "build (jdk${{ matrix.jdk }})"
strategy:
fail-fast: false
@@ -94,12 +117,25 @@ jobs:
./druid-container-jdk${{ matrix.jdk }}.tar.gz
./integration-tests-ex/image/target/env.sh
- name: Cache previous version image
id: docker_container_previous_version
uses: actions/cache@v4
with:
key: druid-container-jdk${{ matrix.jdk }}-version${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}.tar.gz-${{ github.sha }}
path: |
./druid-container-jdk${{ matrix.jdk }}-version${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}.tar.gz
- name: Maven build
id: maven_build
run: |
./it.sh ci
- name: Container build
env:
BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ needs.set-env-var.outputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
DRUID_PREVIOUS_VERSION: ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}
DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
run: |
./it.sh image
source ./integration-tests-ex/image/target/env.sh
@@ -111,6 +147,13 @@ jobs:
echo $DRUID_IT_IMAGE_NAME
docker save "$DRUID_IT_IMAGE_NAME" | gzip > druid-container-jdk${{ matrix.jdk }}.tar.gz
- name: Save previous version docker image
if: ${{ needs.set-env-var.outputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' }}
run: |
docker tag ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }} ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}-jdk${{ matrix.jdk }}-version${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}
echo ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
docker save "${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}" | gzip > druid-container-jdk${{ matrix.jdk }}-version${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}.tar.gz
unit-tests-phase2:
strategy:
fail-fast: false
@@ -142,6 +185,11 @@ jobs:
uses: ./.github/workflows/standard-its.yml
revised-its:
needs: unit-tests
needs: [unit-tests, set-env-var]
if: ${{ always() && (needs.unit-tests.result == 'success' || needs.unit-tests.outputs.continue_tests) }}
uses: ./.github/workflows/revised-its.yml
with:
BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ needs.set-env-var.outputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
DRUID_PREVIOUS_VERSION: ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}
DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}

@@ -0,0 +1,111 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
networks:
druid-it-net:
name: druid-it-net
ipam:
config:
- subnet: 172.172.172.0/24
services:
zookeeper:
extends:
file: ../Common/dependencies.yaml
service: zookeeper
metadata:
extends:
file: ../Common/dependencies.yaml
service: metadata
coordinator:
extends:
file: ../Common/druid.yaml
service: coordinator
image: ${DRUID_PREVIOUS_IT_IMAGE_NAME}
container_name: coordinator
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
# The frequency with which the coordinator polls the database
# for changes. The DB population code has to wait at least this
# long for the coordinator to notice changes.
- druid_manager_segments_pollDuration=PT5S
- druid_coordinator_period=PT10S
depends_on:
- zookeeper
- metadata
overlord:
extends:
file: ../Common/druid.yaml
service: overlord
image: ${DRUID_PREVIOUS_IT_IMAGE_NAME}
container_name: overlord
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- zookeeper
- metadata
broker:
extends:
file: ../Common/druid.yaml
service: broker
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- zookeeper
router:
extends:
file: ../Common/druid.yaml
service: router
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- zookeeper
historical:
extends:
file: ../Common/druid.yaml
service: historical
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- zookeeper
middlemanager:
extends:
file: ../Common/druid.yaml
service: middlemanager
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- druid_msq_intermediate_storage_enable=true
- druid_msq_intermediate_storage_type=local
- druid_msq_intermediate_storage_basePath=/shared/durablestorage/
- druid_export_storage_baseDir=/
volumes:
# Test data
- ../../resources:/resources
depends_on:
- zookeeper
kafka:
extends:
file: ../Common/dependencies.yaml
service: kafka
depends_on:
- zookeeper

@@ -459,6 +459,15 @@
<it.category>GcsDeepStorage</it.category>
</properties>
</profile>
<profile>
<id>IT-BackwardCompatibilityMain</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<it.category>BackwardCompatibilityMain</it.category>
</properties>
</profile>
<profile>
<id>docker-tests</id>
<activation>

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.BackwardCompatibilityMain;
import org.apache.druid.testsEx.categories.BackwardCompatibilityMain;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.apache.druid.testsEx.indexer.IndexerTest;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@RunWith(DruidTestRunner.class)
@Category(BackwardCompatibilityMain.class)
public class ITBCMainIndexerTest extends IndexerTest
{
}

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.BackwardCompatibilityMain;
import org.apache.druid.testsEx.categories.BackwardCompatibilityMain;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.apache.druid.testsEx.msq.MultiStageQuery;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@RunWith(DruidTestRunner.class)
@Category(BackwardCompatibilityMain.class)
public class ITBCMainMultiStageQuery extends MultiStageQuery
{
}

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.BackwardCompatibilityMain;
import org.apache.druid.testsEx.categories.BackwardCompatibilityMain;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.apache.druid.testsEx.query.UnionQueryTest;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@RunWith(DruidTestRunner.class)
@Category(BackwardCompatibilityMain.class)
public class ITBCMainUnionQueryTest extends UnionQueryTest
{
}

@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.categories;
public class BackwardCompatibilityMain
{
}

@@ -19,369 +19,13 @@
package org.apache.druid.testsEx.indexer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testsEx.categories.BatchIndex;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import java.io.Closeable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@RunWith(DruidTestRunner.class)
@Category(BatchIndex.class)
public class ITIndexerTest extends AbstractITBatchIndexTest
public class ITIndexerTest extends IndexerTest
{
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final String INDEX_DATASOURCE = "wikipedia_index_test";
private static final String INDEX_WITH_TIMESTAMP_TASK = "/indexer/wikipedia_with_timestamp_index_task.json";
// TODO: add queries that validate timestamp is different from the __time column since it is a dimension
// TODO: https://github.com/apache/druid/issues/9565
private static final String INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final String INDEX_WITH_TIMESTAMP_DATASOURCE = "wikipedia_with_timestamp_index_test";
private static final String REINDEX_TASK = "/indexer/wikipedia_reindex_task.json";
private static final String REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_reindex_druid_input_source_task.json";
private static final String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_reindex_queries.json";
private static final String REINDEX_DATASOURCE = "wikipedia_reindex_test";
private static final String MERGE_INDEX_TASK = "/indexer/wikipedia_merge_index_task.json";
private static final String MERGE_INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json";
private static final String MERGE_INDEX_DATASOURCE = "wikipedia_merge_index_test";
private static final String MERGE_REINDEX_TASK = "/indexer/wikipedia_merge_reindex_task.json";
private static final String MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_merge_reindex_druid_input_source_task.json";
private static final String MERGE_REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json";
private static final String MERGE_REINDEX_DATASOURCE = "wikipedia_merge_reindex_test";
private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_TASK = "/indexer/wikipedia_index_with_merge_column_limit_task.json";
private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE = "wikipedia_index_with_merge_column_limit_test";
private static final String GET_LOCKED_INTERVALS = "wikipedia_index_get_locked_intervals_test";
private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_PAUSED =
CoordinatorDynamicConfig.builder().withPauseCoordination(true).build();
private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_DEFAULT =
CoordinatorDynamicConfig.builder().build();
@Inject
CoordinatorResourceTestClient coordinatorClient;
@Test
public void testIndexData() throws Exception
{
final String reindexDatasource = REINDEX_DATASOURCE + "-testIndexData";
final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testIndexData-druidInputSource";
try (
final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
) {
final Function<String, String> transform = spec -> {
try {
return StringUtils.replace(
spec,
"%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
jsonMapper.writeValueAsString("0")
);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
};
doIndexTest(
INDEX_DATASOURCE,
INDEX_TASK,
transform,
INDEX_QUERIES_RESOURCE,
false,
true,
true,
new Pair<>(false, false)
);
doReindexTest(
INDEX_DATASOURCE,
reindexDatasource,
REINDEX_TASK,
REINDEX_QUERIES_RESOURCE,
new Pair<>(false, false)
);
doReindexTest(
INDEX_DATASOURCE,
reindexDatasourceWithDruidInputSource,
REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
REINDEX_QUERIES_RESOURCE,
new Pair<>(false, false)
);
}
}
@Test
public void testReIndexDataWithTimestamp() throws Exception
{
final String reindexDatasource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp";
final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp-druidInputSource";
try (
final Closeable ignored1 = unloader(INDEX_WITH_TIMESTAMP_DATASOURCE + config.getExtraDatasourceNameSuffix());
final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
) {
doIndexTest(
INDEX_WITH_TIMESTAMP_DATASOURCE,
INDEX_WITH_TIMESTAMP_TASK,
INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE,
false,
true,
true,
new Pair<>(false, false)
);
doReindexTest(
INDEX_WITH_TIMESTAMP_DATASOURCE,
reindexDatasource,
REINDEX_TASK,
REINDEX_QUERIES_RESOURCE,
new Pair<>(false, false)
);
doReindexTest(
INDEX_WITH_TIMESTAMP_DATASOURCE,
reindexDatasourceWithDruidInputSource,
REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
REINDEX_QUERIES_RESOURCE,
new Pair<>(false, false)
);
}
}
@Test
public void testReIndexWithNonExistingDatasource() throws Exception
{
Pair<Boolean, Boolean> dummyPair = new Pair<>(false, false);
final String fullBaseDatasourceName = "nonExistingDatasource2904";
final String fullReindexDatasourceName = "newDatasource123";
String taskSpec = StringUtils.replace(
getResourceAsString(REINDEX_TASK_WITH_DRUID_INPUT_SOURCE),
"%%DATASOURCE%%",
fullBaseDatasourceName
);
taskSpec = StringUtils.replace(
taskSpec,
"%%REINDEX_DATASOURCE%%",
fullReindexDatasourceName
);
// This method will also verify task is successful after task finish running
// We expect task to be successful even if the datasource to reindex does not exist
submitTaskAndWait(
taskSpec,
fullReindexDatasourceName,
false,
false,
dummyPair
);
}
@Test
public void testMERGEIndexData() throws Exception
{
final String reindexDatasource = MERGE_REINDEX_DATASOURCE + "-testMergeIndexData";
final String reindexDatasourceWithDruidInputSource = MERGE_REINDEX_DATASOURCE + "-testMergeReIndexData-druidInputSource";
try (
final Closeable ignored1 = unloader(MERGE_INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
) {
doIndexTest(
MERGE_INDEX_DATASOURCE,
MERGE_INDEX_TASK,
MERGE_INDEX_QUERIES_RESOURCE,
false,
true,
true,
new Pair<>(false, false)
);
doReindexTest(
MERGE_INDEX_DATASOURCE,
reindexDatasource,
MERGE_REINDEX_TASK,
MERGE_REINDEX_QUERIES_RESOURCE,
new Pair<>(false, false)
);
doReindexTest(
MERGE_INDEX_DATASOURCE,
reindexDatasourceWithDruidInputSource,
MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
MERGE_INDEX_QUERIES_RESOURCE,
new Pair<>(false, false)
);
}
}
/**
* Test that task reports indicate the ingested segments were loaded before the configured timeout expired.
*
* @throws Exception
*/
@Test
public void testIndexDataAwaitSegmentAvailability() throws Exception
{
try (
final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
) {
final Function<String, String> transform = spec -> {
try {
return StringUtils.replace(
spec,
"%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
jsonMapper.writeValueAsString("600000")
);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
};
doIndexTest(
INDEX_DATASOURCE,
INDEX_TASK,
transform,
INDEX_QUERIES_RESOURCE,
false,
true,
true,
new Pair<>(true, true)
);
}
}
/**
* Test that the task still succeeds if the segments do not become available before the configured wait timeout
* expires.
*
* @throws Exception
*/
@Test
public void testIndexDataAwaitSegmentAvailabilityFailsButTaskSucceeds() throws Exception
{
try (
final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
) {
coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_PAUSED);
final Function<String, String> transform = spec -> {
try {
return StringUtils.replace(
spec,
"%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
jsonMapper.writeValueAsString("1")
);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
};
doIndexTest(
INDEX_DATASOURCE,
INDEX_TASK,
transform,
INDEX_QUERIES_RESOURCE,
false,
false,
false,
new Pair<>(true, false)
);
coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_DEFAULT);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()), "Segment Load"
);
}
}
@Test
public void testIndexWithMergeColumnLimitData() throws Exception
{
try (
final Closeable ignored1 = unloader(INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE + config.getExtraDatasourceNameSuffix());
) {
doIndexTest(
INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE,
INDEX_WITH_MERGE_COLUMN_LIMIT_TASK,
INDEX_QUERIES_RESOURCE,
false,
true,
true,
new Pair<>(false, false)
);
}
}
@Test
public void testGetLockedIntervals() throws Exception
{
final String datasourceName = GET_LOCKED_INTERVALS + config.getExtraDatasourceNameSuffix();
try (final Closeable ignored = unloader(datasourceName)) {
// Submit an Indexing Task
submitIndexTask(INDEX_TASK, datasourceName);
// Wait until it acquires a lock
final List<LockFilterPolicy> lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null));
final Map<String, List<Interval>> lockedIntervals = new HashMap<>();
ITRetryUtil.retryUntilFalse(
() -> {
lockedIntervals.clear();
lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies));
return lockedIntervals.isEmpty();
},
"Verify Intervals are Locked"
);
// Verify the locked intervals for this datasource
Assert.assertEquals(lockedIntervals.size(), 1);
Assert.assertEquals(
lockedIntervals.get(datasourceName),
Collections.singletonList(Intervals.of("2013-08-31/2013-09-02"))
);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(datasourceName),
"Segment Load"
);
}
}
@Test
public void testJsonFunctions() throws Exception
{
final String taskSpec = getResourceAsString("/indexer/json_path_index_task.json");
submitTaskAndWait(
taskSpec,
"json_path_index_test",
false,
true,
new Pair<>(false, false)
);
doTestQuery("json_path_index_test", "/indexer/json_path_index_queries.json");
}
}

@@ -0,0 +1,381 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.indexer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.io.Closeable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
public class IndexerTest extends AbstractITBatchIndexTest
{
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final String INDEX_DATASOURCE = "wikipedia_index_test";
private static final String INDEX_WITH_TIMESTAMP_TASK = "/indexer/wikipedia_with_timestamp_index_task.json";
// TODO: add queries that validate timestamp is different from the __time column since it is a dimension
// TODO: https://github.com/apache/druid/issues/9565
private static final String INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final String INDEX_WITH_TIMESTAMP_DATASOURCE = "wikipedia_with_timestamp_index_test";
private static final String REINDEX_TASK = "/indexer/wikipedia_reindex_task.json";
private static final String REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_reindex_druid_input_source_task.json";
private static final String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_reindex_queries.json";
private static final String REINDEX_DATASOURCE = "wikipedia_reindex_test";
private static final String MERGE_INDEX_TASK = "/indexer/wikipedia_merge_index_task.json";
private static final String MERGE_INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json";
private static final String MERGE_INDEX_DATASOURCE = "wikipedia_merge_index_test";
private static final String MERGE_REINDEX_TASK = "/indexer/wikipedia_merge_reindex_task.json";
private static final String MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_merge_reindex_druid_input_source_task.json";
private static final String MERGE_REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json";
private static final String MERGE_REINDEX_DATASOURCE = "wikipedia_merge_reindex_test";
private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_TASK = "/indexer/wikipedia_index_with_merge_column_limit_task.json";
private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE = "wikipedia_index_with_merge_column_limit_test";
private static final String GET_LOCKED_INTERVALS = "wikipedia_index_get_locked_intervals_test";
private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_PAUSED =
CoordinatorDynamicConfig.builder().withPauseCoordination(true).build();
private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_DEFAULT =
CoordinatorDynamicConfig.builder().build();
@Inject
CoordinatorResourceTestClient coordinatorClient;
@Test
public void testIndexData() throws Exception
{
final String reindexDatasource = REINDEX_DATASOURCE + "-testIndexData";
final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testIndexData-druidInputSource";
try (
final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
) {
final Function<String, String> transform = spec -> {
try {
return StringUtils.replace(
spec,
"%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
jsonMapper.writeValueAsString("0")
);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
};
doIndexTest(
INDEX_DATASOURCE,
INDEX_TASK,
transform,
INDEX_QUERIES_RESOURCE,
false,
true,
true,
new Pair<>(false, false)
);
doReindexTest(
INDEX_DATASOURCE,
reindexDatasource,
REINDEX_TASK,
REINDEX_QUERIES_RESOURCE,
new Pair<>(false, false)
);
doReindexTest(
INDEX_DATASOURCE,
reindexDatasourceWithDruidInputSource,
REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
REINDEX_QUERIES_RESOURCE,
new Pair<>(false, false)
);
}
}
@Test
public void testReIndexDataWithTimestamp() throws Exception
{
final String reindexDatasource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp";
final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp-druidInputSource";
try (
final Closeable ignored1 = unloader(INDEX_WITH_TIMESTAMP_DATASOURCE + config.getExtraDatasourceNameSuffix());
final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
) {
doIndexTest(
INDEX_WITH_TIMESTAMP_DATASOURCE,
INDEX_WITH_TIMESTAMP_TASK,
INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE,
false,
true,
true,
new Pair<>(false, false)
);
doReindexTest(
INDEX_WITH_TIMESTAMP_DATASOURCE,
reindexDatasource,
REINDEX_TASK,
REINDEX_QUERIES_RESOURCE,
new Pair<>(false, false)
);
doReindexTest(
INDEX_WITH_TIMESTAMP_DATASOURCE,
reindexDatasourceWithDruidInputSource,
REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
REINDEX_QUERIES_RESOURCE,
new Pair<>(false, false)
);
}
}
@Test
public void testReIndexWithNonExistingDatasource() throws Exception
{
Pair<Boolean, Boolean> dummyPair = new Pair<>(false, false);
final String fullBaseDatasourceName = "nonExistingDatasource2904";
final String fullReindexDatasourceName = "newDatasource123";
String taskSpec = StringUtils.replace(
getResourceAsString(REINDEX_TASK_WITH_DRUID_INPUT_SOURCE),
"%%DATASOURCE%%",
fullBaseDatasourceName
);
taskSpec = StringUtils.replace(
taskSpec,
"%%REINDEX_DATASOURCE%%",
fullReindexDatasourceName
);
// This method will also verify task is successful after task finish running
// We expect task to be successful even if the datasource to reindex does not exist
submitTaskAndWait(
taskSpec,
fullReindexDatasourceName,
false,
false,
dummyPair
);
}
@Test
public void testMERGEIndexData() throws Exception
{
final String reindexDatasource = MERGE_REINDEX_DATASOURCE + "-testMergeIndexData";
final String reindexDatasourceWithDruidInputSource = MERGE_REINDEX_DATASOURCE + "-testMergeReIndexData-druidInputSource";
try (
final Closeable ignored1 = unloader(MERGE_INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
) {
doIndexTest(
MERGE_INDEX_DATASOURCE,
MERGE_INDEX_TASK,
MERGE_INDEX_QUERIES_RESOURCE,
false,
true,
true,
new Pair<>(false, false)
);
doReindexTest(
MERGE_INDEX_DATASOURCE,
reindexDatasource,
MERGE_REINDEX_TASK,
MERGE_REINDEX_QUERIES_RESOURCE,
new Pair<>(false, false)
);
doReindexTest(
MERGE_INDEX_DATASOURCE,
reindexDatasourceWithDruidInputSource,
MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
MERGE_INDEX_QUERIES_RESOURCE,
new Pair<>(false, false)
);
}
}
/**
* Test that task reports indicate the ingested segments were loaded before the configured timeout expired.
*
* @throws Exception
*/
@Test
public void testIndexDataAwaitSegmentAvailability() throws Exception
{
try (
final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
) {
final Function<String, String> transform = spec -> {
try {
return StringUtils.replace(
spec,
"%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
jsonMapper.writeValueAsString("600000")
);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
};
doIndexTest(
INDEX_DATASOURCE,
INDEX_TASK,
transform,
INDEX_QUERIES_RESOURCE,
false,
true,
true,
new Pair<>(true, true)
);
}
}
/**
* Test that the task still succeeds if the segments do not become available before the configured wait timeout
* expires.
*
* @throws Exception
*/
@Test
public void testIndexDataAwaitSegmentAvailabilityFailsButTaskSucceeds() throws Exception
{
try (
final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
) {
coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_PAUSED);
final Function<String, String> transform = spec -> {
try {
return StringUtils.replace(
spec,
"%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
jsonMapper.writeValueAsString("1")
);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
};
doIndexTest(
INDEX_DATASOURCE,
INDEX_TASK,
transform,
INDEX_QUERIES_RESOURCE,
false,
false,
false,
new Pair<>(true, false)
);
coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_DEFAULT);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()), "Segment Load"
);
}
}
@Test
public void testIndexWithMergeColumnLimitData() throws Exception
{
try (
final Closeable ignored1 = unloader(INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE + config.getExtraDatasourceNameSuffix());
) {
doIndexTest(
INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE,
INDEX_WITH_MERGE_COLUMN_LIMIT_TASK,
INDEX_QUERIES_RESOURCE,
false,
true,
true,
new Pair<>(false, false)
);
}
}
@Test
public void testGetLockedIntervals() throws Exception
{
final String datasourceName = GET_LOCKED_INTERVALS + config.getExtraDatasourceNameSuffix();
try (final Closeable ignored = unloader(datasourceName)) {
// Submit an Indexing Task
submitIndexTask(INDEX_TASK, datasourceName);
// Wait until it acquires a lock
final List<LockFilterPolicy> lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null));
final Map<String, List<Interval>> lockedIntervals = new HashMap<>();
ITRetryUtil.retryUntilFalse(
() -> {
lockedIntervals.clear();
lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies));
return lockedIntervals.isEmpty();
},
"Verify Intervals are Locked"
);
// Verify the locked intervals for this datasource
Assert.assertEquals(lockedIntervals.size(), 1);
Assert.assertEquals(
lockedIntervals.get(datasourceName),
Collections.singletonList(Intervals.of("2013-08-31/2013-09-02"))
);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(datasourceName),
"Segment Load"
);
}
}
@Test
public void testJsonFunctions() throws Exception
{
final String taskSpec = getResourceAsString("/indexer/json_path_index_task.json");
submitTaskAndWait(
taskSpec,
"json_path_index_test",
false,
true,
new Pair<>(false, false)
);
doTestQuery("json_path_index_test", "/indexer/json_path_index_queries.json");
}
}

@@ -19,254 +19,12 @@
package org.apache.druid.testsEx.msq;
import com.google.api.client.util.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.sql.SqlTaskStatus;
import org.apache.druid.storage.local.LocalFileExportStorageProvider;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.MsqTestQueryHelper;
import org.apache.druid.testsEx.categories.MultiStageQuery;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@RunWith(DruidTestRunner.class)
@Category(MultiStageQuery.class)
public class ITMultiStageQuery
public class ITMultiStageQuery extends MultiStageQuery
{
@Inject
private MsqTestQueryHelper msqHelper;
@Inject
private DataLoaderHelper dataLoaderHelper;
@Inject
private CoordinatorResourceTestClient coordinatorClient;
private static final String QUERY_FILE = "/multi-stage-query/wikipedia_msq_select_query1.json";
@Test
public void testMsqIngestionAndQuerying() throws Exception
{
String datasource = "dst";
// Clear up the datasource from the previous runs
coordinatorClient.unloadSegmentsForDataSource(datasource);
String queryLocal =
StringUtils.format(
"INSERT INTO %s\n"
+ "SELECT\n"
+ " TIME_PARSE(\"timestamp\") AS __time,\n"
+ " isRobot,\n"
+ " diffUrl,\n"
+ " added,\n"
+ " countryIsoCode,\n"
+ " regionName,\n"
+ " channel,\n"
+ " flags,\n"
+ " delta,\n"
+ " isUnpatrolled,\n"
+ " isNew,\n"
+ " deltaBucket,\n"
+ " isMinor,\n"
+ " isAnonymous,\n"
+ " deleted,\n"
+ " cityName,\n"
+ " metroCode,\n"
+ " namespace,\n"
+ " comment,\n"
+ " page,\n"
+ " commentLength,\n"
+ " countryName,\n"
+ " user,\n"
+ " regionIsoCode\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+ " '{\"type\":\"json\"}',\n"
+ " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+ " )\n"
+ ")\n"
+ "PARTITIONED BY DAY\n",
datasource
);
// Submit the task and wait for the datasource to get loaded
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryLocal);
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
));
}
msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
}
@Test
@Ignore("localfiles() is disabled")
public void testMsqIngestionAndQueryingWithLocalFn() throws Exception
{
String datasource = "dst";
// Clear up the datasource from the previous runs
coordinatorClient.unloadSegmentsForDataSource(datasource);
String queryLocal =
StringUtils.format(
"INSERT INTO %s\n"
+ "SELECT\n"
+ " TIME_PARSE(\"timestamp\") AS __time,\n"
+ " isRobot,\n"
+ " diffUrl,\n"
+ " added,\n"
+ " countryIsoCode,\n"
+ " regionName,\n"
+ " channel,\n"
+ " flags,\n"
+ " delta,\n"
+ " isUnpatrolled,\n"
+ " isNew,\n"
+ " deltaBucket,\n"
+ " isMinor,\n"
+ " isAnonymous,\n"
+ " deleted,\n"
+ " cityName,\n"
+ " metroCode,\n"
+ " namespace,\n"
+ " comment,\n"
+ " page,\n"
+ " commentLength,\n"
+ " countryName,\n"
+ " user,\n"
+ " regionIsoCode\n"
+ "FROM TABLE(\n"
+ " LOCALFILES(\n"
+ " files => ARRAY['/resources/data/batch_index/json/wikipedia_index_data1.json'],\n"
+ " format => 'json'\n"
+ " ))\n"
+ " (\"timestamp\" VARCHAR, isRobot VARCHAR, diffUrl VARCHAR, added BIGINT, countryIsoCode VARCHAR, regionName VARCHAR,\n"
+ " channel VARCHAR, flags VARCHAR, delta BIGINT, isUnpatrolled VARCHAR, isNew VARCHAR, deltaBucket DOUBLE,\n"
+ " isMinor VARCHAR, isAnonymous VARCHAR, deleted BIGINT, cityName VARCHAR, metroCode BIGINT, namespace VARCHAR,\n"
+ " comment VARCHAR, page VARCHAR, commentLength BIGINT, countryName VARCHAR, \"user\" VARCHAR, regionIsoCode VARCHAR)\n"
+ "PARTITIONED BY DAY\n",
datasource
);
// Submit the task and wait for the datasource to get loaded
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryLocal);
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
));
}
msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
}
@Test
public void testExport() throws Exception
{
String exportQuery =
StringUtils.format(
"INSERT INTO extern(%s(exportPath => '%s'))\n"
+ "AS CSV\n"
+ "SELECT page, added, delta\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+ " '{\"type\":\"json\"}',\n"
+ " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+ " )\n"
+ ")\n",
LocalFileExportStorageProvider.TYPE_NAME, "/shared/export/"
);
SqlTaskStatus exportTask = msqHelper.submitMsqTaskSuccesfully(exportQuery);
msqHelper.pollTaskIdForSuccess(exportTask.getTaskId());
if (exportTask.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
exportTask.getError()
));
}
String resultQuery = StringUtils.format(
"SELECT page, delta, added\n"
+ " FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"local\",\"baseDir\":\"/shared/export/\",\"filter\":\"*.csv\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":true}'\n"
+ " )\n"
+ " ) EXTEND (\"added\" BIGINT, \"delta\" BIGINT, \"page\" VARCHAR)\n"
+ " WHERE delta != 0\n"
+ " ORDER BY page");
SqlTaskStatus resultTaskStatus = msqHelper.submitMsqTaskSuccesfully(resultQuery);
msqHelper.pollTaskIdForSuccess(resultTaskStatus.getTaskId());
TaskReport.ReportMap statusReport = msqHelper.fetchStatusReports(resultTaskStatus.getTaskId());
MSQTaskReport taskReport = (MSQTaskReport) statusReport.get(MSQTaskReport.REPORT_KEY);
if (taskReport == null) {
throw new ISE("Unable to fetch the status report for the task [%]", resultTaskStatus.getTaskId());
}
TaskContextReport taskContextReport = (TaskContextReport) statusReport.get(TaskContextReport.REPORT_KEY);
Assert.assertFalse(taskContextReport.getPayload().isEmpty());
MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
taskReport.getPayload(),
"payload"
);
MSQResultsReport resultsReport = Preconditions.checkNotNull(
taskReportPayload.getResults(),
"Results report for the task id is empty"
);
List<List<Object>> actualResults = new ArrayList<>();
for (final Object[] row : resultsReport.getResults()) {
actualResults.add(Arrays.asList(row));
}
ImmutableList<ImmutableList<Object>> expectedResults = ImmutableList.of(
ImmutableList.of("Cherno Alpha", 111, 123),
ImmutableList.of("Gypsy Danger", -143, 57),
ImmutableList.of("Striker Eureka", 330, 459)
);
Assert.assertEquals(
expectedResults,
actualResults
);
}
}

@@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.msq;
import com.google.api.client.util.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.sql.SqlTaskStatus;
import org.apache.druid.storage.local.LocalFileExportStorageProvider;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.MsqTestQueryHelper;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class MultiStageQuery
{
@Inject
private MsqTestQueryHelper msqHelper;
@Inject
private DataLoaderHelper dataLoaderHelper;
@Inject
private CoordinatorResourceTestClient coordinatorClient;
private static final String QUERY_FILE = "/multi-stage-query/wikipedia_msq_select_query1.json";
@Test
public void testMsqIngestionAndQuerying() throws Exception
{
String datasource = "dst";
// Clear up the datasource from the previous runs
coordinatorClient.unloadSegmentsForDataSource(datasource);
String queryLocal =
StringUtils.format(
"INSERT INTO %s\n"
+ "SELECT\n"
+ " TIME_PARSE(\"timestamp\") AS __time,\n"
+ " isRobot,\n"
+ " diffUrl,\n"
+ " added,\n"
+ " countryIsoCode,\n"
+ " regionName,\n"
+ " channel,\n"
+ " flags,\n"
+ " delta,\n"
+ " isUnpatrolled,\n"
+ " isNew,\n"
+ " deltaBucket,\n"
+ " isMinor,\n"
+ " isAnonymous,\n"
+ " deleted,\n"
+ " cityName,\n"
+ " metroCode,\n"
+ " namespace,\n"
+ " comment,\n"
+ " page,\n"
+ " commentLength,\n"
+ " countryName,\n"
+ " user,\n"
+ " regionIsoCode\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+ " '{\"type\":\"json\"}',\n"
+ " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+ " )\n"
+ ")\n"
+ "PARTITIONED BY DAY\n",
datasource
);
// Submit the task and wait for the datasource to get loaded
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryLocal);
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
));
}
msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
}
@Test
@Ignore("localfiles() is disabled")
public void testMsqIngestionAndQueryingWithLocalFn() throws Exception
{
String datasource = "dst";
// Clear up the datasource from the previous runs
coordinatorClient.unloadSegmentsForDataSource(datasource);
String queryLocal =
StringUtils.format(
"INSERT INTO %s\n"
+ "SELECT\n"
+ " TIME_PARSE(\"timestamp\") AS __time,\n"
+ " isRobot,\n"
+ " diffUrl,\n"
+ " added,\n"
+ " countryIsoCode,\n"
+ " regionName,\n"
+ " channel,\n"
+ " flags,\n"
+ " delta,\n"
+ " isUnpatrolled,\n"
+ " isNew,\n"
+ " deltaBucket,\n"
+ " isMinor,\n"
+ " isAnonymous,\n"
+ " deleted,\n"
+ " cityName,\n"
+ " metroCode,\n"
+ " namespace,\n"
+ " comment,\n"
+ " page,\n"
+ " commentLength,\n"
+ " countryName,\n"
+ " user,\n"
+ " regionIsoCode\n"
+ "FROM TABLE(\n"
+ " LOCALFILES(\n"
+ " files => ARRAY['/resources/data/batch_index/json/wikipedia_index_data1.json'],\n"
+ " format => 'json'\n"
+ " ))\n"
+ " (\"timestamp\" VARCHAR, isRobot VARCHAR, diffUrl VARCHAR, added BIGINT, countryIsoCode VARCHAR, regionName VARCHAR,\n"
+ " channel VARCHAR, flags VARCHAR, delta BIGINT, isUnpatrolled VARCHAR, isNew VARCHAR, deltaBucket DOUBLE,\n"
+ " isMinor VARCHAR, isAnonymous VARCHAR, deleted BIGINT, cityName VARCHAR, metroCode BIGINT, namespace VARCHAR,\n"
+ " comment VARCHAR, page VARCHAR, commentLength BIGINT, countryName VARCHAR, \"user\" VARCHAR, regionIsoCode VARCHAR)\n"
+ "PARTITIONED BY DAY\n",
datasource
);
// Submit the task and wait for the datasource to get loaded
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryLocal);
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
));
}
msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
}
@Test
public void testExport() throws Exception
{
String exportQuery =
StringUtils.format(
"INSERT INTO extern(%s(exportPath => '%s'))\n"
+ "AS CSV\n"
+ "SELECT page, added, delta\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+ " '{\"type\":\"json\"}',\n"
+ " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+ " )\n"
+ ")\n",
LocalFileExportStorageProvider.TYPE_NAME, "/shared/export/"
);
SqlTaskStatus exportTask = msqHelper.submitMsqTaskSuccesfully(exportQuery);
// Fail fast if the task could not even be started, before polling for completion (matches the other tests in this class)
if (exportTask.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
exportTask.getError()
));
}
msqHelper.pollTaskIdForSuccess(exportTask.getTaskId());
String resultQuery = StringUtils.format(
"SELECT page, delta, added\n"
+ " FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"local\",\"baseDir\":\"/shared/export/\",\"filter\":\"*.csv\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":true}'\n"
+ " )\n"
+ " ) EXTEND (\"added\" BIGINT, \"delta\" BIGINT, \"page\" VARCHAR)\n"
+ " WHERE delta != 0\n"
+ " ORDER BY page");
SqlTaskStatus resultTaskStatus = msqHelper.submitMsqTaskSuccesfully(resultQuery);
msqHelper.pollTaskIdForSuccess(resultTaskStatus.getTaskId());
TaskReport.ReportMap statusReport = msqHelper.fetchStatusReports(resultTaskStatus.getTaskId());
MSQTaskReport taskReport = (MSQTaskReport) statusReport.get(MSQTaskReport.REPORT_KEY);
if (taskReport == null) {
throw new ISE("Unable to fetch the status report for the task [%s]", resultTaskStatus.getTaskId());
}
TaskContextReport taskContextReport = (TaskContextReport) statusReport.get(TaskContextReport.REPORT_KEY);
Assert.assertFalse(taskContextReport.getPayload().isEmpty());
MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
taskReport.getPayload(),
"payload"
);
MSQResultsReport resultsReport = Preconditions.checkNotNull(
taskReportPayload.getResults(),
"Results report for the task id is empty"
);
List<List<Object>> actualResults = new ArrayList<>();
for (final Object[] row : resultsReport.getResults()) {
actualResults.add(Arrays.asList(row));
}
ImmutableList<ImmutableList<Object>> expectedResults = ImmutableList.of(
ImmutableList.of("Cherno Alpha", 111, 123),
ImmutableList.of("Gypsy Danger", -143, 57),
ImmutableList.of("Striker Eureka", 330, 459)
);
Assert.assertEquals(
expectedResults,
actualResults
);
}
}

View File

@ -19,195 +19,13 @@
package org.apache.druid.testsEx.query;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.KafkaAdminClient;
import org.apache.druid.testing.utils.KafkaEventWriter;
import org.apache.druid.testing.utils.KafkaUtil;
import org.apache.druid.testing.utils.StreamEventWriter;
import org.apache.druid.testsEx.categories.Query;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.apache.druid.testsEx.indexer.AbstractIndexerTest;
import org.joda.time.Interval;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Function;
@RunWith(DruidTestRunner.class)
@Category(Query.class)
public class ITUnionQueryTest extends AbstractIndexerTest
public class ITUnionQueryTest extends UnionQueryTest
{
private static final Logger LOG = new Logger(ITUnionQueryTest.class);
private static final String UNION_SUPERVISOR_TEMPLATE = "/query/union_kafka_supervisor_template.json";
private static final String UNION_DATA_FILE = "/query/union_data.json";
private static final String UNION_QUERIES_RESOURCE = "/query/union_queries.json";
private static final String UNION_DATASOURCE = "wikipedia_index_test";
private String fullDatasourceName;
@Test
public void testUnionQuery() throws Exception
{
fullDatasourceName = UNION_DATASOURCE + config.getExtraDatasourceNameSuffix();
final String baseName = fullDatasourceName + UUID.randomUUID();
KafkaAdminClient streamAdminClient = new KafkaAdminClient(config);
List<String> supervisors = new ArrayList<>();
final int numDatasources = 3;
for (int i = 0; i < numDatasources; i++) {
String datasource = baseName + "-" + i;
streamAdminClient.createStream(datasource, 1, Collections.emptyMap());
ITRetryUtil.retryUntil(
() -> streamAdminClient.isStreamActive(datasource),
true,
10000,
30,
"Wait for stream active"
);
String supervisorSpec = generateStreamIngestionPropsTransform(
datasource,
datasource,
config
).apply(getResourceAsString(UNION_SUPERVISOR_TEMPLATE));
LOG.info("supervisorSpec: [%s]\n", supervisorSpec);
// Start supervisor
String specResponse = indexer.submitSupervisor(supervisorSpec);
LOG.info("Submitted supervisor [%s]", specResponse);
supervisors.add(specResponse);
int ctr = 0;
try (
StreamEventWriter streamEventWriter = new KafkaEventWriter(config, false);
BufferedReader reader = new BufferedReader(
new InputStreamReader(getResourceAsStream(UNION_DATA_FILE), StandardCharsets.UTF_8)
)
) {
String line;
while ((line = reader.readLine()) != null) {
streamEventWriter.write(datasource, StringUtils.toUtf8(line));
ctr++;
}
}
final int numWritten = ctr;
LOG.info("Waiting for stream indexing tasks to consume events");
ITRetryUtil.retryUntilTrue(
() ->
numWritten == this.queryHelper.countRows(
datasource,
Intervals.ETERNITY,
name -> new LongSumAggregatorFactory(name, "count")
),
StringUtils.format(
"dataSource[%s] consumed [%,d] events, expected [%,d]",
datasource,
this.queryHelper.countRows(
datasource,
Intervals.ETERNITY,
name -> new LongSumAggregatorFactory(name, "count")
),
numWritten
)
);
}
String queryResponseTemplate = StringUtils.replace(
getResourceAsString(UNION_QUERIES_RESOURCE),
"%%DATASOURCE%%",
baseName
);
queryHelper.testQueriesFromString(queryResponseTemplate);
for (int i = 0; i < numDatasources; i++) {
indexer.terminateSupervisor(supervisors.get(i));
streamAdminClient.deleteStream(baseName + "-" + i);
}
for (int i = 0; i < numDatasources; i++) {
final int datasourceNumber = i;
ITRetryUtil.retryUntil(
() -> coordinator.areSegmentsLoaded(baseName + "-" + datasourceNumber),
true,
10000,
10,
"Kafka segments loaded"
);
}
queryHelper.testQueriesFromString(queryResponseTemplate);
for (int i = 0; i < numDatasources; i++) {
final String datasource = baseName + "-" + i;
List<String> intervals = coordinator.getSegmentIntervals(datasource);
Collections.sort(intervals);
String first = intervals.get(0).split("/")[0];
String last = intervals.get(intervals.size() - 1).split("/")[1];
Interval interval = Intervals.of(first + "/" + last);
coordinator.unloadSegmentsForDataSource(baseName + "-" + i);
ITRetryUtil.retryUntilFalse(
() -> coordinator.areSegmentsLoaded(datasource),
"Segment Unloading"
);
coordinator.deleteSegmentsDataSource(baseName + "-" + i, interval);
}
}
/**
* sad version of
* {@link org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest#generateStreamIngestionPropsTransform}
*/
private Function<String, String> generateStreamIngestionPropsTransform(
String streamName,
String fullDatasourceName,
IntegrationTestingConfig config
)
{
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
final Properties consumerProperties = new Properties();
consumerProperties.putAll(consumerConfigs);
consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost());
KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
return spec -> {
try {
spec = StringUtils.replace(
spec,
"%%DATASOURCE%%",
fullDatasourceName
);
spec = StringUtils.replace(
spec,
"%%TOPIC_VALUE%%",
streamName
);
return StringUtils.replace(
spec,
"%%STREAM_PROPERTIES_VALUE%%",
jsonMapper.writeValueAsString(consumerProperties)
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
};
}
}

View File

@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.query;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.KafkaAdminClient;
import org.apache.druid.testing.utils.KafkaEventWriter;
import org.apache.druid.testing.utils.KafkaUtil;
import org.apache.druid.testing.utils.StreamEventWriter;
import org.apache.druid.testsEx.indexer.AbstractIndexerTest;
import org.joda.time.Interval;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Function;
public class UnionQueryTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(UnionQueryTest.class);
private static final String UNION_SUPERVISOR_TEMPLATE = "/query/union_kafka_supervisor_template.json";
private static final String UNION_DATA_FILE = "/query/union_data.json";
private static final String UNION_QUERIES_RESOURCE = "/query/union_queries.json";
private static final String UNION_DATASOURCE = "wikipedia_index_test";
private String fullDatasourceName;
@Test
public void testUnionQuery() throws Exception
{
fullDatasourceName = UNION_DATASOURCE + config.getExtraDatasourceNameSuffix();
final String baseName = fullDatasourceName + UUID.randomUUID();
KafkaAdminClient streamAdminClient = new KafkaAdminClient(config);
List<String> supervisors = new ArrayList<>();
final int numDatasources = 3;
for (int i = 0; i < numDatasources; i++) {
String datasource = baseName + "-" + i;
streamAdminClient.createStream(datasource, 1, Collections.emptyMap());
ITRetryUtil.retryUntil(
() -> streamAdminClient.isStreamActive(datasource),
true,
10000,
30,
"Wait for stream active"
);
String supervisorSpec = generateStreamIngestionPropsTransform(
datasource,
datasource,
config
).apply(getResourceAsString(UNION_SUPERVISOR_TEMPLATE));
LOG.info("supervisorSpec: [%s]\n", supervisorSpec);
// Start supervisor
String specResponse = indexer.submitSupervisor(supervisorSpec);
LOG.info("Submitted supervisor [%s]", specResponse);
supervisors.add(specResponse);
int ctr = 0;
try (
StreamEventWriter streamEventWriter = new KafkaEventWriter(config, false);
BufferedReader reader = new BufferedReader(
new InputStreamReader(getResourceAsStream(UNION_DATA_FILE), StandardCharsets.UTF_8)
)
) {
String line;
while ((line = reader.readLine()) != null) {
streamEventWriter.write(datasource, StringUtils.toUtf8(line));
ctr++;
}
}
final int numWritten = ctr;
LOG.info("Waiting for stream indexing tasks to consume events");
ITRetryUtil.retryUntilTrue(
() ->
numWritten == this.queryHelper.countRows(
datasource,
Intervals.ETERNITY,
name -> new LongSumAggregatorFactory(name, "count")
),
StringUtils.format(
"dataSource[%s] consumed [%,d] events, expected [%,d]",
datasource,
this.queryHelper.countRows(
datasource,
Intervals.ETERNITY,
name -> new LongSumAggregatorFactory(name, "count")
),
numWritten
)
);
}
String queryResponseTemplate = StringUtils.replace(
getResourceAsString(UNION_QUERIES_RESOURCE),
"%%DATASOURCE%%",
baseName
);
queryHelper.testQueriesFromString(queryResponseTemplate);
for (int i = 0; i < numDatasources; i++) {
indexer.terminateSupervisor(supervisors.get(i));
streamAdminClient.deleteStream(baseName + "-" + i);
}
for (int i = 0; i < numDatasources; i++) {
final int datasourceNumber = i;
ITRetryUtil.retryUntil(
() -> coordinator.areSegmentsLoaded(baseName + "-" + datasourceNumber),
true,
10000,
10,
"Kafka segments loaded"
);
}
queryHelper.testQueriesFromString(queryResponseTemplate);
for (int i = 0; i < numDatasources; i++) {
final String datasource = baseName + "-" + i;
List<String> intervals = coordinator.getSegmentIntervals(datasource);
Collections.sort(intervals);
String first = intervals.get(0).split("/")[0];
String last = intervals.get(intervals.size() - 1).split("/")[1];
Interval interval = Intervals.of(first + "/" + last);
coordinator.unloadSegmentsForDataSource(baseName + "-" + i);
ITRetryUtil.retryUntilFalse(
() -> coordinator.areSegmentsLoaded(datasource),
"Segment Unloading"
);
coordinator.deleteSegmentsDataSource(baseName + "-" + i, interval);
}
}
/**
* sad version of
* {@link org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest#generateStreamIngestionPropsTransform}
*/
private Function<String, String> generateStreamIngestionPropsTransform(
String streamName,
String fullDatasourceName,
IntegrationTestingConfig config
)
{
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
final Properties consumerProperties = new Properties();
consumerProperties.putAll(consumerConfigs);
consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost());
KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
return spec -> {
try {
spec = StringUtils.replace(
spec,
"%%DATASOURCE%%",
fullDatasourceName
);
spec = StringUtils.replace(
spec,
"%%TOPIC_VALUE%%",
streamName
);
return StringUtils.replace(
spec,
"%%STREAM_PROPERTIES_VALUE%%",
jsonMapper.writeValueAsString(consumerProperties)
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
};
}
}

View File

@ -0,0 +1,199 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#-------------------------------------------------------------------------
# Definition of the batch index test cluster.
# See https://yaml.org/spec/1.2.2 for more about YAML
include:
- /cluster/Common/zk-metastore.yaml
- /cluster/Common/kafka.yaml
druid:
coordinator:
instances:
- port: 8081
overlord:
instances:
- port: 8090
broker:
instances:
- port: 8082
router:
instances:
- port: 8888
historical:
instances:
- port: 8083
indexer:
instances:
- port: 8091
# Metastore initialization queries.
# REPLACE is used so that the statements are idempotent
# The fancy formatting is for human consumption; it is compacted internally
metastoreInit:
- sql: |
REPLACE INTO druid_segments (
id, dataSource, created_date, start, end, partitioned, version, used, payload
)
VALUES (
'twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9',
'twitterstream',
'2013-05-13T01:08:18.192Z',
'2013-01-01T00:00:00.000Z',
'2013-01-02T00:00:00.000Z',
0,
'2013-01-02T04:13:41.980Z_v9',
1,
'{"dataSource": "twitterstream",
"interval": "2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z",
"version": "2013-01-02T04:13:41.980Z_v9",
"loadSpec": {
"type": "s3_zip",
"bucket": "static.druid.io",
"key": "data/segments/twitterstream/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/2013-01-02T04:13:41.980Z_v9/0/index.zip"
},
"dimensions": "has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,
rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name",
"metrics": "count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets",
"shardSpec": {"type": "none"},
"binaryVersion": 9,
"size": 445235220,
"identifier": "twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9"
}'
)
- sql: |
REPLACE INTO druid_segments (
id, dataSource, created_date, start, end, partitioned, version, used, payload
)
VALUES (
'twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9',
'twitterstream',
'2013-05-13T00:03:28.640Z',
'2013-01-02T00:00:00.000Z',
'2013-01-03T00:00:00.000Z',
0,
'2013-01-03T03:44:58.791Z_v9',
1,
'{"dataSource": "twitterstream",
"interval": "2013-01-02T00:00:00.000Z/2013-01-03T00:00:00.000Z",
"version": "2013-01-03T03:44:58.791Z_v9",
"loadSpec": {
"type": "s3_zip",
"bucket": "static.druid.io",
"key": "data/segments/twitterstream/2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z/2013-01-03T03:44:58.791Z_v9/0/index.zip"
},
"dimensions": "has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,
rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name",
"metrics": "count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets",
"shardSpec": {"type": "none"},
"binaryVersion": 9,
"size": 435325540,
"identifier": "twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9"
}'
)
- sql: |
REPLACE INTO druid_segments (
id, dataSource, created_date, start, end, partitioned, version, used, payload
)
VALUES (
'twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9',
'twitterstream',
'2013-05-13T00:03:48.807Z',
'2013-01-03T00:00:00.000Z',
'2013-01-04T00:00:00.000Z',
0,
'2013-01-04T04:09:13.590Z_v9',
1,
'{"dataSource": "twitterstream",
"interval": "2013-01-03T00:00:00.000Z/2013-01-04T00:00:00.000Z",
"version": "2013-01-04T04:09:13.590Z_v9",
"loadSpec": {
"type": "s3_zip",
"bucket": "static.druid.io",
"key": "data/segments/twitterstream/2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z/2013-01-04T04:09:13.590Z_v9/0/index.zip"
},
"dimensions": "has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,
rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name",
"metrics": "count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets",
"shardSpec": {"type": "none"},
"binaryVersion": 9,
"size": 411651320,
"identifier": "twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9"
}'
)
- sql: |
REPLACE INTO druid_segments (
id, dataSource, created_date, start, end, partitioned, version, used, payload
)
VALUES (
'wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9',
'wikipedia_editstream',
'2013-03-15T20:49:52.348Z',
'2012-12-29T00:00:00.000Z',
'2013-01-10T08:00:00.000Z',
0,
'2013-01-10T08:13:47.830Z_v9',
1,
'{"dataSource": "wikipedia_editstream",
"interval": "2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z",
"version": "2013-01-10T08:13:47.830Z_v9",
"loadSpec": {
"type": "s3_zip",
"bucket": "static.druid.io",
"key": "data/segments/wikipedia_editstream/2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z/2013-01-10T08:13:47.830Z_v9/0/index.zip"},
"dimensions": "anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,
namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user",
"metrics": "added,count,deleted,delta,delta_hist,unique_users,variation",
"shardSpec": {"type": "none"},
"binaryVersion": 9,
"size": 446027801,
"identifier": "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9"
}'
)
- sql: |
REPLACE INTO druid_segments (
id, dataSource, created_date, start, end, partitioned, version, used, payload
)
VALUES (
'wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z',
'wikipedia',
'2013-08-08T21:26:23.799Z',
'2013-08-01T00:00:00.000Z',
'2013-08-02T00:00:00.000Z',
0,
'2013-08-08T21:22:48.989Z',
1,
'{"dataSource": "wikipedia",
"interval": "2013-08-01T00:00:00.000Z/2013-08-02T00:00:00.000Z",
"version": "2013-08-08T21:22:48.989Z",
"loadSpec": {
"type": "s3_zip",
"bucket": "static.druid.io",
"key": "data/segments/wikipedia/20130801T000000.000Z_20130802T000000.000Z/2013-08-08T21_22_48.989Z/0/index.zip"
},
"dimensions": "dma_code,continent_code,geo,area_code,robot,country_name,network,city,namespace,
anonymous,unpatrolled,page,postal_code,language,newpage,user,region_lookup",
"metrics": "count,delta,variation,added,deleted",
"shardSpec": {"type": "none"},
"binaryVersion": 9,
"size": 24664730,
"identifier": "wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z"
}'
)

View File

@ -0,0 +1,43 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
#### Problem
The idea of this test group is to simulate the mixed-version cluster state of a rolling upgrade/downgrade and catch issues that can arise from it.
#### Implementation
In this test group, the docker compose cluster is launched with services running different Druid versions.
The docker image for the previous version is built by downloading the previous-version Druid tar.
Currently, the case where the Coordinator and Overlord run on the previous version is tested; a run sketch follows below.
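A minimal run sketch, assuming `it.sh` exposes the same `image` and `test <category>` commands used by the other revised IT groups and that this group's category is `BackwardCompatibilityMain`; the version, URL, and image name below are placeholder values:

```bash
# Point the build at a previous Druid release (placeholder values).
export BACKWARD_COMPATIBILITY_IT_ENABLED=true
export DRUID_PREVIOUS_VERSION=30.0.0
export DRUID_PREVIOUS_VERSION_DOWNLOAD_URL=https://archive.apache.org/dist/druid/30.0.0/apache-druid-30.0.0-bin.tar.gz
export DRUID_PREVIOUS_IT_IMAGE_NAME=org.apache.druid/test-previous:30.0.0

# Build the current and previous-version test images, then run this group.
./it.sh image
./it.sh test BackwardCompatibilityMain
```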
#### Test coverage
Existing
- MSQ ingestion
- Native ingestion
- Stream ingestion
- Querying
Pending
- Compaction
#### Limitations
* The `druid-testing-tools` jar is not published, so the image for the previous version still bundles the
extension from the current build.
This could break if incompatible changes are made to the extension; in that scenario the test should be disabled.
However, the extension is primarily used to test specific error scenarios and to launch a custom node role
service (used in the HighAvailability test group).

View File

@ -53,4 +53,23 @@ docker build -t $DRUID_IT_IMAGE_NAME \
--build-arg CONFLUENT_VERSION=$CONFLUENT_VERSION \
--build-arg HADOOP_VERSION=$HADOOP_VERSION \
--build-arg MYSQL_DRIVER_CLASSNAME=$MYSQL_DRIVER_CLASSNAME \
--build-arg DRUID_TESTING_TOOLS_VERSION=$DRUID_VERSION \
.
if [[ "${BACKWARD_COMPATIBILITY_IT_ENABLED:-}" != "true" ]]; then
echo "Not building previous version image."
exit 0
fi
# Download the previous druid tar
curl -L $DRUID_PREVIOUS_VERSION_DOWNLOAD_URL --output apache-druid-$DRUID_PREVIOUS_VERSION-bin.tar.gz
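# Build the previous-version image. The druid-it-tools extension is still taken from the
# current build (DRUID_TESTING_TOOLS_VERSION below stays at DRUID_VERSION) because that jar
# is not published for released versions; see the test group README for the implications.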
docker build -t $DRUID_PREVIOUS_IT_IMAGE_NAME \
--build-arg DRUID_VERSION=$DRUID_PREVIOUS_VERSION \
--build-arg MYSQL_VERSION=$MYSQL_VERSION \
--build-arg MARIADB_VERSION=$MARIADB_VERSION \
--build-arg CONFLUENT_VERSION=$CONFLUENT_VERSION \
--build-arg HADOOP_VERSION=$HADOOP_VERSION \
--build-arg MYSQL_DRIVER_CLASSNAME=$MYSQL_DRIVER_CLASSNAME \
--build-arg DRUID_TESTING_TOOLS_VERSION=$DRUID_VERSION \
.

View File

@ -46,13 +46,15 @@ ARG MARIADB_VERSION
ENV MARIADB_VERSION=$MARIADB_VERSION
ARG MYSQL_DRIVER_CLASSNAME=com.mysql.jdbc.Driver
ENV MYSQL_DRIVER_CLASSNAME=$MYSQL_DRIVER_CLASSNAME
ARG DRUID_TESTING_TOOLS_VERSION
ENV DRUID_HOME=/usr/local/druid
# Populate build artifacts
COPY apache-druid-${DRUID_VERSION}-bin.tar.gz /usr/local/
COPY druid-it-tools-${DRUID_VERSION}.jar /tmp/druid/extensions/druid-it-tools/
COPY druid-it-tools-${DRUID_TESTING_TOOLS_VERSION}.jar /tmp/druid/extensions/druid-it-tools/
COPY kafka-protobuf-provider-${CONFLUENT_VERSION}.jar /tmp/druid/lib/
COPY mysql-connector-j-${MYSQL_VERSION}.jar /tmp/druid/lib/
COPY mariadb-java-client-${MARIADB_VERSION}.jar /tmp/druid/lib/
@ -60,6 +62,7 @@ COPY test-setup.sh /
COPY druid.sh /
COPY launch.sh /
# Do the setup tasks. The tasks are done within a script, rather than
# here, so they are easier to describe and debug. Turn on the "-x" flag
# within the script to trace the steps if needed for debugging.