From 99313e9996823624dfb7460c4619c86ca544bde7 Mon Sep 17 00:00:00 2001
From: Rishabh Singh <6513075+findingrish@users.noreply.github.com>
Date: Wed, 7 Aug 2024 11:13:35 +0530
Subject: [PATCH] Revised IT to detect backward incompatible change (#16779)
Added a new revised IT group, BackwardCompatibilityMain, to catch potential backward compatibility issues that may arise during a rolling upgrade.
This test group runs a docker-compose cluster in which the Overlord and Coordinator services run on the previous Druid version.
The following environment variables must be set in the GHA workflow file .github/workflows/unit-and-integration-tests-unified.yml to run this test:
DRUID_PREVIOUS_VERSION -> the previous Druid version to test backward compatibility against.
DRUID_PREVIOUS_VERSION_DOWNLOAD_URL -> the URL from which to download the distribution tarball of that version.
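For reference, a minimal setting in that workflow's env block could look like the following sketch (the version and URL below are hypothetical example values, not the values used by CI):

    env:
      DRUID_PREVIOUS_VERSION: 30.0.0
      DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: https://archive.apache.org/dist/druid/30.0.0/apache-druid-30.0.0-bin.tar.gz

If DRUID_PREVIOUS_VERSION is left empty, the set-env-var job marks the backward compatibility flag as disabled and the BackwardCompatibilityMain group is skipped.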
---
.github/workflows/reusable-revised-its.yml | 49 ++-
.github/workflows/revised-its.yml | 39 ++
.../unit-and-integration-tests-unified.yml | 50 ++-
.../docker-compose.yaml | 111 +++++
integration-tests-ex/cases/pom.xml | 9 +
.../ITBCMainIndexerTest.java | 32 ++
.../ITBCMainMultiStageQuery.java | 32 ++
.../ITBCMainUnionQueryTest.java | 32 ++
.../categories/BackwardCompatibilityMain.java | 24 ++
.../druid/testsEx/indexer/ITIndexerTest.java | 358 +---------------
.../druid/testsEx/indexer/IndexerTest.java | 381 ++++++++++++++++++
.../druid/testsEx/msq/ITMultiStageQuery.java | 244 +----------
.../druid/testsEx/msq/MultiStageQuery.java | 266 ++++++++++++
.../druid/testsEx/query/ITUnionQueryTest.java | 184 +--------
.../druid/testsEx/query/UnionQueryTest.java | 207 ++++++++++
.../BackwardCompatibilityMain/docker.yaml | 199 +++++++++
.../docs/backward-compatibility-tests.md | 43 ++
integration-tests-ex/image/docker-build.sh | 19 +
integration-tests-ex/image/docker/Dockerfile | 5 +-
19 files changed, 1498 insertions(+), 786 deletions(-)
create mode 100644 integration-tests-ex/cases/cluster/BackwardCompatibilityMain/docker-compose.yaml
create mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/BackwardCompatibilityMain/ITBCMainIndexerTest.java
create mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/BackwardCompatibilityMain/ITBCMainMultiStageQuery.java
create mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/BackwardCompatibilityMain/ITBCMainUnionQueryTest.java
create mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/BackwardCompatibilityMain.java
create mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/IndexerTest.java
create mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/MultiStageQuery.java
create mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/UnionQueryTest.java
create mode 100644 integration-tests-ex/cases/src/test/resources/cluster/BackwardCompatibilityMain/docker.yaml
create mode 100644 integration-tests-ex/docs/backward-compatibility-tests.md
diff --git a/.github/workflows/reusable-revised-its.yml b/.github/workflows/reusable-revised-its.yml
index d9237a52aba..1aa29f64cb5 100644
--- a/.github/workflows/reusable-revised-its.yml
+++ b/.github/workflows/reusable-revised-its.yml
@@ -57,6 +57,19 @@ on:
AWS_SECRET_ACCESS_KEY:
required: false
type: string
+ BACKWARD_COMPATIBILITY_IT_ENABLED:
+ required: false
+ type: string
+ default: false
+ DRUID_PREVIOUS_VERSION:
+ required: false
+ type: string
+ DRUID_PREVIOUS_VERSION_DOWNLOAD_URL:
+ required: false
+ type: string
+ DRUID_PREVIOUS_IT_IMAGE_NAME:
+ required: false
+ type: string
env:
MYSQL_DRIVER_CLASSNAME: ${{ inputs.mysql_driver }} # Used by tests to connect to metadata store directly.
@@ -106,6 +119,15 @@ jobs:
./druid-container-jdk${{ inputs.build_jdk }}.tar.gz
./integration-tests-ex/image/target/env.sh
+ - name: Retrieve previous version cached docker image
+ id: docker-restore-previous-version
+ if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' }}
+ uses: actions/cache/restore@v4
+ with:
+ key: druid-container-jdk${{ inputs.build_jdk }}-version${{ inputs.DRUID_PREVIOUS_VERSION }}.tar.gz-${{ github.sha }}
+ path: |
+ ./druid-container-jdk${{ inputs.build_jdk }}-version${{ inputs.DRUID_PREVIOUS_VERSION }}.tar.gz
+
- name: Maven build
if: steps.maven-restore.outputs.cache-hit != 'true' || ( steps.docker-restore.outputs.cache-hit != 'true' && steps.targets-restore.outputs.cache-hit != 'true' )
run: |
@@ -115,6 +137,10 @@ jobs:
if: steps.docker-restore.outputs.cache-hit != 'true' || steps.maven-restore.outputs.cache-hit != 'true'
env:
docker-restore: ${{ toJson(steps.docker-restore.outputs) }}
+ BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
+ DRUID_PREVIOUS_VERSION: ${{ inputs.DRUID_PREVIOUS_VERSION }}
+ DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ inputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
+ DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
run: |
./it.sh image
source ./integration-tests-ex/image/target/env.sh
@@ -122,6 +148,15 @@ jobs:
echo $DRUID_IT_IMAGE_NAME
docker save "$DRUID_IT_IMAGE_NAME" | gzip > druid-container-jdk${{ inputs.build_jdk }}.tar.gz
+ - name: Save previous version docker image
+ if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' && (steps.docker-restore.outputs.cache-hit != 'true' || steps.maven-restore.outputs.cache-hit != 'true') }}
+ env:
+ docker-restore: ${{ toJson(steps.docker-restore.outputs) }}
+ run: |
+ docker tag ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }} ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}-jdk${{ inputs.build_jdk }}-version${{ inputs.DRUID_PREVIOUS_VERSION }}
+ echo ${DRUID_PREVIOUS_IT_IMAGE_NAME}
+ docker save "${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}" | gzip > druid-container-jdk${{ inputs.build_jdk }}-version${{ inputs.DRUID_PREVIOUS_VERSION }}.tar.gz
+
- name: Stop and remove docker containers
run: |
echo "Force stopping all containers and pruning"
@@ -133,9 +168,21 @@ jobs:
docker load --input druid-container-jdk${{ inputs.build_jdk }}.tar.gz
docker images
+ - name: Load previous version docker image
+ if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' }}
+ run: |
+ docker load --input druid-container-jdk${{ inputs.build_jdk }}-version${{ inputs.DRUID_PREVIOUS_VERSION }}.tar.gz
+ docker images
+
- name: Run IT
id: run-it
- run: ${{ inputs.script }}
+ env:
+ BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
+ DRUID_PREVIOUS_VERSION: ${{ inputs.DRUID_PREVIOUS_VERSION }}
+ DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ inputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
+ DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
+ run: |
+ ${{ inputs.script }}
- name: Collect docker logs on failure
if: ${{ failure() && steps.run-it.conclusion == 'failure' }}
diff --git a/.github/workflows/revised-its.yml b/.github/workflows/revised-its.yml
index 069562bf7bd..412a3e46973 100644
--- a/.github/workflows/revised-its.yml
+++ b/.github/workflows/revised-its.yml
@@ -18,6 +18,24 @@
name: "Revised ITs workflow"
on:
workflow_call:
+ inputs:
+ BACKWARD_COMPATIBILITY_IT_ENABLED:
+ description: "Flag for backward compatibility IT"
+ required: false
+ default: false
+ type: string
+ DRUID_PREVIOUS_VERSION:
+ description: "Previous druid versions to run the test against."
+ required: false
+ type: string
+ DRUID_PREVIOUS_VERSION_DOWNLOAD_URL:
+ description: "URL to download the previous druid version."
+ required: false
+ type: string
+ DRUID_PREVIOUS_IT_IMAGE_NAME:
+ description: "Druid previous version image name."
+ required: false
+ type: string
workflow_dispatch:
jobs:
@@ -79,3 +97,24 @@ jobs:
AWS_REGION: us-east-1
AWS_ACCESS_KEY_ID: admin
AWS_SECRET_ACCESS_KEY: miniopassword
+
+ backward-compatibility-it:
+ needs: changes
+ uses: ./.github/workflows/reusable-revised-its.yml
+ if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' && (needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true') }}
+ with:
+ build_jdk: 8
+ runtime_jdk: 8
+ use_indexer: middleManager
+ script: ./it.sh github BackwardCompatibilityMain
+ it: BackwardCompatibilityMain
+ mysql_driver: com.mysql.jdbc.Driver
+ BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
+ DRUID_PREVIOUS_VERSION: ${{ inputs.DRUID_PREVIOUS_VERSION }}
+ DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ inputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
+ DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
+ DRUID_CLOUD_BUCKET: druid-qa
+ DRUID_CLOUD_PATH: aws-${{ github.run_id }}-${{ github.run_attempt }}
+ AWS_REGION: us-east-1
+ AWS_ACCESS_KEY_ID: admin
+ AWS_SECRET_ACCESS_KEY: miniopassword
diff --git a/.github/workflows/unit-and-integration-tests-unified.yml b/.github/workflows/unit-and-integration-tests-unified.yml
index 9651a56b8cb..544787e79a3 100644
--- a/.github/workflows/unit-and-integration-tests-unified.yml
+++ b/.github/workflows/unit-and-integration-tests-unified.yml
@@ -49,7 +49,30 @@ env:
SEGMENT_DOWNLOAD_TIMEOUT_MINS: 5
jobs:
+ set-env-var:
+ name: Set env var
+ runs-on: ubuntu-latest
+ outputs:
+ DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ steps.image_name.outputs.image_name }}
+ BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ steps.it_enabled.outputs.enabled }}
+ DRUID_PREVIOUS_VERSION: ${{ env.DRUID_PREVIOUS_VERSION }}
+ DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ env.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
+ steps:
+ - name: Set image name env var
+ id: image_name
+ run: |
+ echo "::set-output name=image_name::org.apache.druid.integration-tests/test:${{ env.DRUID_PREVIOUS_VERSION }}"
+ - name: Set env for enabling backward compatibility it
+ id: it_enabled
+ run: |
+ if [ -n "${{ env.DRUID_PREVIOUS_VERSION }}" ]; then
+ echo "::set-output name=enabled::true"
+ else
+ echo "::set-output name=enabled::false"
+ fi
+
build:
+ needs: set-env-var
name: "build (jdk${{ matrix.jdk }})"
strategy:
fail-fast: false
@@ -94,12 +117,25 @@ jobs:
./druid-container-jdk${{ matrix.jdk }}.tar.gz
./integration-tests-ex/image/target/env.sh
+ - name: Cache previous version image
+ id: docker_container_previous_version
+ uses: actions/cache@v4
+ with:
+ key: druid-container-jdk${{ matrix.jdk }}-version${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}.tar.gz-${{ github.sha }}
+ path: |
+ ./druid-container-jdk${{ matrix.jdk }}-version${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}.tar.gz
+
- name: Maven build
id: maven_build
run: |
./it.sh ci
- name: Container build
+ env:
+ BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ needs.set-env-var.outputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
+ DRUID_PREVIOUS_VERSION: ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}
+ DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
+ DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
run: |
./it.sh image
source ./integration-tests-ex/image/target/env.sh
@@ -111,6 +147,13 @@ jobs:
echo $DRUID_IT_IMAGE_NAME
docker save "$DRUID_IT_IMAGE_NAME" | gzip > druid-container-jdk${{ matrix.jdk }}.tar.gz
+ - name: Save previous version docker image
+ if: ${{ needs.set-env-var.outputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' }}
+ run: |
+ docker tag ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }} ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}-jdk${{ matrix.jdk }}-version${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}
+ echo ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
+ docker save "${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}" | gzip > druid-container-jdk${{ matrix.jdk }}-version${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}.tar.gz
+
unit-tests-phase2:
strategy:
fail-fast: false
@@ -142,6 +185,11 @@ jobs:
uses: ./.github/workflows/standard-its.yml
revised-its:
- needs: unit-tests
+ needs: [unit-tests, set-env-var]
if: ${{ always() && (needs.unit-tests.result == 'success' || needs.unit-tests.outputs.continue_tests) }}
uses: ./.github/workflows/revised-its.yml
+ with:
+ BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ needs.set-env-var.outputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
+ DRUID_PREVIOUS_VERSION: ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}
+ DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
+ DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
diff --git a/integration-tests-ex/cases/cluster/BackwardCompatibilityMain/docker-compose.yaml b/integration-tests-ex/cases/cluster/BackwardCompatibilityMain/docker-compose.yaml
new file mode 100644
index 00000000000..4fbf0f71197
--- /dev/null
+++ b/integration-tests-ex/cases/cluster/BackwardCompatibilityMain/docker-compose.yaml
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+networks:
+ druid-it-net:
+ name: druid-it-net
+ ipam:
+ config:
+ - subnet: 172.172.172.0/24
+
+services:
+ zookeeper:
+ extends:
+ file: ../Common/dependencies.yaml
+ service: zookeeper
+
+ metadata:
+ extends:
+ file: ../Common/dependencies.yaml
+ service: metadata
+
+ coordinator:
+ extends:
+ file: ../Common/druid.yaml
+ service: coordinator
+ image: ${DRUID_PREVIOUS_IT_IMAGE_NAME}
+ container_name: coordinator
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ # The frequency with which the coordinator polls the database
+ # for changes. The DB population code has to wait at least this
+ # long for the coordinator to notice changes.
+ - druid_manager_segments_pollDuration=PT5S
+ - druid_coordinator_period=PT10S
+ depends_on:
+ - zookeeper
+ - metadata
+
+ overlord:
+ extends:
+ file: ../Common/druid.yaml
+ service: overlord
+ image: ${DRUID_PREVIOUS_IT_IMAGE_NAME}
+ container_name: overlord
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ depends_on:
+ - zookeeper
+ - metadata
+
+ broker:
+ extends:
+ file: ../Common/druid.yaml
+ service: broker
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ depends_on:
+ - zookeeper
+
+ router:
+ extends:
+ file: ../Common/druid.yaml
+ service: router
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ depends_on:
+ - zookeeper
+
+ historical:
+ extends:
+ file: ../Common/druid.yaml
+ service: historical
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ depends_on:
+ - zookeeper
+
+ middlemanager:
+ extends:
+ file: ../Common/druid.yaml
+ service: middlemanager
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ - druid_msq_intermediate_storage_enable=true
+ - druid_msq_intermediate_storage_type=local
+ - druid_msq_intermediate_storage_basePath=/shared/durablestorage/
+ - druid_export_storage_baseDir=/
+ volumes:
+ # Test data
+ - ../../resources:/resources
+ depends_on:
+ - zookeeper
+
+ kafka:
+ extends:
+ file: ../Common/dependencies.yaml
+ service: kafka
+ depends_on:
+ - zookeeper
diff --git a/integration-tests-ex/cases/pom.xml b/integration-tests-ex/cases/pom.xml
index 40461dd8ef1..433608bbe24 100644
--- a/integration-tests-ex/cases/pom.xml
+++ b/integration-tests-ex/cases/pom.xml
@@ -459,6 +459,15 @@
GcsDeepStorage
+
+ IT-BackwardCompatibilityMain
+
+ false
+
+
+ BackwardCompatibilityMain
+
+
docker-tests
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/BackwardCompatibilityMain/ITBCMainIndexerTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/BackwardCompatibilityMain/ITBCMainIndexerTest.java
new file mode 100644
index 00000000000..01be3f3dbea
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/BackwardCompatibilityMain/ITBCMainIndexerTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.BackwardCompatibilityMain;
+
+import org.apache.druid.testsEx.categories.BackwardCompatibilityMain;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.apache.druid.testsEx.indexer.IndexerTest;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@RunWith(DruidTestRunner.class)
+@Category(BackwardCompatibilityMain.class)
+public class ITBCMainIndexerTest extends IndexerTest
+{
+}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/BackwardCompatibilityMain/ITBCMainMultiStageQuery.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/BackwardCompatibilityMain/ITBCMainMultiStageQuery.java
new file mode 100644
index 00000000000..5c975f64f56
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/BackwardCompatibilityMain/ITBCMainMultiStageQuery.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.BackwardCompatibilityMain;
+
+import org.apache.druid.testsEx.categories.BackwardCompatibilityMain;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.apache.druid.testsEx.msq.MultiStageQuery;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@RunWith(DruidTestRunner.class)
+@Category(BackwardCompatibilityMain.class)
+public class ITBCMainMultiStageQuery extends MultiStageQuery
+{
+}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/BackwardCompatibilityMain/ITBCMainUnionQueryTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/BackwardCompatibilityMain/ITBCMainUnionQueryTest.java
new file mode 100644
index 00000000000..94979d86980
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/BackwardCompatibilityMain/ITBCMainUnionQueryTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.BackwardCompatibilityMain;
+
+import org.apache.druid.testsEx.categories.BackwardCompatibilityMain;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.apache.druid.testsEx.query.UnionQueryTest;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@RunWith(DruidTestRunner.class)
+@Category(BackwardCompatibilityMain.class)
+public class ITBCMainUnionQueryTest extends UnionQueryTest
+{
+}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/BackwardCompatibilityMain.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/BackwardCompatibilityMain.java
new file mode 100644
index 00000000000..f71b07243a1
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/BackwardCompatibilityMain.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.categories;
+
+public class BackwardCompatibilityMain
+{
+}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java
index 84f2dff1d79..d173180db3b 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java
@@ -19,369 +19,13 @@
package org.apache.druid.testsEx.indexer;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.metadata.LockFilterPolicy;
-import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testsEx.categories.BatchIndex;
import org.apache.druid.testsEx.config.DruidTestRunner;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import java.io.Closeable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-
@RunWith(DruidTestRunner.class)
@Category(BatchIndex.class)
-public class ITIndexerTest extends AbstractITBatchIndexTest
+public class ITIndexerTest extends IndexerTest
{
- private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
- private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
- private static final String INDEX_DATASOURCE = "wikipedia_index_test";
-
- private static final String INDEX_WITH_TIMESTAMP_TASK = "/indexer/wikipedia_with_timestamp_index_task.json";
- // TODO: add queries that validate timestamp is different from the __time column since it is a dimension
- // TODO: https://github.com/apache/druid/issues/9565
- private static final String INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
- private static final String INDEX_WITH_TIMESTAMP_DATASOURCE = "wikipedia_with_timestamp_index_test";
-
- private static final String REINDEX_TASK = "/indexer/wikipedia_reindex_task.json";
- private static final String REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_reindex_druid_input_source_task.json";
- private static final String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_reindex_queries.json";
- private static final String REINDEX_DATASOURCE = "wikipedia_reindex_test";
-
- private static final String MERGE_INDEX_TASK = "/indexer/wikipedia_merge_index_task.json";
- private static final String MERGE_INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json";
- private static final String MERGE_INDEX_DATASOURCE = "wikipedia_merge_index_test";
-
- private static final String MERGE_REINDEX_TASK = "/indexer/wikipedia_merge_reindex_task.json";
- private static final String MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_merge_reindex_druid_input_source_task.json";
- private static final String MERGE_REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json";
- private static final String MERGE_REINDEX_DATASOURCE = "wikipedia_merge_reindex_test";
-
- private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_TASK = "/indexer/wikipedia_index_with_merge_column_limit_task.json";
- private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE = "wikipedia_index_with_merge_column_limit_test";
-
- private static final String GET_LOCKED_INTERVALS = "wikipedia_index_get_locked_intervals_test";
-
- private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_PAUSED =
- CoordinatorDynamicConfig.builder().withPauseCoordination(true).build();
- private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_DEFAULT =
- CoordinatorDynamicConfig.builder().build();
-
- @Inject
- CoordinatorResourceTestClient coordinatorClient;
-
- @Test
- public void testIndexData() throws Exception
- {
- final String reindexDatasource = REINDEX_DATASOURCE + "-testIndexData";
- final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testIndexData-druidInputSource";
- try (
- final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
- final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
- final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
- ) {
-
- final Function transform = spec -> {
- try {
- return StringUtils.replace(
- spec,
- "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
- jsonMapper.writeValueAsString("0")
- );
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- };
-
- doIndexTest(
- INDEX_DATASOURCE,
- INDEX_TASK,
- transform,
- INDEX_QUERIES_RESOURCE,
- false,
- true,
- true,
- new Pair<>(false, false)
- );
- doReindexTest(
- INDEX_DATASOURCE,
- reindexDatasource,
- REINDEX_TASK,
- REINDEX_QUERIES_RESOURCE,
- new Pair<>(false, false)
- );
- doReindexTest(
- INDEX_DATASOURCE,
- reindexDatasourceWithDruidInputSource,
- REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
- REINDEX_QUERIES_RESOURCE,
- new Pair<>(false, false)
- );
- }
- }
-
- @Test
- public void testReIndexDataWithTimestamp() throws Exception
- {
- final String reindexDatasource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp";
- final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp-druidInputSource";
- try (
- final Closeable ignored1 = unloader(INDEX_WITH_TIMESTAMP_DATASOURCE + config.getExtraDatasourceNameSuffix());
- final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
- final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
- ) {
- doIndexTest(
- INDEX_WITH_TIMESTAMP_DATASOURCE,
- INDEX_WITH_TIMESTAMP_TASK,
- INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE,
- false,
- true,
- true,
- new Pair<>(false, false)
- );
- doReindexTest(
- INDEX_WITH_TIMESTAMP_DATASOURCE,
- reindexDatasource,
- REINDEX_TASK,
- REINDEX_QUERIES_RESOURCE,
- new Pair<>(false, false)
- );
- doReindexTest(
- INDEX_WITH_TIMESTAMP_DATASOURCE,
- reindexDatasourceWithDruidInputSource,
- REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
- REINDEX_QUERIES_RESOURCE,
- new Pair<>(false, false)
- );
- }
- }
-
- @Test
- public void testReIndexWithNonExistingDatasource() throws Exception
- {
- Pair dummyPair = new Pair<>(false, false);
- final String fullBaseDatasourceName = "nonExistingDatasource2904";
- final String fullReindexDatasourceName = "newDatasource123";
-
- String taskSpec = StringUtils.replace(
- getResourceAsString(REINDEX_TASK_WITH_DRUID_INPUT_SOURCE),
- "%%DATASOURCE%%",
- fullBaseDatasourceName
- );
- taskSpec = StringUtils.replace(
- taskSpec,
- "%%REINDEX_DATASOURCE%%",
- fullReindexDatasourceName
- );
-
- // This method will also verify task is successful after task finish running
- // We expect task to be successful even if the datasource to reindex does not exist
- submitTaskAndWait(
- taskSpec,
- fullReindexDatasourceName,
- false,
- false,
- dummyPair
- );
- }
-
- @Test
- public void testMERGEIndexData() throws Exception
- {
- final String reindexDatasource = MERGE_REINDEX_DATASOURCE + "-testMergeIndexData";
- final String reindexDatasourceWithDruidInputSource = MERGE_REINDEX_DATASOURCE + "-testMergeReIndexData-druidInputSource";
- try (
- final Closeable ignored1 = unloader(MERGE_INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
- final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
- final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
- ) {
- doIndexTest(
- MERGE_INDEX_DATASOURCE,
- MERGE_INDEX_TASK,
- MERGE_INDEX_QUERIES_RESOURCE,
- false,
- true,
- true,
- new Pair<>(false, false)
- );
- doReindexTest(
- MERGE_INDEX_DATASOURCE,
- reindexDatasource,
- MERGE_REINDEX_TASK,
- MERGE_REINDEX_QUERIES_RESOURCE,
- new Pair<>(false, false)
- );
- doReindexTest(
- MERGE_INDEX_DATASOURCE,
- reindexDatasourceWithDruidInputSource,
- MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
- MERGE_INDEX_QUERIES_RESOURCE,
- new Pair<>(false, false)
- );
- }
- }
-
- /**
- * Test that task reports indicate the ingested segments were loaded before the configured timeout expired.
- *
- * @throws Exception
- */
- @Test
- public void testIndexDataAwaitSegmentAvailability() throws Exception
- {
- try (
- final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
- ) {
- final Function transform = spec -> {
- try {
- return StringUtils.replace(
- spec,
- "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
- jsonMapper.writeValueAsString("600000")
- );
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- };
-
- doIndexTest(
- INDEX_DATASOURCE,
- INDEX_TASK,
- transform,
- INDEX_QUERIES_RESOURCE,
- false,
- true,
- true,
- new Pair<>(true, true)
- );
- }
- }
-
- /**
- * Test that the task still succeeds if the segments do not become available before the configured wait timeout
- * expires.
- *
- * @throws Exception
- */
- @Test
- public void testIndexDataAwaitSegmentAvailabilityFailsButTaskSucceeds() throws Exception
- {
- try (
- final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
- ) {
- coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_PAUSED);
- final Function transform = spec -> {
- try {
- return StringUtils.replace(
- spec,
- "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
- jsonMapper.writeValueAsString("1")
- );
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- };
-
- doIndexTest(
- INDEX_DATASOURCE,
- INDEX_TASK,
- transform,
- INDEX_QUERIES_RESOURCE,
- false,
- false,
- false,
- new Pair<>(true, false)
- );
- coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_DEFAULT);
- ITRetryUtil.retryUntilTrue(
- () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()), "Segment Load"
- );
- }
- }
-
-
- @Test
- public void testIndexWithMergeColumnLimitData() throws Exception
- {
- try (
- final Closeable ignored1 = unloader(INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE + config.getExtraDatasourceNameSuffix());
- ) {
- doIndexTest(
- INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE,
- INDEX_WITH_MERGE_COLUMN_LIMIT_TASK,
- INDEX_QUERIES_RESOURCE,
- false,
- true,
- true,
- new Pair<>(false, false)
- );
- }
- }
-
- @Test
- public void testGetLockedIntervals() throws Exception
- {
- final String datasourceName = GET_LOCKED_INTERVALS + config.getExtraDatasourceNameSuffix();
- try (final Closeable ignored = unloader(datasourceName)) {
- // Submit an Indexing Task
- submitIndexTask(INDEX_TASK, datasourceName);
-
- // Wait until it acquires a lock
- final List lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null));
- final Map> lockedIntervals = new HashMap<>();
- ITRetryUtil.retryUntilFalse(
- () -> {
- lockedIntervals.clear();
- lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies));
- return lockedIntervals.isEmpty();
- },
- "Verify Intervals are Locked"
- );
-
- // Verify the locked intervals for this datasource
- Assert.assertEquals(lockedIntervals.size(), 1);
- Assert.assertEquals(
- lockedIntervals.get(datasourceName),
- Collections.singletonList(Intervals.of("2013-08-31/2013-09-02"))
- );
-
- ITRetryUtil.retryUntilTrue(
- () -> coordinator.areSegmentsLoaded(datasourceName),
- "Segment Load"
- );
- }
- }
-
- @Test
- public void testJsonFunctions() throws Exception
- {
- final String taskSpec = getResourceAsString("/indexer/json_path_index_task.json");
-
- submitTaskAndWait(
- taskSpec,
- "json_path_index_test",
- false,
- true,
- new Pair<>(false, false)
- );
-
- doTestQuery("json_path_index_test", "/indexer/json_path_index_queries.json");
- }
}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/IndexerTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/IndexerTest.java
new file mode 100644
index 00000000000..e1e7109969c
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/IndexerTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.indexer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.inject.Inject;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.metadata.LockFilterPolicy;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class IndexerTest extends AbstractITBatchIndexTest
+{
+ private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
+ private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+ private static final String INDEX_DATASOURCE = "wikipedia_index_test";
+
+ private static final String INDEX_WITH_TIMESTAMP_TASK = "/indexer/wikipedia_with_timestamp_index_task.json";
+ // TODO: add queries that validate timestamp is different from the __time column since it is a dimension
+ // TODO: https://github.com/apache/druid/issues/9565
+ private static final String INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+ private static final String INDEX_WITH_TIMESTAMP_DATASOURCE = "wikipedia_with_timestamp_index_test";
+
+ private static final String REINDEX_TASK = "/indexer/wikipedia_reindex_task.json";
+ private static final String REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_reindex_druid_input_source_task.json";
+ private static final String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_reindex_queries.json";
+ private static final String REINDEX_DATASOURCE = "wikipedia_reindex_test";
+
+ private static final String MERGE_INDEX_TASK = "/indexer/wikipedia_merge_index_task.json";
+ private static final String MERGE_INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json";
+ private static final String MERGE_INDEX_DATASOURCE = "wikipedia_merge_index_test";
+
+ private static final String MERGE_REINDEX_TASK = "/indexer/wikipedia_merge_reindex_task.json";
+ private static final String MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_merge_reindex_druid_input_source_task.json";
+ private static final String MERGE_REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json";
+ private static final String MERGE_REINDEX_DATASOURCE = "wikipedia_merge_reindex_test";
+
+ private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_TASK = "/indexer/wikipedia_index_with_merge_column_limit_task.json";
+ private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE = "wikipedia_index_with_merge_column_limit_test";
+
+ private static final String GET_LOCKED_INTERVALS = "wikipedia_index_get_locked_intervals_test";
+
+ private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_PAUSED =
+ CoordinatorDynamicConfig.builder().withPauseCoordination(true).build();
+ private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_DEFAULT =
+ CoordinatorDynamicConfig.builder().build();
+
+ @Inject
+ CoordinatorResourceTestClient coordinatorClient;
+
+ @Test
+ public void testIndexData() throws Exception
+ {
+ final String reindexDatasource = REINDEX_DATASOURCE + "-testIndexData";
+ final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testIndexData-druidInputSource";
+ try (
+ final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
+ final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
+ final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
+ ) {
+
+ final Function transform = spec -> {
+ try {
+ return StringUtils.replace(
+ spec,
+ "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
+ jsonMapper.writeValueAsString("0")
+ );
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ doIndexTest(
+ INDEX_DATASOURCE,
+ INDEX_TASK,
+ transform,
+ INDEX_QUERIES_RESOURCE,
+ false,
+ true,
+ true,
+ new Pair<>(false, false)
+ );
+ doReindexTest(
+ INDEX_DATASOURCE,
+ reindexDatasource,
+ REINDEX_TASK,
+ REINDEX_QUERIES_RESOURCE,
+ new Pair<>(false, false)
+ );
+ doReindexTest(
+ INDEX_DATASOURCE,
+ reindexDatasourceWithDruidInputSource,
+ REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
+ REINDEX_QUERIES_RESOURCE,
+ new Pair<>(false, false)
+ );
+ }
+ }
+
+ @Test
+ public void testReIndexDataWithTimestamp() throws Exception
+ {
+ final String reindexDatasource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp";
+ final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp-druidInputSource";
+ try (
+ final Closeable ignored1 = unloader(INDEX_WITH_TIMESTAMP_DATASOURCE + config.getExtraDatasourceNameSuffix());
+ final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
+ final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
+ ) {
+ doIndexTest(
+ INDEX_WITH_TIMESTAMP_DATASOURCE,
+ INDEX_WITH_TIMESTAMP_TASK,
+ INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE,
+ false,
+ true,
+ true,
+ new Pair<>(false, false)
+ );
+ doReindexTest(
+ INDEX_WITH_TIMESTAMP_DATASOURCE,
+ reindexDatasource,
+ REINDEX_TASK,
+ REINDEX_QUERIES_RESOURCE,
+ new Pair<>(false, false)
+ );
+ doReindexTest(
+ INDEX_WITH_TIMESTAMP_DATASOURCE,
+ reindexDatasourceWithDruidInputSource,
+ REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
+ REINDEX_QUERIES_RESOURCE,
+ new Pair<>(false, false)
+ );
+ }
+ }
+
+ @Test
+ public void testReIndexWithNonExistingDatasource() throws Exception
+ {
+ Pair dummyPair = new Pair<>(false, false);
+ final String fullBaseDatasourceName = "nonExistingDatasource2904";
+ final String fullReindexDatasourceName = "newDatasource123";
+
+ String taskSpec = StringUtils.replace(
+ getResourceAsString(REINDEX_TASK_WITH_DRUID_INPUT_SOURCE),
+ "%%DATASOURCE%%",
+ fullBaseDatasourceName
+ );
+ taskSpec = StringUtils.replace(
+ taskSpec,
+ "%%REINDEX_DATASOURCE%%",
+ fullReindexDatasourceName
+ );
+
+ // This method will also verify task is successful after task finish running
+ // We expect task to be successful even if the datasource to reindex does not exist
+ submitTaskAndWait(
+ taskSpec,
+ fullReindexDatasourceName,
+ false,
+ false,
+ dummyPair
+ );
+ }
+
+ @Test
+ public void testMERGEIndexData() throws Exception
+ {
+ final String reindexDatasource = MERGE_REINDEX_DATASOURCE + "-testMergeIndexData";
+ final String reindexDatasourceWithDruidInputSource = MERGE_REINDEX_DATASOURCE + "-testMergeReIndexData-druidInputSource";
+ try (
+ final Closeable ignored1 = unloader(MERGE_INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
+ final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
+ final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
+ ) {
+ doIndexTest(
+ MERGE_INDEX_DATASOURCE,
+ MERGE_INDEX_TASK,
+ MERGE_INDEX_QUERIES_RESOURCE,
+ false,
+ true,
+ true,
+ new Pair<>(false, false)
+ );
+ doReindexTest(
+ MERGE_INDEX_DATASOURCE,
+ reindexDatasource,
+ MERGE_REINDEX_TASK,
+ MERGE_REINDEX_QUERIES_RESOURCE,
+ new Pair<>(false, false)
+ );
+ doReindexTest(
+ MERGE_INDEX_DATASOURCE,
+ reindexDatasourceWithDruidInputSource,
+ MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
+ MERGE_INDEX_QUERIES_RESOURCE,
+ new Pair<>(false, false)
+ );
+ }
+ }
+
+ /**
+ * Test that task reports indicate the ingested segments were loaded before the configured timeout expired.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testIndexDataAwaitSegmentAvailability() throws Exception
+ {
+ try (
+ final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
+ ) {
+ final Function transform = spec -> {
+ try {
+ return StringUtils.replace(
+ spec,
+ "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
+ jsonMapper.writeValueAsString("600000")
+ );
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ doIndexTest(
+ INDEX_DATASOURCE,
+ INDEX_TASK,
+ transform,
+ INDEX_QUERIES_RESOURCE,
+ false,
+ true,
+ true,
+ new Pair<>(true, true)
+ );
+ }
+ }
+
+ /**
+ * Test that the task still succeeds if the segments do not become available before the configured wait timeout
+ * expires.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testIndexDataAwaitSegmentAvailabilityFailsButTaskSucceeds() throws Exception
+ {
+ try (
+ final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
+ ) {
+ coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_PAUSED);
+ final Function transform = spec -> {
+ try {
+ return StringUtils.replace(
+ spec,
+ "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
+ jsonMapper.writeValueAsString("1")
+ );
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ doIndexTest(
+ INDEX_DATASOURCE,
+ INDEX_TASK,
+ transform,
+ INDEX_QUERIES_RESOURCE,
+ false,
+ false,
+ false,
+ new Pair<>(true, false)
+ );
+ coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_DEFAULT);
+ ITRetryUtil.retryUntilTrue(
+ () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()), "Segment Load"
+ );
+ }
+ }
+
+
+ @Test
+ public void testIndexWithMergeColumnLimitData() throws Exception
+ {
+ try (
+ final Closeable ignored1 = unloader(INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE + config.getExtraDatasourceNameSuffix());
+ ) {
+ doIndexTest(
+ INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE,
+ INDEX_WITH_MERGE_COLUMN_LIMIT_TASK,
+ INDEX_QUERIES_RESOURCE,
+ false,
+ true,
+ true,
+ new Pair<>(false, false)
+ );
+ }
+ }
+
+ @Test
+ public void testGetLockedIntervals() throws Exception
+ {
+ final String datasourceName = GET_LOCKED_INTERVALS + config.getExtraDatasourceNameSuffix();
+ try (final Closeable ignored = unloader(datasourceName)) {
+ // Submit an Indexing Task
+ submitIndexTask(INDEX_TASK, datasourceName);
+
+ // Wait until it acquires a lock
+ final List lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null));
+ final Map> lockedIntervals = new HashMap<>();
+ ITRetryUtil.retryUntilFalse(
+ () -> {
+ lockedIntervals.clear();
+ lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies));
+ return lockedIntervals.isEmpty();
+ },
+ "Verify Intervals are Locked"
+ );
+
+ // Verify the locked intervals for this datasource
+ Assert.assertEquals(lockedIntervals.size(), 1);
+ Assert.assertEquals(
+ lockedIntervals.get(datasourceName),
+ Collections.singletonList(Intervals.of("2013-08-31/2013-09-02"))
+ );
+
+ ITRetryUtil.retryUntilTrue(
+ () -> coordinator.areSegmentsLoaded(datasourceName),
+ "Segment Load"
+ );
+ }
+ }
+
+ @Test
+ public void testJsonFunctions() throws Exception
+ {
+ final String taskSpec = getResourceAsString("/indexer/json_path_index_task.json");
+
+ submitTaskAndWait(
+ taskSpec,
+ "json_path_index_test",
+ false,
+ true,
+ new Pair<>(false, false)
+ );
+
+ doTestQuery("json_path_index_test", "/indexer/json_path_index_queries.json");
+ }
+}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
index 7ede24cd8f9..ce6462e225a 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
@@ -19,254 +19,12 @@
package org.apache.druid.testsEx.msq;
-import com.google.api.client.util.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
-import org.apache.druid.indexer.report.TaskContextReport;
-import org.apache.druid.indexer.report.TaskReport;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.msq.indexing.report.MSQResultsReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
-import org.apache.druid.msq.sql.SqlTaskStatus;
-import org.apache.druid.storage.local.LocalFileExportStorageProvider;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.utils.DataLoaderHelper;
-import org.apache.druid.testing.utils.MsqTestQueryHelper;
-import org.apache.druid.testsEx.categories.MultiStageQuery;
import org.apache.druid.testsEx.config.DruidTestRunner;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
@RunWith(DruidTestRunner.class)
@Category(MultiStageQuery.class)
-public class ITMultiStageQuery
+public class ITMultiStageQuery extends MultiStageQuery
{
- @Inject
- private MsqTestQueryHelper msqHelper;
-
- @Inject
- private DataLoaderHelper dataLoaderHelper;
-
- @Inject
- private CoordinatorResourceTestClient coordinatorClient;
-
- private static final String QUERY_FILE = "/multi-stage-query/wikipedia_msq_select_query1.json";
-
- @Test
- public void testMsqIngestionAndQuerying() throws Exception
- {
- String datasource = "dst";
-
- // Clear up the datasource from the previous runs
- coordinatorClient.unloadSegmentsForDataSource(datasource);
-
- String queryLocal =
- StringUtils.format(
- "INSERT INTO %s\n"
- + "SELECT\n"
- + " TIME_PARSE(\"timestamp\") AS __time,\n"
- + " isRobot,\n"
- + " diffUrl,\n"
- + " added,\n"
- + " countryIsoCode,\n"
- + " regionName,\n"
- + " channel,\n"
- + " flags,\n"
- + " delta,\n"
- + " isUnpatrolled,\n"
- + " isNew,\n"
- + " deltaBucket,\n"
- + " isMinor,\n"
- + " isAnonymous,\n"
- + " deleted,\n"
- + " cityName,\n"
- + " metroCode,\n"
- + " namespace,\n"
- + " comment,\n"
- + " page,\n"
- + " commentLength,\n"
- + " countryName,\n"
- + " user,\n"
- + " regionIsoCode\n"
- + "FROM TABLE(\n"
- + " EXTERN(\n"
- + " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
- + " '{\"type\":\"json\"}',\n"
- + " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
- + " )\n"
- + ")\n"
- + "PARTITIONED BY DAY\n",
- datasource
- );
-
- // Submit the task and wait for the datasource to get loaded
- SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryLocal);
-
- if (sqlTaskStatus.getState().isFailure()) {
- Assert.fail(StringUtils.format(
- "Unable to start the task successfully.\nPossible exception: %s",
- sqlTaskStatus.getError()
- ));
- }
-
- msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
- dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
- msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
- }
-
- @Test
- @Ignore("localfiles() is disabled")
- public void testMsqIngestionAndQueryingWithLocalFn() throws Exception
- {
- String datasource = "dst";
-
- // Clear up the datasource from the previous runs
- coordinatorClient.unloadSegmentsForDataSource(datasource);
-
- String queryLocal =
- StringUtils.format(
- "INSERT INTO %s\n"
- + "SELECT\n"
- + " TIME_PARSE(\"timestamp\") AS __time,\n"
- + " isRobot,\n"
- + " diffUrl,\n"
- + " added,\n"
- + " countryIsoCode,\n"
- + " regionName,\n"
- + " channel,\n"
- + " flags,\n"
- + " delta,\n"
- + " isUnpatrolled,\n"
- + " isNew,\n"
- + " deltaBucket,\n"
- + " isMinor,\n"
- + " isAnonymous,\n"
- + " deleted,\n"
- + " cityName,\n"
- + " metroCode,\n"
- + " namespace,\n"
- + " comment,\n"
- + " page,\n"
- + " commentLength,\n"
- + " countryName,\n"
- + " user,\n"
- + " regionIsoCode\n"
- + "FROM TABLE(\n"
- + " LOCALFILES(\n"
- + " files => ARRAY['/resources/data/batch_index/json/wikipedia_index_data1.json'],\n"
- + " format => 'json'\n"
- + " ))\n"
- + " (\"timestamp\" VARCHAR, isRobot VARCHAR, diffUrl VARCHAR, added BIGINT, countryIsoCode VARCHAR, regionName VARCHAR,\n"
- + " channel VARCHAR, flags VARCHAR, delta BIGINT, isUnpatrolled VARCHAR, isNew VARCHAR, deltaBucket DOUBLE,\n"
- + " isMinor VARCHAR, isAnonymous VARCHAR, deleted BIGINT, cityName VARCHAR, metroCode BIGINT, namespace VARCHAR,\n"
- + " comment VARCHAR, page VARCHAR, commentLength BIGINT, countryName VARCHAR, \"user\" VARCHAR, regionIsoCode VARCHAR)\n"
- + "PARTITIONED BY DAY\n",
- datasource
- );
-
- // Submit the task and wait for the datasource to get loaded
- SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryLocal);
-
- if (sqlTaskStatus.getState().isFailure()) {
- Assert.fail(StringUtils.format(
- "Unable to start the task successfully.\nPossible exception: %s",
- sqlTaskStatus.getError()
- ));
- }
-
- msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
- dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
- msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
- }
-
- @Test
- public void testExport() throws Exception
- {
- String exportQuery =
- StringUtils.format(
- "INSERT INTO extern(%s(exportPath => '%s'))\n"
- + "AS CSV\n"
- + "SELECT page, added, delta\n"
- + "FROM TABLE(\n"
- + " EXTERN(\n"
- + " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
- + " '{\"type\":\"json\"}',\n"
- + " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
- + " )\n"
- + ")\n",
- LocalFileExportStorageProvider.TYPE_NAME, "/shared/export/"
- );
-
- SqlTaskStatus exportTask = msqHelper.submitMsqTaskSuccesfully(exportQuery);
-
- msqHelper.pollTaskIdForSuccess(exportTask.getTaskId());
-
- if (exportTask.getState().isFailure()) {
- Assert.fail(StringUtils.format(
- "Unable to start the task successfully.\nPossible exception: %s",
- exportTask.getError()
- ));
- }
-
- String resultQuery = StringUtils.format(
- "SELECT page, delta, added\n"
- + " FROM TABLE(\n"
- + " EXTERN(\n"
- + " '{\"type\":\"local\",\"baseDir\":\"/shared/export/\",\"filter\":\"*.csv\"}',\n"
- + " '{\"type\":\"csv\",\"findColumnsFromHeader\":true}'\n"
- + " )\n"
- + " ) EXTEND (\"added\" BIGINT, \"delta\" BIGINT, \"page\" VARCHAR)\n"
- + " WHERE delta != 0\n"
- + " ORDER BY page");
-
- SqlTaskStatus resultTaskStatus = msqHelper.submitMsqTaskSuccesfully(resultQuery);
-
- msqHelper.pollTaskIdForSuccess(resultTaskStatus.getTaskId());
-
- TaskReport.ReportMap statusReport = msqHelper.fetchStatusReports(resultTaskStatus.getTaskId());
-
- MSQTaskReport taskReport = (MSQTaskReport) statusReport.get(MSQTaskReport.REPORT_KEY);
- if (taskReport == null) {
- throw new ISE("Unable to fetch the status report for the task [%]", resultTaskStatus.getTaskId());
- }
- TaskContextReport taskContextReport = (TaskContextReport) statusReport.get(TaskContextReport.REPORT_KEY);
- Assert.assertFalse(taskContextReport.getPayload().isEmpty());
-
- MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
- taskReport.getPayload(),
- "payload"
- );
- MSQResultsReport resultsReport = Preconditions.checkNotNull(
- taskReportPayload.getResults(),
- "Results report for the task id is empty"
- );
-
-    List<List<Object>> actualResults = new ArrayList<>();
-
- for (final Object[] row : resultsReport.getResults()) {
- actualResults.add(Arrays.asList(row));
- }
-
-    ImmutableList<ImmutableList<Object>> expectedResults = ImmutableList.of(
- ImmutableList.of("Cherno Alpha", 111, 123),
- ImmutableList.of("Gypsy Danger", -143, 57),
- ImmutableList.of("Striker Eureka", 330, 459)
- );
-
- Assert.assertEquals(
- expectedResults,
- actualResults
- );
- }
}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/MultiStageQuery.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/MultiStageQuery.java
new file mode 100644
index 00000000000..bda1c243453
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/MultiStageQuery.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.google.api.client.util.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.report.TaskContextReport;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.storage.local.LocalFileExportStorageProvider;
+import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
+import org.apache.druid.testing.utils.DataLoaderHelper;
+import org.apache.druid.testing.utils.MsqTestQueryHelper;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class MultiStageQuery
+{
+ @Inject
+ private MsqTestQueryHelper msqHelper;
+
+ @Inject
+ private DataLoaderHelper dataLoaderHelper;
+
+ @Inject
+ private CoordinatorResourceTestClient coordinatorClient;
+
+ private static final String QUERY_FILE = "/multi-stage-query/wikipedia_msq_select_query1.json";
+
+ @Test
+ public void testMsqIngestionAndQuerying() throws Exception
+ {
+ String datasource = "dst";
+
+ // Clear up the datasource from the previous runs
+ coordinatorClient.unloadSegmentsForDataSource(datasource);
+
+ String queryLocal =
+ StringUtils.format(
+ "INSERT INTO %s\n"
+ + "SELECT\n"
+ + " TIME_PARSE(\"timestamp\") AS __time,\n"
+ + " isRobot,\n"
+ + " diffUrl,\n"
+ + " added,\n"
+ + " countryIsoCode,\n"
+ + " regionName,\n"
+ + " channel,\n"
+ + " flags,\n"
+ + " delta,\n"
+ + " isUnpatrolled,\n"
+ + " isNew,\n"
+ + " deltaBucket,\n"
+ + " isMinor,\n"
+ + " isAnonymous,\n"
+ + " deleted,\n"
+ + " cityName,\n"
+ + " metroCode,\n"
+ + " namespace,\n"
+ + " comment,\n"
+ + " page,\n"
+ + " commentLength,\n"
+ + " countryName,\n"
+ + " user,\n"
+ + " regionIsoCode\n"
+ + "FROM TABLE(\n"
+ + " EXTERN(\n"
+ + " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+ + " '{\"type\":\"json\"}',\n"
+ + " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+ + " )\n"
+ + ")\n"
+ + "PARTITIONED BY DAY\n",
+ datasource
+ );
+
+ // Submit the task and wait for the datasource to get loaded
+ SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryLocal);
+
+ if (sqlTaskStatus.getState().isFailure()) {
+ Assert.fail(StringUtils.format(
+ "Unable to start the task successfully.\nPossible exception: %s",
+ sqlTaskStatus.getError()
+ ));
+ }
+
+ msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
+ dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+
+ msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
+ }
+
+ @Test
+ @Ignore("localfiles() is disabled")
+ public void testMsqIngestionAndQueryingWithLocalFn() throws Exception
+ {
+ String datasource = "dst";
+
+ // Clear up the datasource from the previous runs
+ coordinatorClient.unloadSegmentsForDataSource(datasource);
+
+ String queryLocal =
+ StringUtils.format(
+ "INSERT INTO %s\n"
+ + "SELECT\n"
+ + " TIME_PARSE(\"timestamp\") AS __time,\n"
+ + " isRobot,\n"
+ + " diffUrl,\n"
+ + " added,\n"
+ + " countryIsoCode,\n"
+ + " regionName,\n"
+ + " channel,\n"
+ + " flags,\n"
+ + " delta,\n"
+ + " isUnpatrolled,\n"
+ + " isNew,\n"
+ + " deltaBucket,\n"
+ + " isMinor,\n"
+ + " isAnonymous,\n"
+ + " deleted,\n"
+ + " cityName,\n"
+ + " metroCode,\n"
+ + " namespace,\n"
+ + " comment,\n"
+ + " page,\n"
+ + " commentLength,\n"
+ + " countryName,\n"
+ + " user,\n"
+ + " regionIsoCode\n"
+ + "FROM TABLE(\n"
+ + " LOCALFILES(\n"
+ + " files => ARRAY['/resources/data/batch_index/json/wikipedia_index_data1.json'],\n"
+ + " format => 'json'\n"
+ + " ))\n"
+ + " (\"timestamp\" VARCHAR, isRobot VARCHAR, diffUrl VARCHAR, added BIGINT, countryIsoCode VARCHAR, regionName VARCHAR,\n"
+ + " channel VARCHAR, flags VARCHAR, delta BIGINT, isUnpatrolled VARCHAR, isNew VARCHAR, deltaBucket DOUBLE,\n"
+ + " isMinor VARCHAR, isAnonymous VARCHAR, deleted BIGINT, cityName VARCHAR, metroCode BIGINT, namespace VARCHAR,\n"
+ + " comment VARCHAR, page VARCHAR, commentLength BIGINT, countryName VARCHAR, \"user\" VARCHAR, regionIsoCode VARCHAR)\n"
+ + "PARTITIONED BY DAY\n",
+ datasource
+ );
+
+ // Submit the task and wait for the datasource to get loaded
+ SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryLocal);
+
+ if (sqlTaskStatus.getState().isFailure()) {
+ Assert.fail(StringUtils.format(
+ "Unable to start the task successfully.\nPossible exception: %s",
+ sqlTaskStatus.getError()
+ ));
+ }
+
+ msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
+ dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+
+ msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
+ }
+
+ @Test
+ public void testExport() throws Exception
+ {
+ String exportQuery =
+ StringUtils.format(
+ "INSERT INTO extern(%s(exportPath => '%s'))\n"
+ + "AS CSV\n"
+ + "SELECT page, added, delta\n"
+ + "FROM TABLE(\n"
+ + " EXTERN(\n"
+ + " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+ + " '{\"type\":\"json\"}',\n"
+ + " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+ + " )\n"
+ + ")\n",
+ LocalFileExportStorageProvider.TYPE_NAME, "/shared/export/"
+ );
+
+ SqlTaskStatus exportTask = msqHelper.submitMsqTaskSuccesfully(exportQuery);
+
+ msqHelper.pollTaskIdForSuccess(exportTask.getTaskId());
+
+ if (exportTask.getState().isFailure()) {
+ Assert.fail(StringUtils.format(
+ "Unable to start the task successfully.\nPossible exception: %s",
+ exportTask.getError()
+ ));
+ }
+
+ String resultQuery = StringUtils.format(
+ "SELECT page, delta, added\n"
+ + " FROM TABLE(\n"
+ + " EXTERN(\n"
+ + " '{\"type\":\"local\",\"baseDir\":\"/shared/export/\",\"filter\":\"*.csv\"}',\n"
+ + " '{\"type\":\"csv\",\"findColumnsFromHeader\":true}'\n"
+ + " )\n"
+ + " ) EXTEND (\"added\" BIGINT, \"delta\" BIGINT, \"page\" VARCHAR)\n"
+ + " WHERE delta != 0\n"
+ + " ORDER BY page");
+
+ SqlTaskStatus resultTaskStatus = msqHelper.submitMsqTaskSuccesfully(resultQuery);
+
+ msqHelper.pollTaskIdForSuccess(resultTaskStatus.getTaskId());
+
+ TaskReport.ReportMap statusReport = msqHelper.fetchStatusReports(resultTaskStatus.getTaskId());
+
+ MSQTaskReport taskReport = (MSQTaskReport) statusReport.get(MSQTaskReport.REPORT_KEY);
+ if (taskReport == null) {
+ throw new ISE("Unable to fetch the status report for the task [%s]", resultTaskStatus.getTaskId());
+ }
+ TaskContextReport taskContextReport = (TaskContextReport) statusReport.get(TaskContextReport.REPORT_KEY);
+ Assert.assertFalse(taskContextReport.getPayload().isEmpty());
+
+ MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
+ taskReport.getPayload(),
+ "payload"
+ );
+ MSQResultsReport resultsReport = Preconditions.checkNotNull(
+ taskReportPayload.getResults(),
+ "Results report for the task id is empty"
+ );
+
+ List<List<Object>> actualResults = new ArrayList<>();
+
+ for (final Object[] row : resultsReport.getResults()) {
+ actualResults.add(Arrays.asList(row));
+ }
+
+ ImmutableList<ImmutableList<Object>> expectedResults = ImmutableList.of(
+ ImmutableList.of("Cherno Alpha", 111, 123),
+ ImmutableList.of("Gypsy Danger", -143, 57),
+ ImmutableList.of("Striker Eureka", 330, 459)
+ );
+
+ Assert.assertEquals(
+ expectedResults,
+ actualResults
+ );
+ }
+}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/ITUnionQueryTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/ITUnionQueryTest.java
index 2d0d8e4f50e..03022da32fa 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/ITUnionQueryTest.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/ITUnionQueryTest.java
@@ -19,195 +19,13 @@
package org.apache.druid.testsEx.query;
-import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.testing.IntegrationTestingConfig;
-import org.apache.druid.testing.utils.ITRetryUtil;
-import org.apache.druid.testing.utils.KafkaAdminClient;
-import org.apache.druid.testing.utils.KafkaEventWriter;
-import org.apache.druid.testing.utils.KafkaUtil;
-import org.apache.druid.testing.utils.StreamEventWriter;
import org.apache.druid.testsEx.categories.Query;
import org.apache.druid.testsEx.config.DruidTestRunner;
-import org.apache.druid.testsEx.indexer.AbstractIndexerTest;
-import org.joda.time.Interval;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.function.Function;
-
@RunWith(DruidTestRunner.class)
@Category(Query.class)
-public class ITUnionQueryTest extends AbstractIndexerTest
+public class ITUnionQueryTest extends UnionQueryTest
{
- private static final Logger LOG = new Logger(ITUnionQueryTest.class);
- private static final String UNION_SUPERVISOR_TEMPLATE = "/query/union_kafka_supervisor_template.json";
- private static final String UNION_DATA_FILE = "/query/union_data.json";
- private static final String UNION_QUERIES_RESOURCE = "/query/union_queries.json";
- private static final String UNION_DATASOURCE = "wikipedia_index_test";
- private String fullDatasourceName;
-
- @Test
- public void testUnionQuery() throws Exception
- {
- fullDatasourceName = UNION_DATASOURCE + config.getExtraDatasourceNameSuffix();
- final String baseName = fullDatasourceName + UUID.randomUUID();
- KafkaAdminClient streamAdminClient = new KafkaAdminClient(config);
- List<String> supervisors = new ArrayList<>();
-
- final int numDatasources = 3;
- for (int i = 0; i < numDatasources; i++) {
- String datasource = baseName + "-" + i;
- streamAdminClient.createStream(datasource, 1, Collections.emptyMap());
- ITRetryUtil.retryUntil(
- () -> streamAdminClient.isStreamActive(datasource),
- true,
- 10000,
- 30,
- "Wait for stream active"
- );
- String supervisorSpec = generateStreamIngestionPropsTransform(
- datasource,
- datasource,
- config
- ).apply(getResourceAsString(UNION_SUPERVISOR_TEMPLATE));
- LOG.info("supervisorSpec: [%s]\n", supervisorSpec);
- // Start supervisor
- String specResponse = indexer.submitSupervisor(supervisorSpec);
- LOG.info("Submitted supervisor [%s]", specResponse);
- supervisors.add(specResponse);
-
- int ctr = 0;
- try (
- StreamEventWriter streamEventWriter = new KafkaEventWriter(config, false);
- BufferedReader reader = new BufferedReader(
- new InputStreamReader(getResourceAsStream(UNION_DATA_FILE), StandardCharsets.UTF_8)
- )
- ) {
- String line;
- while ((line = reader.readLine()) != null) {
- streamEventWriter.write(datasource, StringUtils.toUtf8(line));
- ctr++;
- }
- }
- final int numWritten = ctr;
-
- LOG.info("Waiting for stream indexing tasks to consume events");
-
- ITRetryUtil.retryUntilTrue(
- () ->
- numWritten == this.queryHelper.countRows(
- datasource,
- Intervals.ETERNITY,
- name -> new LongSumAggregatorFactory(name, "count")
- ),
- StringUtils.format(
- "dataSource[%s] consumed [%,d] events, expected [%,d]",
- datasource,
- this.queryHelper.countRows(
- datasource,
- Intervals.ETERNITY,
- name -> new LongSumAggregatorFactory(name, "count")
- ),
- numWritten
- )
- );
- }
-
- String queryResponseTemplate = StringUtils.replace(
- getResourceAsString(UNION_QUERIES_RESOURCE),
- "%%DATASOURCE%%",
- baseName
- );
-
- queryHelper.testQueriesFromString(queryResponseTemplate);
-
-
- for (int i = 0; i < numDatasources; i++) {
- indexer.terminateSupervisor(supervisors.get(i));
- streamAdminClient.deleteStream(baseName + "-" + i);
- }
-
- for (int i = 0; i < numDatasources; i++) {
- final int datasourceNumber = i;
- ITRetryUtil.retryUntil(
- () -> coordinator.areSegmentsLoaded(baseName + "-" + datasourceNumber),
- true,
- 10000,
- 10,
- "Kafka segments loaded"
- );
- }
-
- queryHelper.testQueriesFromString(queryResponseTemplate);
-
- for (int i = 0; i < numDatasources; i++) {
- final String datasource = baseName + "-" + i;
- List<String> intervals = coordinator.getSegmentIntervals(datasource);
-
- Collections.sort(intervals);
- String first = intervals.get(0).split("/")[0];
- String last = intervals.get(intervals.size() - 1).split("/")[1];
- Interval interval = Intervals.of(first + "/" + last);
- coordinator.unloadSegmentsForDataSource(baseName + "-" + i);
- ITRetryUtil.retryUntilFalse(
- () -> coordinator.areSegmentsLoaded(datasource),
- "Segment Unloading"
- );
- coordinator.deleteSegmentsDataSource(baseName + "-" + i, interval);
- }
- }
-
-
- /**
- * sad version of
- * {@link org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest#generateStreamIngestionPropsTransform}
- */
- private Function<String, String> generateStreamIngestionPropsTransform(
- String streamName,
- String fullDatasourceName,
- IntegrationTestingConfig config
- )
- {
- final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
- final Properties consumerProperties = new Properties();
- consumerProperties.putAll(consumerConfigs);
- consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost());
- KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
- return spec -> {
- try {
- spec = StringUtils.replace(
- spec,
- "%%DATASOURCE%%",
- fullDatasourceName
- );
- spec = StringUtils.replace(
- spec,
- "%%TOPIC_VALUE%%",
- streamName
- );
- return StringUtils.replace(
- spec,
- "%%STREAM_PROPERTIES_VALUE%%",
- jsonMapper.writeValueAsString(consumerProperties)
- );
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- };
- }
}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/UnionQueryTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/UnionQueryTest.java
new file mode 100644
index 00000000000..a1d62fe55f5
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/UnionQueryTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.query;
+
+import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KafkaAdminClient;
+import org.apache.druid.testing.utils.KafkaEventWriter;
+import org.apache.druid.testing.utils.KafkaUtil;
+import org.apache.druid.testing.utils.StreamEventWriter;
+import org.apache.druid.testsEx.indexer.AbstractIndexerTest;
+import org.joda.time.Interval;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Function;
+
+public class UnionQueryTest extends AbstractIndexerTest
+{
+ private static final Logger LOG = new Logger(UnionQueryTest.class);
+ private static final String UNION_SUPERVISOR_TEMPLATE = "/query/union_kafka_supervisor_template.json";
+ private static final String UNION_DATA_FILE = "/query/union_data.json";
+ private static final String UNION_QUERIES_RESOURCE = "/query/union_queries.json";
+ private static final String UNION_DATASOURCE = "wikipedia_index_test";
+ private String fullDatasourceName;
+
+ @Test
+ public void testUnionQuery() throws Exception
+ {
+ fullDatasourceName = UNION_DATASOURCE + config.getExtraDatasourceNameSuffix();
+ final String baseName = fullDatasourceName + UUID.randomUUID();
+ KafkaAdminClient streamAdminClient = new KafkaAdminClient(config);
+ List<String> supervisors = new ArrayList<>();
+
+ final int numDatasources = 3;
+ for (int i = 0; i < numDatasources; i++) {
+ String datasource = baseName + "-" + i;
+ streamAdminClient.createStream(datasource, 1, Collections.emptyMap());
+ ITRetryUtil.retryUntil(
+ () -> streamAdminClient.isStreamActive(datasource),
+ true,
+ 10000,
+ 30,
+ "Wait for stream active"
+ );
+ String supervisorSpec = generateStreamIngestionPropsTransform(
+ datasource,
+ datasource,
+ config
+ ).apply(getResourceAsString(UNION_SUPERVISOR_TEMPLATE));
+ LOG.info("supervisorSpec: [%s]\n", supervisorSpec);
+ // Start supervisor
+ String specResponse = indexer.submitSupervisor(supervisorSpec);
+ LOG.info("Submitted supervisor [%s]", specResponse);
+ supervisors.add(specResponse);
+
+ int ctr = 0;
+ try (
+ StreamEventWriter streamEventWriter = new KafkaEventWriter(config, false);
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(getResourceAsStream(UNION_DATA_FILE), StandardCharsets.UTF_8)
+ )
+ ) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ streamEventWriter.write(datasource, StringUtils.toUtf8(line));
+ ctr++;
+ }
+ }
+ final int numWritten = ctr;
+
+ LOG.info("Waiting for stream indexing tasks to consume events");
+
+ ITRetryUtil.retryUntilTrue(
+ () ->
+ numWritten == this.queryHelper.countRows(
+ datasource,
+ Intervals.ETERNITY,
+ name -> new LongSumAggregatorFactory(name, "count")
+ ),
+ StringUtils.format(
+ "dataSource[%s] consumed [%,d] events, expected [%,d]",
+ datasource,
+ this.queryHelper.countRows(
+ datasource,
+ Intervals.ETERNITY,
+ name -> new LongSumAggregatorFactory(name, "count")
+ ),
+ numWritten
+ )
+ );
+ }
+
+ String queryResponseTemplate = StringUtils.replace(
+ getResourceAsString(UNION_QUERIES_RESOURCE),
+ "%%DATASOURCE%%",
+ baseName
+ );
+
+ queryHelper.testQueriesFromString(queryResponseTemplate);
+
+
+ for (int i = 0; i < numDatasources; i++) {
+ indexer.terminateSupervisor(supervisors.get(i));
+ streamAdminClient.deleteStream(baseName + "-" + i);
+ }
+
+ for (int i = 0; i < numDatasources; i++) {
+ final int datasourceNumber = i;
+ ITRetryUtil.retryUntil(
+ () -> coordinator.areSegmentsLoaded(baseName + "-" + datasourceNumber),
+ true,
+ 10000,
+ 10,
+ "Kafka segments loaded"
+ );
+ }
+
+ queryHelper.testQueriesFromString(queryResponseTemplate);
+
+ for (int i = 0; i < numDatasources; i++) {
+ final String datasource = baseName + "-" + i;
+ List<String> intervals = coordinator.getSegmentIntervals(datasource);
+
+ Collections.sort(intervals);
+ String first = intervals.get(0).split("/")[0];
+ String last = intervals.get(intervals.size() - 1).split("/")[1];
+ Interval interval = Intervals.of(first + "/" + last);
+ coordinator.unloadSegmentsForDataSource(baseName + "-" + i);
+ ITRetryUtil.retryUntilFalse(
+ () -> coordinator.areSegmentsLoaded(datasource),
+ "Segment Unloading"
+ );
+ coordinator.deleteSegmentsDataSource(baseName + "-" + i, interval);
+ }
+ }
+
+
+ /**
+ * sad version of
+ * {@link org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest#generateStreamIngestionPropsTransform}
+ */
+ private Function<String, String> generateStreamIngestionPropsTransform(
+ String streamName,
+ String fullDatasourceName,
+ IntegrationTestingConfig config
+ )
+ {
+ final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
+ final Properties consumerProperties = new Properties();
+ consumerProperties.putAll(consumerConfigs);
+ consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost());
+ KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
+ return spec -> {
+ try {
+ spec = StringUtils.replace(
+ spec,
+ "%%DATASOURCE%%",
+ fullDatasourceName
+ );
+ spec = StringUtils.replace(
+ spec,
+ "%%TOPIC_VALUE%%",
+ streamName
+ );
+ return StringUtils.replace(
+ spec,
+ "%%STREAM_PROPERTIES_VALUE%%",
+ jsonMapper.writeValueAsString(consumerProperties)
+ );
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
+}
diff --git a/integration-tests-ex/cases/src/test/resources/cluster/BackwardCompatibilityMain/docker.yaml b/integration-tests-ex/cases/src/test/resources/cluster/BackwardCompatibilityMain/docker.yaml
new file mode 100644
index 00000000000..1c51532c6a9
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/cluster/BackwardCompatibilityMain/docker.yaml
@@ -0,0 +1,199 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#-------------------------------------------------------------------------
+
+# Definition of the backward compatibility test cluster.
+# See https://yaml.org/spec/1.2.2 for more about YAML
+include:
+ - /cluster/Common/zk-metastore.yaml
+ - /cluster/Common/kafka.yaml
+
+druid:
+ coordinator:
+ instances:
+ - port: 8081
+ overlord:
+ instances:
+ - port: 8090
+ broker:
+ instances:
+ - port: 8082
+ router:
+ instances:
+ - port: 8888
+ historical:
+ instances:
+ - port: 8083
+ indexer:
+ instances:
+ - port: 8091
+
+# Metastore initialization queries.
+# REPLACE is used so that the statements are idempotent
+# The fancy formatting is for human consumption; it is compacted internally
+metastoreInit:
+ - sql: |
+ REPLACE INTO druid_segments (
+ id, dataSource, created_date, start, end, partitioned, version, used, payload
+ )
+ VALUES (
+ 'twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9',
+ 'twitterstream',
+ '2013-05-13T01:08:18.192Z',
+ '2013-01-01T00:00:00.000Z',
+ '2013-01-02T00:00:00.000Z',
+ 0,
+ '2013-01-02T04:13:41.980Z_v9',
+ 1,
+ '{"dataSource": "twitterstream",
+ "interval": "2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z",
+ "version": "2013-01-02T04:13:41.980Z_v9",
+ "loadSpec": {
+ "type": "s3_zip",
+ "bucket": "static.druid.io",
+ "key": "data/segments/twitterstream/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/2013-01-02T04:13:41.980Z_v9/0/index.zip"
+ },
+ "dimensions": "has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,
+ rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name",
+ "metrics": "count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets",
+ "shardSpec": {"type": "none"},
+ "binaryVersion": 9,
+ "size": 445235220,
+ "identifier": "twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9"
+ }'
+ )
+
+ - sql: |
+ REPLACE INTO druid_segments (
+ id, dataSource, created_date, start, end, partitioned, version, used, payload
+ )
+ VALUES (
+ 'twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9',
+ 'twitterstream',
+ '2013-05-13T00:03:28.640Z',
+ '2013-01-02T00:00:00.000Z',
+ '2013-01-03T00:00:00.000Z',
+ 0,
+ '2013-01-03T03:44:58.791Z_v9',
+ 1,
+ '{"dataSource": "twitterstream",
+ "interval": "2013-01-02T00:00:00.000Z/2013-01-03T00:00:00.000Z",
+ "version": "2013-01-03T03:44:58.791Z_v9",
+ "loadSpec": {
+ "type": "s3_zip",
+ "bucket": "static.druid.io",
+ "key": "data/segments/twitterstream/2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z/2013-01-03T03:44:58.791Z_v9/0/index.zip"
+ },
+ "dimensions": "has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,
+ rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name",
+ "metrics": "count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets",
+ "shardSpec": {"type": "none"},
+ "binaryVersion": 9,
+ "size": 435325540,
+ "identifier": "twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9"
+ }'
+ )
+
+ - sql: |
+ REPLACE INTO druid_segments (
+ id, dataSource, created_date, start, end, partitioned, version, used, payload
+ )
+ VALUES (
+ 'twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9',
+ 'twitterstream',
+ '2013-05-13T00:03:48.807Z',
+ '2013-01-03T00:00:00.000Z',
+ '2013-01-04T00:00:00.000Z',
+ 0,
+ '2013-01-04T04:09:13.590Z_v9',
+ 1,
+ '{"dataSource": "twitterstream",
+ "interval": "2013-01-03T00:00:00.000Z/2013-01-04T00:00:00.000Z",
+ "version": "2013-01-04T04:09:13.590Z_v9",
+ "loadSpec": {
+ "type": "s3_zip",
+ "bucket": "static.druid.io",
+ "key": "data/segments/twitterstream/2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z/2013-01-04T04:09:13.590Z_v9/0/index.zip"
+ },
+ "dimensions": "has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,
+ rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name",
+ "metrics": "count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets",
+ "shardSpec": {"type": "none"},
+ "binaryVersion": 9,
+ "size": 411651320,
+ "identifier": "twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9"
+ }'
+ )
+
+ - sql: |
+ REPLACE INTO druid_segments (
+ id, dataSource, created_date, start, end, partitioned, version, used, payload
+ )
+ VALUES (
+ 'wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9',
+ 'wikipedia_editstream',
+ '2013-03-15T20:49:52.348Z',
+ '2012-12-29T00:00:00.000Z',
+ '2013-01-10T08:00:00.000Z',
+ 0,
+ '2013-01-10T08:13:47.830Z_v9',
+ 1,
+ '{"dataSource": "wikipedia_editstream",
+ "interval": "2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z",
+ "version": "2013-01-10T08:13:47.830Z_v9",
+ "loadSpec": {
+ "type": "s3_zip",
+ "bucket": "static.druid.io",
+ "key": "data/segments/wikipedia_editstream/2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z/2013-01-10T08:13:47.830Z_v9/0/index.zip"},
+ "dimensions": "anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,
+ namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user",
+ "metrics": "added,count,deleted,delta,delta_hist,unique_users,variation",
+ "shardSpec": {"type": "none"},
+ "binaryVersion": 9,
+ "size": 446027801,
+ "identifier": "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9"
+ }'
+ )
+
+ - sql: |
+ REPLACE INTO druid_segments (
+ id, dataSource, created_date, start, end, partitioned, version, used, payload
+ )
+ VALUES (
+ 'wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z',
+ 'wikipedia',
+ '2013-08-08T21:26:23.799Z',
+ '2013-08-01T00:00:00.000Z',
+ '2013-08-02T00:00:00.000Z',
+ 0,
+ '2013-08-08T21:22:48.989Z',
+ 1,
+ '{"dataSource": "wikipedia",
+ "interval": "2013-08-01T00:00:00.000Z/2013-08-02T00:00:00.000Z",
+ "version": "2013-08-08T21:22:48.989Z",
+ "loadSpec": {
+ "type": "s3_zip",
+ "bucket": "static.druid.io",
+ "key": "data/segments/wikipedia/20130801T000000.000Z_20130802T000000.000Z/2013-08-08T21_22_48.989Z/0/index.zip"
+ },
+ "dimensions": "dma_code,continent_code,geo,area_code,robot,country_name,network,city,namespace,
+ anonymous,unpatrolled,page,postal_code,language,newpage,user,region_lookup",
+ "metrics": "count,delta,variation,added,deleted",
+ "shardSpec": {"type": "none"},
+ "binaryVersion": 9,
+ "size": 24664730,
+ "identifier": "wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z"
+ }'
+ )
diff --git a/integration-tests-ex/docs/backward-compatibility-tests.md b/integration-tests-ex/docs/backward-compatibility-tests.md
new file mode 100644
index 00000000000..a2d56217b85
--- /dev/null
+++ b/integration-tests-ex/docs/backward-compatibility-tests.md
@@ -0,0 +1,43 @@
+
+
+#### Problem
+This test group simulates a partially upgraded cluster to catch issues that can arise during a rolling upgrade or downgrade.
+
+#### Implementation
+In this test group, the Docker Compose cluster is launched with services running different Druid versions.
+The Docker image for the previous version is built by downloading that version's Druid tar.
+Currently, only the case where the Coordinator and Overlord run on the previous version is tested.
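+
+As a rough sketch, these are the environment variables the previous-version image build keys off.
+The names match those used by `integration-tests-ex/image/docker-build.sh`; the values below are
+placeholders, not defaults:
+
+```bash
+# When this is not "true", only the current-version image is built.
+export BACKWARD_COMPATIBILITY_IT_ENABLED=true
+
+# Previous Druid version to test against, and the URL of its binary tar.
+export DRUID_PREVIOUS_VERSION=<previous-druid-version>
+export DRUID_PREVIOUS_VERSION_DOWNLOAD_URL=<url-to-the-previous-version-bin.tar.gz>
+
+# Name to tag the previous-version test image with.
+export DRUID_PREVIOUS_IT_IMAGE_NAME=<previous-version-image-name>
+```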
+
+#### Test coverage
+Existing
+- MSQ ingestion
+- Native ingestion
+- Stream ingestion
+- Querying
+
+Pending
+- Compaction
+
+#### Limitations
+* The `druid-testing-tools` jar is not published, so the image for the previous version still uses the
+extension built from the current source.
+This could break if an incompatible change is made to this extension;
+in that scenario the test should be disabled. However, this extension is primarily used to
+test specific error scenarios and to launch a custom node-role service (used in the HighAvailability test group).
diff --git a/integration-tests-ex/image/docker-build.sh b/integration-tests-ex/image/docker-build.sh
index 6a945aa6129..4ff5d7a7438 100755
--- a/integration-tests-ex/image/docker-build.sh
+++ b/integration-tests-ex/image/docker-build.sh
@@ -53,4 +53,23 @@ docker build -t $DRUID_IT_IMAGE_NAME \
--build-arg CONFLUENT_VERSION=$CONFLUENT_VERSION \
--build-arg HADOOP_VERSION=$HADOOP_VERSION \
--build-arg MYSQL_DRIVER_CLASSNAME=$MYSQL_DRIVER_CLASSNAME \
+ --build-arg DRUID_TESTING_TOOLS_VERSION=$DRUID_VERSION \
+ .
+
+if [[ -z "${BACKWARD_COMPATIBILITY_IT_ENABLED:-""}" || $BACKWARD_COMPATIBILITY_IT_ENABLED != "true" ]]; then
+ echo "Not building previous version image."
+ exit 0
+fi
+
+# Download the previous druid tar
+curl -L $DRUID_PREVIOUS_VERSION_DOWNLOAD_URL --output apache-druid-$DRUID_PREVIOUS_VERSION-bin.tar.gz
+
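+# Build a second image for the previous Druid version; it is used by the BackwardCompatibilityMain test group.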
+docker build -t $DRUID_PREVIOUS_IT_IMAGE_NAME \
+ --build-arg DRUID_VERSION=$DRUID_PREVIOUS_VERSION \
+ --build-arg MYSQL_VERSION=$MYSQL_VERSION \
+ --build-arg MARIADB_VERSION=$MARIADB_VERSION \
+ --build-arg CONFLUENT_VERSION=$CONFLUENT_VERSION \
+ --build-arg HADOOP_VERSION=$HADOOP_VERSION \
+ --build-arg MYSQL_DRIVER_CLASSNAME=$MYSQL_DRIVER_CLASSNAME \
+ --build-arg DRUID_TESTING_TOOLS_VERSION=$DRUID_VERSION \
.
diff --git a/integration-tests-ex/image/docker/Dockerfile b/integration-tests-ex/image/docker/Dockerfile
index a77a5c2d023..90955eae3c1 100644
--- a/integration-tests-ex/image/docker/Dockerfile
+++ b/integration-tests-ex/image/docker/Dockerfile
@@ -46,13 +46,15 @@ ARG MARIADB_VERSION
ENV MARIADB_VERSION=$MARIADB_VERSION
ARG MYSQL_DRIVER_CLASSNAME=com.mysql.jdbc.Driver
ENV MYSQL_DRIVER_CLASSNAME=$MYSQL_DRIVER_CLASSNAME
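+# Version of the druid-it-tools jar to bundle; the previous-version image reuses the jar built from the current source.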
+ARG DRUID_TESTING_TOOLS_VERSION
ENV DRUID_HOME=/usr/local/druid
# Populate build artifacts
COPY apache-druid-${DRUID_VERSION}-bin.tar.gz /usr/local/
-COPY druid-it-tools-${DRUID_VERSION}.jar /tmp/druid/extensions/druid-it-tools/
+COPY druid-it-tools-${DRUID_TESTING_TOOLS_VERSION}.jar /tmp/druid/extensions/druid-it-tools/
+
COPY kafka-protobuf-provider-${CONFLUENT_VERSION}.jar /tmp/druid/lib/
COPY mysql-connector-j-${MYSQL_VERSION}.jar /tmp/druid/lib/
COPY mariadb-java-client-${MARIADB_VERSION}.jar /tmp/druid/lib/
@@ -60,6 +62,7 @@ COPY test-setup.sh /
COPY druid.sh /
COPY launch.sh /
+
# Do the setup tasks. The tasks are done within a script, rather than
# here, so they are easier to describe and debug. Turn on the "-x" flag
# within the script to trace the steps if needed for debugging.