diff --git a/.travis.yml b/.travis.yml
index af0192d0eff..f3b42c48bf3 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -393,11 +393,19 @@ jobs:
       script: *run_integration_test
       after_failure: *integration_test_diags
 
+    - &integration_append_ingestion
+      name: "(Compile=openjdk8, Run=openjdk8) append ingestion integration test"
+      jdk: openjdk8
+      services: *integration_test_services
+      env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=8'
+      script: *run_integration_test
+      after_failure: *integration_test_diags
+
     - &integration_tests
       name: "(Compile=openjdk8, Run=openjdk8) other integration test"
       jdk: openjdk8
       services: *integration_test_services
-      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage' JVM_RUNTIME='-Djvm.runtime=11'
+      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
       script: *run_integration_test
       after_failure: *integration_test_diags
     # END - Integration tests for Compile with Java 8 and Run with Java 8
@@ -433,10 +441,15 @@ jobs:
       jdk: openjdk8
       env: TESTNG_GROUPS='-Dgroups=realtime-index' JVM_RUNTIME='-Djvm.runtime=11'
 
+    - <<: *integration_append_ingestion
+      name: "(Compile=openjdk8, Run=openjdk11) append ingestion integration test"
+      jdk: openjdk8
+      env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
+
     - <<: *integration_tests
       name: "(Compile=openjdk8, Run=openjdk11) other integration test"
       jdk: openjdk8
-      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage' JVM_RUNTIME='-Djvm.runtime=11'
+      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
     # END - Integration tests for Compile with Java 8 and Run with Java 11
 
     - name: "security vulnerabilities"
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
index b369ac34b7a..c0e04b9ab21 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
@@ -31,7 +31,6 @@ import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
 import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
-import org.apache.druid.metadata.SqlSegmentsMetadataManager;
 import org.apache.druid.query.lookup.LookupsState;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
@@ -95,9 +94,9 @@ public class CoordinatorResourceTestClient
     return StringUtils.format("%sdatasources/%s/segments?full", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
   }
 
-  private String getLoadStatusURL()
+  private String getLoadStatusURL(String dataSource)
   {
-    return StringUtils.format("%s%s", getCoordinatorURL(), "loadstatus");
+    return StringUtils.format("%sdatasources/%s/loadstatus?forceMetadataRefresh=true&interval=1970-01-01/2999-01-01", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
   }
 
   /** return a list of the segment dates for the specified data source */
@@ -173,11 +172,27 @@ public class CoordinatorResourceTestClient
     }
   }
 
-  private Map<String, Double> getLoadStatus()
+  private Map<String, Double> getLoadStatus(String dataSource)
   {
+    String url = getLoadStatusURL(dataSource);
     Map<String, Double> status;
     try {
-      StatusResponseHolder response = makeRequest(HttpMethod.GET, getLoadStatusURL());
+      StatusResponseHolder response = httpClient.go(
+          new Request(HttpMethod.GET, new URL(url)),
+          responseHandler
+      ).get();
+
+      if (response.getStatus().getCode() == HttpResponseStatus.NO_CONTENT.getCode()) {
+        return null;
+      }
+      if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
+        throw new ISE(
+            "Error while making request to url[%s] status[%s] content[%s]",
+            url,
+            response.getStatus(),
+            response.getContent()
+        );
+      }
       status = jsonMapper.readValue(
           response.getContent(),
           new TypeReference<Map<String, Double>>()
@@ -191,18 +206,10 @@ public class CoordinatorResourceTestClient
     return status;
   }
 
-  /**
-   * Warning: This API reads segments from {@link SqlSegmentsMetadataManager} of the Coordinator, which
-   * caches segments in memory and periodically updates them. Hence, there can be a race condition as
-   * this API implementation compares segment metadata from the cache with segments in historicals,
-   * particularly when the number of segments changes after the first initial load of the datasource.
-   * The workaround is to verify that the number of segments matches what {@link #getSegments(String) getSegments}
-   * returns before calling this method (since that waits until the cache is updated with the expected data).
-   */
   public boolean areSegmentsLoaded(String dataSource)
   {
-    final Map<String, Double> status = getLoadStatus();
-    return (status.containsKey(dataSource) && status.get(dataSource) == 100.0);
+    final Map<String, Double> status = getLoadStatus(dataSource);
+    return (status != null && status.containsKey(dataSource) && status.get(dataSource) == 100.0);
   }
 
   public void unloadSegmentsForDataSource(String dataSource)
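A note on the load-status change above: the Coordinator's per-datasource endpoint, datasources/{dataSource}/loadstatus?forceMetadataRefresh=true, answers 204 NO CONTENT until the Coordinator has refreshed its metadata view of that datasource, so getLoadStatus can now legitimately return null and areSegmentsLoaded reports false instead of failing. A minimal sketch of the intended call pattern, using the ITRetryUtil helper and coordinator client that the tests in this patch already use (the datasource name is hypothetical):

    // Poll until the datasource reports 100% loaded; a null (204) response
    // simply counts as "not loaded yet" and the retry loop tries again.
    ITRetryUtil.retryUntilTrue(
        () -> coordinator.areSegmentsLoaded("wikipedia_index_test"),
        "Segment load check"
    );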
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java
index 4c6518d535b..d3bbd1f0cfc 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.testing.utils;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.dockerjava.api.DockerClient;
 import com.github.dockerjava.api.model.Container;
 import com.github.dockerjava.core.DockerClientBuilder;
@@ -31,6 +32,7 @@ import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
 import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.guice.TestClient;
 import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -51,15 +53,18 @@ public class DruidClusterAdminClient
   private static final String ROUTER_DOCKER_CONTAINER_NAME = "/druid-router";
   private static final String MIDDLEMANAGER_DOCKER_CONTAINER_NAME = "/druid-middlemanager";
 
+  private final ObjectMapper jsonMapper;
   private final HttpClient httpClient;
   private IntegrationTestingConfig config;
 
   @Inject
   DruidClusterAdminClient(
+      ObjectMapper jsonMapper,
       @TestClient HttpClient httpClient,
       IntegrationTestingConfig config
   )
   {
+    this.jsonMapper = jsonMapper;
     this.httpClient = httpClient;
     this.config = config;
   }
@@ -97,6 +102,7 @@ public class DruidClusterAdminClient
   public void waitUntilCoordinatorReady()
   {
     waitUntilInstanceReady(config.getCoordinatorUrl());
+    postDynamicConfig(CoordinatorDynamicConfig.builder().withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(1).build());
   }
 
   public void waitUntilHistoricalReady()
@@ -159,4 +165,29 @@ public class DruidClusterAdminClient
         "Waiting for instance to be ready: [" + host + "]"
     );
   }
+
+  private void postDynamicConfig(CoordinatorDynamicConfig coordinatorDynamicConfig)
+  {
+    ITRetryUtil.retryUntilTrue(
+        () -> {
+          try {
+            String url = StringUtils.format("%s/druid/coordinator/v1/config", config.getCoordinatorUrl());
+            StatusResponseHolder response = httpClient.go(
+                new Request(HttpMethod.POST, new URL(url)).setContent(
+                    "application/json",
+                    jsonMapper.writeValueAsBytes(coordinatorDynamicConfig)
+                ), StatusResponseHandler.getInstance()
+            ).get();
+
+            LOG.info("%s %s", response.getStatus(), response.getContent());
+            return response.getStatus().equals(HttpResponseStatus.OK);
+          }
+          catch (Throwable e) {
+            LOG.error(e, "");
+            return false;
+          }
+        },
+        "Posting dynamic config after startup"
+    );
+  }
 }
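Why the Coordinator readiness hook now posts a dynamic config: by default the Coordinator waits a grace period (on the order of minutes) before it may mark overshadowed, compacted-away segments as unused, which would make the exact segment counts asserted by these tests flaky. Setting the leading time to 1 ms makes replaced segments drop out of the visible set almost immediately. A minimal standalone sketch of the payload, assuming an injected Jackson ObjectMapper named jsonMapper as in the patch:

    // Build the same dynamic config the harness posts once the Coordinator is up;
    // every field other than the overridden leading time keeps its default.
    CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig
        .builder()
        .withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(1)
        .build();
    byte[] body = jsonMapper.writeValueAsBytes(dynamicConfig); // POSTed to /druid/coordinator/v1/config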
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index b76b035d01d..ead9f1337b3 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -41,6 +41,8 @@ public class TestNGGroup
 
   public static final String OTHER_INDEX = "other-index";
 
+  public static final String APPEND_INGESTION = "append-ingestion";
+
   public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index";
 
   /**
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 3618d84a5a0..f123a09efcf 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -87,15 +87,15 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       verifyQuery(INDEX_QUERIES_RESOURCE);
 
       submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1));
-      //...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (5 total)
-      forceTriggerAutoCompaction(5);
+      //...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (3 total)
+      forceTriggerAutoCompaction(3);
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
       checkCompactionIntervals(intervalsBeforeCompaction);
 
       submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
-      //...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (6 total)
-      forceTriggerAutoCompaction(6);
+      //...compacted into 1 new segment for the remaining one day. 2 days compacted and 0 days uncompacted. (2 total)
+      forceTriggerAutoCompaction(2);
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
       checkCompactionIntervals(intervalsBeforeCompaction);
@@ -119,8 +119,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       submitCompactionConfig(1, SKIP_OFFSET_FROM_LATEST);
 
       // Instead of merging segments, the updated config will split segments!
-      //...compacted into 10 new segments across 2 days. 5 new segments each day (14 total)
-      forceTriggerAutoCompaction(14);
+      //...compacted into 10 new segments across 2 days. 5 new segments each day (10 total)
+      forceTriggerAutoCompaction(10);
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(10, 1);
 
@@ -154,6 +154,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
   @Test
   public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception
   {
+    // Set compactionTaskSlotRatio to 0 to prevent any compaction
+    updateCompactionTaskSlot(0, 0);
     loadData(INDEX_TASK);
     try (final Closeable ignored = unloader(fullDatasourceName)) {
       final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
@@ -162,19 +164,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
 
       verifySegmentsCount(4);
       verifyQuery(INDEX_QUERIES_RESOURCE);
-      // Skips first day. Should only compact one out of two days.
       submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
-
-      // Set compactionTaskSlotRatio to 0 to prevent any compaction
-      updateCompactionTaskSlot(0, 100);
-      // ...should remains unchanged (4 total)
-      forceTriggerAutoCompaction(4);
-      verifyQuery(INDEX_QUERIES_RESOURCE);
-      verifySegmentsCompacted(0, null);
-      checkCompactionIntervals(intervalsBeforeCompaction);
-
-      // Set maxCompactionTaskSlots to 0 to prevent any compaction
-      updateCompactionTaskSlot(0.1, 0);
       // ...should remains unchanged (4 total)
       forceTriggerAutoCompaction(4);
       verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -183,15 +173,15 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
 
       // Update compaction slots to be 1
       updateCompactionTaskSlot(1, 1);
-      // One day compacted (1 new segment) and one day remains uncompacted. (5 total)
-      forceTriggerAutoCompaction(5);
+      // One day compacted (1 new segment) and one day remains uncompacted. (3 total)
+      forceTriggerAutoCompaction(3);
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
       checkCompactionIntervals(intervalsBeforeCompaction);
       Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14312");
       // Run compaction again to compact the remaining day
-      // Remaining day compacted (1 new segment). Now both days compacted (6 total)
-      forceTriggerAutoCompaction(6);
+      // Remaining day compacted (1 new segment). Now both days compacted (2 total)
+      forceTriggerAutoCompaction(2);
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
       checkCompactionIntervals(intervalsBeforeCompaction);
@@ -283,11 +273,11 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
   {
     compactionResource.forceTriggerAutoCompaction();
     waitForAllTasksToCompleteForDataSource(fullDatasourceName);
-    verifySegmentsCount(numExpectedSegmentsAfterCompaction);
     ITRetryUtil.retryUntilTrue(
         () -> coordinator.areSegmentsLoaded(fullDatasourceName),
         "Segment Compaction"
    );
+    verifySegmentsCount(numExpectedSegmentsAfterCompaction);
   }
 
   private void verifySegmentsCount(int numExpectedSegments)
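The revised expectations in ITAutoCompactionTest follow directly from that 1 ms setting: overshadowed segments now disappear almost immediately, so forceTriggerAutoCompaction counts only visible segments. With the test datasource at 4 segments across 2 days (2 per day), compacting one day yields 2 + 1 = 3 visible segments rather than 4 + 1 = 5 (the old count still included the replaced pair); compacting both days yields 1 + 1 = 2 rather than 6; and the split case yields 5 + 5 = 10 rather than 4 + 10 = 14. This is also why forceTriggerAutoCompaction now waits for segments to load before calling verifySegmentsCount: the visible count only stabilizes once the Coordinator has dropped the overshadowed segments.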
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index 993ee6bac34..9a36015aed1 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.tests.indexer;
 import com.google.common.collect.FluentIterable;
 import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexer.partitions.SecondaryPartitionType;
 import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
 import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeTask;
 import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask;
@@ -43,6 +44,7 @@ import org.testng.Assert;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Function;
@@ -127,29 +129,33 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
     submitTaskAndWait(taskSpec, fullDatasourceName, waitForNewVersion, waitForSegmentsToLoad);
     if (runTestQueries) {
+      doTestQuery(dataSource, queryFilePath, 2);
+    }
+  }
+
+  protected void doTestQuery(String dataSource, String queryFilePath, int timesToRun)
+  {
+    try {
+      String queryResponseTemplate;
       try {
-
-        String queryResponseTemplate;
-        try {
-          InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath);
-          queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
-        }
-        catch (IOException e) {
-          throw new ISE(e, "could not read query file: %s", queryFilePath);
-        }
-
-        queryResponseTemplate = StringUtils.replace(
-            queryResponseTemplate,
-            "%%DATASOURCE%%",
-            fullDatasourceName
-        );
-        queryHelper.testQueriesFromString(queryResponseTemplate, 2);
-
+        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath);
+        queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
       }
-      catch (Exception e) {
-        LOG.error(e, "Error while testing");
-        throw new RuntimeException(e);
+      catch (IOException e) {
+        throw new ISE(e, "could not read query file: %s", queryFilePath);
       }
+
+      queryResponseTemplate = StringUtils.replace(
+          queryResponseTemplate,
+          "%%DATASOURCE%%",
+          dataSource + config.getExtraDatasourceNameSuffix()
+      );
+      queryHelper.testQueriesFromString(queryResponseTemplate, timesToRun);
+    }
+    catch (Exception e) {
+      LOG.error(e, "Error while testing");
+      throw new RuntimeException(e);
+    }
   }
@@ -321,4 +327,71 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
         })
         .count();
   }
+
+  void verifySegmentsCountAndLoaded(String dataSource, int numExpectedSegments)
+  {
+    ITRetryUtil.retryUntilTrue(
+        () -> coordinator.areSegmentsLoaded(dataSource + config.getExtraDatasourceNameSuffix()),
+        "Segment load check"
+    );
+    ITRetryUtil.retryUntilTrue(
+        () -> {
+          int segmentCount = coordinator.getAvailableSegments(
+              dataSource + config.getExtraDatasourceNameSuffix()
+          ).size();
+          LOG.info("Current segment count: %d, expected: %d", segmentCount, numExpectedSegments);
+          return segmentCount == numExpectedSegments;
+        },
+        "Segment count check"
+    );
+  }
+
+  void compactData(String dataSource, String compactionTask) throws Exception
+  {
+    final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
+    final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+    intervalsBeforeCompaction.sort(null);
+    final String template = getResourceAsString(compactionTask);
+    String taskSpec = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
+
+    final String taskID = indexer.submitTask(taskSpec);
+    LOG.info("TaskID for compaction task %s", taskID);
+    indexer.waitUntilTaskCompletes(taskID);
+
+    ITRetryUtil.retryUntilTrue(
+        () -> coordinator.areSegmentsLoaded(fullDatasourceName),
+        "Segment Compaction"
+    );
+    ITRetryUtil.retryUntilTrue(
+        () -> {
+          final List<String> actualIntervals = coordinator.getSegmentIntervals(
+              dataSource + config.getExtraDatasourceNameSuffix()
+          );
+          actualIntervals.sort(null);
+          return actualIntervals.equals(intervalsBeforeCompaction);
+        },
+        "Compaction interval check"
+    );
+  }
+
+  void verifySegmentsCompacted(String dataSource, int expectedCompactedSegmentCount)
+  {
+    List<DataSegment> segments = coordinator.getFullSegmentsMetadata(
+        dataSource + config.getExtraDatasourceNameSuffix()
+    );
+    List<DataSegment> foundCompactedSegments = new ArrayList<>();
+    for (DataSegment segment : segments) {
+      if (segment.getLastCompactionState() != null) {
+        foundCompactedSegments.add(segment);
+      }
+    }
+    Assert.assertEquals(foundCompactedSegments.size(), expectedCompactedSegmentCount);
+    for (DataSegment compactedSegment : foundCompactedSegments) {
+      Assert.assertNotNull(compactedSegment.getLastCompactionState());
+      Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec());
+      Assert.assertEquals(
+          compactedSegment.getLastCompactionState().getPartitionsSpec().getType(),
+          SecondaryPartitionType.LINEAR
+      );
+    }
+  }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractLocalInputSourceParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractLocalInputSourceParallelIndexTest.java
index a00bb3d83d4..82578d1e362 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractLocalInputSourceParallelIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractLocalInputSourceParallelIndexTest.java
@@ -70,6 +70,16 @@ public abstract class AbstractLocalInputSourceParallelIndexTest extends AbstractITBatchIndexTest
         "%%INPUT_FORMAT%%",
         jsonMapper.writeValueAsString(inputFormatMap)
       );
+      spec = StringUtils.replace(
+          spec,
+          "%%APPEND_TO_EXISTING%%",
+          jsonMapper.writeValueAsString(false)
+      );
+      spec = StringUtils.replace(
+          spec,
+          "%%FORCE_GUARANTEED_ROLLUP%%",
+          jsonMapper.writeValueAsString(false)
+      );
       return spec;
     }
     catch (Exception e) {
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java
new file mode 100644
index 00000000000..94e4d23f75c
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = {TestNGGroup.APPEND_INGESTION})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITAppendBatchIndexTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(ITAppendBatchIndexTest.class);
+  private static final String INDEX_TASK = "/indexer/wikipedia_local_input_source_index_task.json";
+  // This query file is for the initial ingestion, which is one complete dataset with roll-up.
+  private static final String INDEX_QUERIES_INITIAL_INGESTION_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  // This query file is for the initial ingestion plus the append ingestion: two complete datasets with roll-up
+  // within each dataset (roll-up within the initial ingestion and within the append ingestion, but no roll-up
+  // across the two datasets).
+  private static final String INDEX_QUERIES_POST_APPEND_PRE_COMPACT_RESOURCE = "/indexer/wikipedia_double_ingestion_non_perfect_rollup_index_queries.json";
+  // This query file is for the initial ingestion plus the append ingestion plus a compaction task after the two
+  // ingestions: two complete datasets with perfect roll-up across both datasets.
+  private static final String INDEX_QUERIES_POST_APPEND_POST_COMPACT_RESOURCE = "/indexer/wikipedia_double_ingestion_perfect_rollup_index_queries.json";
+
+  private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
+
+  @DataProvider
+  public static Object[][] resources()
+  {
+    return new Object[][]{
+        // First index with dynamic partitioning, then append with dynamic partitioning
+        {
+            ImmutableList.of(
+                new DynamicPartitionsSpec(null, null),
+                new DynamicPartitionsSpec(null, null)
+            ),
+            ImmutableList.of(4, 8, 2)
+        },
+        // First index with hash partitioning, then append with dynamic partitioning
+        {
+            ImmutableList.of(
+                new HashedPartitionsSpec(null, 3, ImmutableList.of("page", "user")),
+                new DynamicPartitionsSpec(null, null)
+            ),
+            ImmutableList.of(6, 10, 2)
+        },
+        // First index with range partitioning, then append with dynamic partitioning
+        {
+            ImmutableList.of(
+                new SingleDimensionPartitionsSpec(1000, null, "page", false),
+                new DynamicPartitionsSpec(null, null)
+            ),
+            ImmutableList.of(2, 6, 2)
+        }
+    };
+  }
+
+  @Test(dataProvider = "resources")
+  public void doIndexTest(List<PartitionsSpec> partitionsSpecList, List<Integer> expectedSegmentCountList) throws Exception
+  {
+    final String indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
+    try (
+        final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix());
+    ) {
+      // Submit initial ingestion task
+      submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(0), false);
+      verifySegmentsCountAndLoaded(indexDatasource, expectedSegmentCountList.get(0));
+      doTestQuery(indexDatasource, INDEX_QUERIES_INITIAL_INGESTION_RESOURCE, 2);
+      // Submit append ingestion task
+      submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(1), true);
+      verifySegmentsCountAndLoaded(indexDatasource, expectedSegmentCountList.get(1));
+      doTestQuery(indexDatasource, INDEX_QUERIES_POST_APPEND_PRE_COMPACT_RESOURCE, 2);
+      // Submit compaction task
+      compactData(indexDatasource, COMPACTION_TASK);
+      // Verification post compaction
+      verifySegmentsCountAndLoaded(indexDatasource, expectedSegmentCountList.get(2));
+      verifySegmentsCompacted(indexDatasource, expectedSegmentCountList.get(2));
+      doTestQuery(indexDatasource, INDEX_QUERIES_POST_APPEND_POST_COMPACT_RESOURCE, 2);
+    }
+  }
+
+  private void submitIngestionTaskAndVerify(
+      String indexDatasource,
+      PartitionsSpec partitionsSpec,
+      boolean appendToExisting
+  ) throws Exception
+  {
+    InputFormatDetails inputFormatDetails = InputFormatDetails.JSON;
+    Map<String, Object> inputFormatMap = new ImmutableMap.Builder<String, Object>()
+        .put("type", inputFormatDetails.getInputFormatType())
+        .build();
+    final Function<String, String> sqlInputSourcePropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%PARTITIONS_SPEC%%",
+            jsonMapper.writeValueAsString(partitionsSpec)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%INPUT_SOURCE_FILTER%%",
+            "*" + inputFormatDetails.getFileExtension()
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%INPUT_SOURCE_BASE_DIR%%",
+            "/resources/data/batch_index" + inputFormatDetails.getFolderSuffix()
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%INPUT_FORMAT%%",
+            jsonMapper.writeValueAsString(inputFormatMap)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%APPEND_TO_EXISTING%%",
+            jsonMapper.writeValueAsString(appendToExisting)
+        );
+        if (partitionsSpec instanceof DynamicPartitionsSpec) {
+          spec = StringUtils.replace(
+              spec,
+              "%%FORCE_GUARANTEED_ROLLUP%%",
+              jsonMapper.writeValueAsString(false)
+          );
+        } else if (partitionsSpec instanceof HashedPartitionsSpec || partitionsSpec instanceof SingleDimensionPartitionsSpec) {
+          spec = StringUtils.replace(
+              spec,
+              "%%FORCE_GUARANTEED_ROLLUP%%",
+              jsonMapper.writeValueAsString(true)
+          );
+        }
+        return spec;
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+
+    doIndexTest(
+        indexDatasource,
+        INDEX_TASK,
+        sqlInputSourcePropsTransform,
+        null,
+        false,
+        false,
+        true
+    );
+  }
+}
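On the data-provider expectations: each triple reads (segment count after initial ingestion, after the append, after compaction). The append pass always uses dynamic partitioning and adds 4 segments on top of whatever the initial pass produced (4 to 8, 6 to 10, 2 to 6), and the final count is always 2 because the compaction task covers 2013-08-31/2013-09-02 and, presumably at day granularity, collapses each of the two days into a single segment.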
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
index 8bb0ab91efb..fb620c11aa2 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
@@ -1,5 +1,14 @@
 {
   "type" : "compact",
   "dataSource" : "%%DATASOURCE%%",
-  "interval" : "2013-08-31/2013-09-02"
+  "ioConfig" : {
+    "type": "compact",
+    "inputSpec": {
+      "type": "interval",
+      "interval": "2013-08-31/2013-09-02"
+    }
+  },
+  "context" : {
+    "storeCompactionState" : true
+  }
 }
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_double_ingestion_non_perfect_rollup_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_double_ingestion_non_perfect_rollup_index_queries.json
new file mode 100644
index 00000000000..586da63e3db
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_double_ingestion_non_perfect_rollup_index_queries.json
@@ -0,0 +1,143 @@
+[
+  {
+    "description": "timeBoundary, all",
+    "query":{
+      "queryType" : "timeBoundary",
+      "dataSource": "%%DATASOURCE%%"
+    },
+    "expectedResults":[
+      {
+        "timestamp" : "2013-08-31T01:02:33.000Z",
+        "result" : {
+          "minTime" : "2013-08-31T01:02:33.000Z",
+          "maxTime" : "2013-09-01T12:41:27.000Z"
+        }
+      }
+    ]
+  },
+  {
+    "description": "timeseries, datasketch aggs, all",
+    "query":{
+      "queryType" : "timeseries",
+      "dataSource": "%%DATASOURCE%%",
+      "granularity":"day",
+      "intervals":[
+        "2013-08-31T00:00/2013-09-01T00:00"
+      ],
+      "filter":null,
+      "aggregations":[
+        {
+          "type": "HLLSketchMerge",
+          "name": "approxCountHLL",
+          "fieldName": "HLLSketchBuild",
+          "lgK": 12,
+          "tgtHllType": "HLL_4",
+          "round": true
+        },
+        {
+          "type":"thetaSketch",
+          "name":"approxCountTheta",
+          "fieldName":"thetaSketch",
+          "size":16384,
+          "shouldFinalize":true,
+          "isInputThetaSketch":false,
+          "errorBoundsStdDev":null
+        },
+        {
+          "type":"quantilesDoublesSketch",
+          "name":"quantilesSketch",
+          "fieldName":"quantilesDoublesSketch",
+          "k":128
+        }
+      ]
+    },
+    "expectedResults":[
+      {
+        "timestamp" : "2013-08-31T00:00:00.000Z",
+        "result" : {
+          "quantilesSketch":10,
+          "approxCountTheta":5.0,
+          "approxCountHLL":5
+        }
+      }
+    ]
+  },
+  {
+    "description":"having spec on post aggregation",
+    "query":{
+      "queryType":"groupBy",
+      "dataSource":"%%DATASOURCE%%",
+      "granularity":"day",
+      "dimensions":[
+        "page"
+      ],
+      "filter":{
+        "type":"selector",
+        "dimension":"language",
+        "value":"zh"
+      },
+      "aggregations":[
+        {
+          "type":"count",
+          "name":"rows"
+        },
+        {
+          "type":"longSum",
+          "fieldName":"added",
+          "name":"added_count"
+        }
+      ],
+      "postAggregations": [
+        {
+          "type":"arithmetic",
+          "name":"added_count_times_ten",
+          "fn":"*",
+          "fields":[
+            {"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"},
+            {"type":"constant", "name":"const", "value":10}
+          ]
+        }
+      ],
+      "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000},
+      "intervals":[
+        "2013-08-31T00:00/2013-09-01T00:00"
+      ]
+    },
"expectedResults":[ { + "version" : "v1", + "timestamp" : "2013-08-31T00:00:00.000Z", + "event" : { + "added_count_times_ten" : 18100.0, + "page" : "Crimson Typhoon", + "added_count" : 1810, + "rows" : 2 + } + } ] + }, + { + "description": "timeseries, count aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "count", + "name": "rows" + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T00:00:00.000Z", + "result" : { + "rows":10 + } + } + ] + } +] \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_double_ingestion_perfect_rollup_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_double_ingestion_perfect_rollup_index_queries.json new file mode 100644 index 00000000000..eaa9592ca26 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_double_ingestion_perfect_rollup_index_queries.json @@ -0,0 +1,143 @@ +[ + { + "description": "timeseries, 1 agg, all", + "query":{ + "queryType" : "timeBoundary", + "dataSource": "%%DATASOURCE%%" + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T01:02:33.000Z", + "result" : { + "minTime" : "2013-08-31T01:02:33.000Z", + "maxTime" : "2013-09-01T12:41:27.000Z" + } + } + ] + }, + { + "description": "timeseries, datasketch aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "HLLSketchMerge", + "name": "approxCountHLL", + "fieldName": "HLLSketchBuild", + "lgK": 12, + "tgtHllType": "HLL_4", + "round": true + }, + { + "type":"thetaSketch", + "name":"approxCountTheta", + "fieldName":"thetaSketch", + "size":16384, + "shouldFinalize":true, + "isInputThetaSketch":false, + "errorBoundsStdDev":null + }, + { + "type":"quantilesDoublesSketch", + "name":"quantilesSketch", + "fieldName":"quantilesDoublesSketch", + "k":128 + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T00:00:00.000Z", + "result" : { + "quantilesSketch":10, + "approxCountTheta":5.0, + "approxCountHLL":5 + } + } + ] + }, + { + "description":"having spec on post aggregation", + "query":{ + "queryType":"groupBy", + "dataSource":"%%DATASOURCE%%", + "granularity":"day", + "dimensions":[ + "page" + ], + "filter":{ + "type":"selector", + "dimension":"language", + "value":"zh" + }, + "aggregations":[ + { + "type":"count", + "name":"rows" + }, + { + "type":"longSum", + "fieldName":"added", + "name":"added_count" + } + ], + "postAggregations": [ + { + "type":"arithmetic", + "name":"added_count_times_ten", + "fn":"*", + "fields":[ + {"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"}, + {"type":"constant", "name":"const", "value":10} + ] + } + ], + "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000}, + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ] + }, + "expectedResults":[ { + "version" : "v1", + "timestamp" : "2013-08-31T00:00:00.000Z", + "event" : { + "added_count_times_ten" : 18100.0, + "page" : "Crimson Typhoon", + "added_count" : 1810, + "rows" : 1 + } + } ] + }, + { + "description": "timeseries, count aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + 
"aggregations":[ + { + "type": "count", + "name": "rows" + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T00:00:00.000Z", + "result" : { + "rows":5 + } + } + ] + } +] \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json index 67650f8e1be..a533a848e33 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json @@ -71,15 +71,17 @@ "filter" : "%%INPUT_SOURCE_FILTER%%", "baseDir": "%%INPUT_SOURCE_BASE_DIR%%" }, + "appendToExisting": %%APPEND_TO_EXISTING%%, "inputFormat": %%INPUT_FORMAT%% }, "tuningConfig": { "type": "index_parallel", - "maxNumConcurrentSubTasks": 10, + "maxNumConcurrentSubTasks": 4, "splitHintSpec": { "type": "maxSize", "maxSplitSize": 1 }, + "forceGuaranteedRollup": %%FORCE_GUARANTEED_ROLLUP%%, "partitionsSpec": %%PARTITIONS_SPEC%% } }