Add integration tests for Appends (#10186)

* append test

* add append IT

* fix checkstyle

* fix checkstyle

* Remove parallel

* fix checkstyle

* fix

* fix

* address comments

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix
Maytas Monsereenusorn 2020-07-20 13:43:13 -07:00 committed by GitHub
parent eeb9012743
commit 0cabc53bd5
12 changed files with 666 additions and 62 deletions


@@ -393,11 +393,19 @@ jobs:
script: *run_integration_test
after_failure: *integration_test_diags
- &integration_append_ingestion
name: "(Compile=openjdk8, Run=openjdk8) append ingestion integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags
- &integration_tests
name: "(Compile=openjdk8, Run=openjdk8) other integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
script: *run_integration_test
after_failure: *integration_test_diags
# END - Integration tests for Compile with Java 8 and Run with Java 8
@@ -433,10 +441,15 @@ jobs:
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=realtime-index' JVM_RUNTIME='-Djvm.runtime=11'
- <<: *integration_append_ingestion
name: "(Compile=openjdk8, Run=openjdk11) append ingestion integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
# END - Integration tests for Compile with Java 8 and Run with Java 11
- name: "security vulnerabilities"


@@ -31,7 +31,6 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.query.lookup.LookupsState;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
@@ -95,9 +94,9 @@ public class CoordinatorResourceTestClient
return StringUtils.format("%sdatasources/%s/segments?full", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
}
private String getLoadStatusURL()
private String getLoadStatusURL(String dataSource)
{
return StringUtils.format("%s%s", getCoordinatorURL(), "loadstatus");
return StringUtils.format("%sdatasources/%s/loadstatus?forceMetadataRefresh=true&interval=1970-01-01/2999-01-01", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
}
/** return a list of the segment dates for the specified data source */
@@ -173,11 +172,27 @@ public class CoordinatorResourceTestClient
}
}
private Map<String, Integer> getLoadStatus()
private Map<String, Integer> getLoadStatus(String dataSource)
{
String url = getLoadStatusURL(dataSource);
Map<String, Integer> status;
try {
StatusResponseHolder response = makeRequest(HttpMethod.GET, getLoadStatusURL());
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.GET, new URL(url)),
responseHandler
).get();
if (response.getStatus().getCode() == HttpResponseStatus.NO_CONTENT.getCode()) {
return null;
}
if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
throw new ISE(
"Error while making request to url[%s] status[%s] content[%s]",
url,
response.getStatus(),
response.getContent()
);
}
status = jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, Integer>>()
@@ -191,18 +206,10 @@ public class CoordinatorResourceTestClient
return status;
}
/**
* Warning: This API reads segments from {@link SqlSegmentsMetadataManager} of the Coordinator which
* caches segments in memory and periodically updates them. Hence, there can be a race condition as
* this API implementation compares segments metadata from cache with segments in historicals.
* Particularly, when number of segment changes after the first initial load of the datasource.
* Workaround is to verify the number of segments matches expected from {@link #getSegments(String) getSegments}
* before calling this method (since, that would wait until the cache is updated with expected data)
*/
public boolean areSegmentsLoaded(String dataSource)
{
final Map<String, Integer> status = getLoadStatus();
return (status.containsKey(dataSource) && status.get(dataSource) == 100.0);
final Map<String, Integer> status = getLoadStatus(dataSource);
return (status != null && status.containsKey(dataSource) && status.get(dataSource) == 100.0);
}
public void unloadSegmentsForDataSource(String dataSource)
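The reworked load-status check is now scoped to a single datasource and forces a metadata refresh, which removes the race that the deleted Javadoc warned about. A minimal sketch of the polling pattern the tests build around it, assuming the injected CoordinatorResourceTestClient (the coordinator field) and the existing ITRetryUtil helper; the method and datasource names are illustrative:

// Block until the coordinator reports every used segment of the datasource as
// loaded on historicals. Each poll hits
// /druid/coordinator/v1/datasources/{dataSource}/loadstatus?forceMetadataRefresh=true&...,
// so it re-reads segment metadata instead of trusting a cached snapshot.
private void waitForSegmentsToLoad(String dataSource)
{
  ITRetryUtil.retryUntilTrue(
      () -> coordinator.areSegmentsLoaded(dataSource),
      "Segment load check for " + dataSource
  );
}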


@@ -19,6 +19,7 @@
package org.apache.druid.testing.utils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.model.Container;
import com.github.dockerjava.core.DockerClientBuilder;
@@ -31,6 +32,7 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -51,15 +53,18 @@ public class DruidClusterAdminClient
private static final String ROUTER_DOCKER_CONTAINER_NAME = "/druid-router";
private static final String MIDDLEMANAGER_DOCKER_CONTAINER_NAME = "/druid-middlemanager";
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private IntegrationTestingConfig config;
@Inject
DruidClusterAdminClient(
ObjectMapper jsonMapper,
@TestClient HttpClient httpClient,
IntegrationTestingConfig config
)
{
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.config = config;
}
@@ -97,6 +102,7 @@ public class DruidClusterAdminClient
public void waitUntilCoordinatorReady()
{
waitUntilInstanceReady(config.getCoordinatorUrl());
postDynamicConfig(CoordinatorDynamicConfig.builder().withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(1).build());
}
public void waitUntilHistoricalReady()
@@ -159,4 +165,29 @@ public class DruidClusterAdminClient
"Waiting for instance to be ready: [" + host + "]"
);
}
private void postDynamicConfig(CoordinatorDynamicConfig coordinatorDynamicConfig)
{
ITRetryUtil.retryUntilTrue(
() -> {
try {
String url = StringUtils.format("%s/druid/coordinator/v1/config", config.getCoordinatorUrl());
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(url)).setContent(
"application/json",
jsonMapper.writeValueAsBytes(coordinatorDynamicConfig)
), StatusResponseHandler.getInstance()
).get();
LOG.info("%s %s", response.getStatus(), response.getContent());
return response.getStatus().equals(HttpResponseStatus.OK);
}
catch (Throwable e) {
LOG.error(e, "");
return false;
}
},
"Posting dynamic config after startup"
);
}
}


@@ -41,6 +41,8 @@ public class TestNGGroup
public static final String OTHER_INDEX = "other-index";
public static final String APPEND_INGESTION = "append-ingestion";
public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index";
/**


@@ -87,15 +87,15 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1));
//...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (5 total)
forceTriggerAutoCompaction(5);
//...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (3 total)
forceTriggerAutoCompaction(3);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
//...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (6 total)
forceTriggerAutoCompaction(6);
//...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (2 total)
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
@@ -119,8 +119,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
submitCompactionConfig(1, SKIP_OFFSET_FROM_LATEST);
// Instead of merging segments, the updated config will split segments!
//...compacted into 10 new segments across 2 days. 5 new segments each day (14 total)
forceTriggerAutoCompaction(14);
//...compacted into 10 new segments across 2 days. 5 new segments each day (10 total)
forceTriggerAutoCompaction(10);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(10, 1);
@@ -154,6 +154,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
@Test
public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception
{
// Set compactionTaskSlotRatio to 0 to prevent any compaction
updateCompactionTaskSlot(0, 0);
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
@@ -162,19 +164,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
// Skips first day. Should only compact one out of two days.
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
// Set compactionTaskSlotRatio to 0 to prevent any compaction
updateCompactionTaskSlot(0, 100);
// ...should remain unchanged (4 total)
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
checkCompactionIntervals(intervalsBeforeCompaction);
// Set maxCompactionTaskSlots to 0 to prevent any compaction
updateCompactionTaskSlot(0.1, 0);
// ...should remain unchanged (4 total)
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -183,15 +173,15 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// Update compaction slots to be 1
updateCompactionTaskSlot(1, 1);
// One day compacted (1 new segment) and one day remains uncompacted. (5 total)
forceTriggerAutoCompaction(5);
// One day compacted (1 new segment) and one day remains uncompacted. (3 total)
forceTriggerAutoCompaction(3);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14312");
// Run compaction again to compact the remaining day
// Remaining day compacted (1 new segment). Now both days compacted (6 total)
forceTriggerAutoCompaction(6);
// Remaining day compacted (1 new segment). Now both days compacted (2 total)
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
@@ -283,11 +273,11 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
{
compactionResource.forceTriggerAutoCompaction();
waitForAllTasksToCompleteForDataSource(fullDatasourceName);
verifySegmentsCount(numExpectedSegmentsAfterCompaction);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Compaction"
);
verifySegmentsCount(numExpectedSegmentsAfterCompaction);
}
private void verifySegmentsCount(int numExpectedSegments)


@@ -22,6 +22,7 @@ package org.apache.druid.tests.indexer;
import com.google.common.collect.FluentIterable;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask;
@@ -43,6 +44,7 @@ import org.testng.Assert;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
@@ -127,29 +129,33 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
submitTaskAndWait(taskSpec, fullDatasourceName, waitForNewVersion, waitForSegmentsToLoad);
if (runTestQueries) {
doTestQuery(dataSource, queryFilePath, 2);
}
}
protected void doTestQuery(String dataSource, String queryFilePath, int timesToRun)
{
try {
String queryResponseTemplate;
try {
String queryResponseTemplate;
try {
InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath);
queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
}
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", queryFilePath);
}
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%DATASOURCE%%",
fullDatasourceName
);
queryHelper.testQueriesFromString(queryResponseTemplate, 2);
InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath);
queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
}
catch (Exception e) {
LOG.error(e, "Error while testing");
throw new RuntimeException(e);
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", queryFilePath);
}
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%DATASOURCE%%",
dataSource + config.getExtraDatasourceNameSuffix()
);
queryHelper.testQueriesFromString(queryResponseTemplate, timesToRun);
}
catch (Exception e) {
LOG.error(e, "Error while testing");
throw new RuntimeException(e);
}
}
@@ -321,4 +327,71 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
})
.count();
}
void verifySegmentsCountAndLoaded(String dataSource, int numExpectedSegments)
{
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(dataSource + config.getExtraDatasourceNameSuffix()),
"Segment load check"
);
ITRetryUtil.retryUntilTrue(
() -> {
int segmentCount = coordinator.getAvailableSegments(
dataSource + config.getExtraDatasourceNameSuffix()
).size();
LOG.info("Current segment count: %d, expected: %d", segmentCount, numExpectedSegments);
return segmentCount == numExpectedSegments;
},
"Segment count check"
);
}
void compactData(String dataSource, String compactionTask) throws Exception
{
final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
final String template = getResourceAsString(compactionTask);
String taskSpec = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for compaction task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Compaction"
);
ITRetryUtil.retryUntilTrue(
() -> {
final List<String> actualIntervals = coordinator.getSegmentIntervals(
dataSource + config.getExtraDatasourceNameSuffix()
);
actualIntervals.sort(null);
return actualIntervals.equals(intervalsBeforeCompaction);
},
"Compaction interval check"
);
}
void verifySegmentsCompacted(String dataSource, int expectedCompactedSegmentCount)
{
List<DataSegment> segments = coordinator.getFullSegmentsMetadata(
dataSource + config.getExtraDatasourceNameSuffix()
);
List<DataSegment> foundCompactedSegments = new ArrayList<>();
for (DataSegment segment : segments) {
if (segment.getLastCompactionState() != null) {
foundCompactedSegments.add(segment);
}
}
Assert.assertEquals(foundCompactedSegments.size(), expectedCompactedSegmentCount);
for (DataSegment compactedSegment : foundCompactedSegments) {
Assert.assertNotNull(compactedSegment.getLastCompactionState());
Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec());
Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec().getType(),
SecondaryPartitionType.LINEAR
);
}
}
}
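Taken together, the new helpers give append-style tests a compact verify/compact/verify flow. A minimal sketch of how a subclass is expected to chain them, with an illustrative datasource name and segment counts (the real sequence is in ITAppendBatchIndexTest below):

// Hypothetical test method in a subclass of AbstractITBatchIndexTest.
@Test
public void testAppendThenCompact() throws Exception
{
  verifySegmentsCountAndLoaded("wikipedia_index_test", 8);   // after initial + append ingestion
  compactData("wikipedia_index_test", "/indexer/wikipedia_compaction_task.json");
  verifySegmentsCountAndLoaded("wikipedia_index_test", 2);   // segments rewritten by compaction
  verifySegmentsCompacted("wikipedia_index_test", 2);        // and they carry lastCompactionState
}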


@@ -70,6 +70,16 @@ public abstract class AbstractLocalInputSourceParallelIndexTest extends Abstract
"%%INPUT_FORMAT%%",
jsonMapper.writeValueAsString(inputFormatMap)
);
spec = StringUtils.replace(
spec,
"%%APPEND_TO_EXISTING%%",
jsonMapper.writeValueAsString(false)
);
spec = StringUtils.replace(
spec,
"%%FORCE_GUARANTEED_ROLLUP%%",
jsonMapper.writeValueAsString(false)
);
return spec;
}
catch (Exception e) {


@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
@Test(groups = {TestNGGroup.APPEND_INGESTION})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITAppendBatchIndexTest extends AbstractITBatchIndexTest
{
private static final Logger LOG = new Logger(ITAppendBatchIndexTest.class);
private static final String INDEX_TASK = "/indexer/wikipedia_local_input_source_index_task.json";
// This query file is for the initial ingestion, which is one complete dataset with rollup.
private static final String INDEX_QUERIES_INITIAL_INGESTION_RESOURCE = "/indexer/wikipedia_index_queries.json";
// This query file is for the initial ingestion plus the append ingestion: two complete datasets with rollup
// within each dataset (rollup within the initial ingestion and rollup within the append ingestion, but no
// rollup across the two datasets).
private static final String INDEX_QUERIES_POST_APPEND_PRE_COMPACT_RESOURCE = "/indexer/wikipedia_double_ingestion_non_perfect_rollup_index_queries.json";
// This query file is for the initial ingestion plus the append ingestion, followed by a compaction task:
// two complete datasets with perfect rollup across both datasets.
private static final String INDEX_QUERIES_POST_APPEND_POST_COMPACT_RESOURCE = "/indexer/wikipedia_double_ingestion_perfect_rollup_index_queries.json";
private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
@DataProvider
public static Object[][] resources()
{
return new Object[][]{
// First index with dynamically-partitioned then append dynamically-partitioned
{
ImmutableList.of(
new DynamicPartitionsSpec(null, null),
new DynamicPartitionsSpec(null, null)
),
ImmutableList.of(4, 8, 2)
},
// First index with hash-partitioned then append dynamically-partitioned
{
ImmutableList.of(
new HashedPartitionsSpec(null, 3, ImmutableList.of("page", "user")),
new DynamicPartitionsSpec(null, null)
),
ImmutableList.of(6, 10, 2)
},
// First index with range-partitioned then append dynamically-partitioned
{
ImmutableList.of(
new SingleDimensionPartitionsSpec(1000, null, "page", false),
new DynamicPartitionsSpec(null, null)
),
ImmutableList.of(2, 6, 2)
}
};
}
@Test(dataProvider = "resources")
public void doIndexTest(List<PartitionsSpec> partitionsSpecList, List<Integer> expectedSegmentCountList) throws Exception
{
final String indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
try (
final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix());
) {
// Submit initial ingestion task
submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(0), false);
verifySegmentsCountAndLoaded(indexDatasource, expectedSegmentCountList.get(0));
doTestQuery(indexDatasource, INDEX_QUERIES_INITIAL_INGESTION_RESOURCE, 2);
// Submit append ingestion task
submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(1), true);
verifySegmentsCountAndLoaded(indexDatasource, expectedSegmentCountList.get(1));
doTestQuery(indexDatasource, INDEX_QUERIES_POST_APPEND_PRE_COMPACT_RESOURCE, 2);
// Submit compaction task
compactData(indexDatasource, COMPACTION_TASK);
// Verification post compaction
verifySegmentsCountAndLoaded(indexDatasource, expectedSegmentCountList.get(2));
verifySegmentsCompacted(indexDatasource, expectedSegmentCountList.get(2));
doTestQuery(indexDatasource, INDEX_QUERIES_POST_APPEND_POST_COMPACT_RESOURCE, 2);
}
}
private void submitIngestionTaskAndVerify(
String indexDatasource,
PartitionsSpec partitionsSpec,
boolean appendToExisting
) throws Exception
{
InputFormatDetails inputFormatDetails = InputFormatDetails.JSON;
Map inputFormatMap = new ImmutableMap.Builder<String, Object>().put("type", inputFormatDetails.getInputFormatType())
.build();
final Function<String, String> sqlInputSourcePropsTransform = spec -> {
try {
spec = StringUtils.replace(
spec,
"%%PARTITIONS_SPEC%%",
jsonMapper.writeValueAsString(partitionsSpec)
);
spec = StringUtils.replace(
spec,
"%%INPUT_SOURCE_FILTER%%",
"*" + inputFormatDetails.getFileExtension()
);
spec = StringUtils.replace(
spec,
"%%INPUT_SOURCE_BASE_DIR%%",
"/resources/data/batch_index" + inputFormatDetails.getFolderSuffix()
);
spec = StringUtils.replace(
spec,
"%%INPUT_FORMAT%%",
jsonMapper.writeValueAsString(inputFormatMap)
);
spec = StringUtils.replace(
spec,
"%%APPEND_TO_EXISTING%%",
jsonMapper.writeValueAsString(appendToExisting)
);
if (partitionsSpec instanceof DynamicPartitionsSpec) {
spec = StringUtils.replace(
spec,
"%%FORCE_GUARANTEED_ROLLUP%%",
jsonMapper.writeValueAsString(false)
);
} else if (partitionsSpec instanceof HashedPartitionsSpec || partitionsSpec instanceof SingleDimensionPartitionsSpec) {
spec = StringUtils.replace(
spec,
"%%FORCE_GUARANTEED_ROLLUP%%",
jsonMapper.writeValueAsString(true)
);
}
return spec;
}
catch (Exception e) {
throw new RuntimeException(e);
}
};
doIndexTest(
indexDatasource,
INDEX_TASK,
sqlInputSourcePropsTransform,
null,
false,
false,
true
);
}
}
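One way to read the expected segment counts in the data provider, spelled out for the dynamic-then-dynamic case. Only the totals 4/8/2 come from the test itself; the per-step explanation is an interpretation of the commit, not something it states:

// First data-provider case, annotated.
Object[] dynamicThenDynamic = new Object[]{
    ImmutableList.of(
        new DynamicPartitionsSpec(null, null),   // initial ingestion
        new DynamicPartitionsSpec(null, null)    // append ingestion (appendToExisting = true)
    ),
    ImmutableList.of(
        4,   // segments created by the initial dynamically partitioned ingestion
        8,   // after the append: new segments land next to, not instead of, the old ones
        2    // after compacting 2013-08-31/2013-09-02; the overshadowed originals are marked
             // unused quickly thanks to the 1 ms leading time posted by DruidClusterAdminClient
    )
};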


@@ -1,5 +1,14 @@
{
"type" : "compact",
"dataSource" : "%%DATASOURCE%%",
"interval" : "2013-08-31/2013-09-02"
"ioConfig" : {
"type": "compact",
"inputSpec": {
"type": "interval",
"interval": "2013-08-31/2013-09-02"
}
},
"context" : {
"storeCompactionState" : true
}
}
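The storeCompactionState flag in the context is what makes the post-compaction assertions possible: with it set, each rewritten segment records its lastCompactionState, which verifySegmentsCompacted inspects. A minimal sketch of that check, assuming the coordinator client from the test base class and a fully qualified datasource name built as in the helpers above:

// Every segment produced by the compaction task should carry a compaction state
// whose partitions spec uses linear (dynamic) secondary partitioning.
String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
for (DataSegment segment : coordinator.getFullSegmentsMetadata(fullDatasourceName)) {
  if (segment.getLastCompactionState() != null) {
    Assert.assertEquals(
        segment.getLastCompactionState().getPartitionsSpec().getType(),
        SecondaryPartitionType.LINEAR
    );
  }
}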


@@ -0,0 +1,143 @@
[
{
"description": "timeseries, 1 agg, all",
"query":{
"queryType" : "timeBoundary",
"dataSource": "%%DATASOURCE%%"
},
"expectedResults":[
{
"timestamp" : "2013-08-31T01:02:33.000Z",
"result" : {
"minTime" : "2013-08-31T01:02:33.000Z",
"maxTime" : "2013-09-01T12:41:27.000Z"
}
}
]
},
{
"description": "timeseries, datasketch aggs, all",
"query":{
"queryType" : "timeseries",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
],
"filter":null,
"aggregations":[
{
"type": "HLLSketchMerge",
"name": "approxCountHLL",
"fieldName": "HLLSketchBuild",
"lgK": 12,
"tgtHllType": "HLL_4",
"round": true
},
{
"type":"thetaSketch",
"name":"approxCountTheta",
"fieldName":"thetaSketch",
"size":16384,
"shouldFinalize":true,
"isInputThetaSketch":false,
"errorBoundsStdDev":null
},
{
"type":"quantilesDoublesSketch",
"name":"quantilesSketch",
"fieldName":"quantilesDoublesSketch",
"k":128
}
]
},
"expectedResults":[
{
"timestamp" : "2013-08-31T00:00:00.000Z",
"result" : {
"quantilesSketch":10,
"approxCountTheta":5.0,
"approxCountHLL":5
}
}
]
},
{
"description":"having spec on post aggregation",
"query":{
"queryType":"groupBy",
"dataSource":"%%DATASOURCE%%",
"granularity":"day",
"dimensions":[
"page"
],
"filter":{
"type":"selector",
"dimension":"language",
"value":"zh"
},
"aggregations":[
{
"type":"count",
"name":"rows"
},
{
"type":"longSum",
"fieldName":"added",
"name":"added_count"
}
],
"postAggregations": [
{
"type":"arithmetic",
"name":"added_count_times_ten",
"fn":"*",
"fields":[
{"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"},
{"type":"constant", "name":"const", "value":10}
]
}
],
"having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000},
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
]
},
"expectedResults":[ {
"version" : "v1",
"timestamp" : "2013-08-31T00:00:00.000Z",
"event" : {
"added_count_times_ten" : 18100.0,
"page" : "Crimson Typhoon",
"added_count" : 1810,
"rows" : 2
}
} ]
},
{
"description": "timeseries, count aggs, all",
"query":{
"queryType" : "timeseries",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
],
"filter":null,
"aggregations":[
{
"type": "count",
"name": "rows"
}
]
},
"expectedResults":[
{
"timestamp" : "2013-08-31T00:00:00.000Z",
"result" : {
"rows":10
}
}
]
}
]


@@ -0,0 +1,143 @@
[
{
"description": "timeseries, 1 agg, all",
"query":{
"queryType" : "timeBoundary",
"dataSource": "%%DATASOURCE%%"
},
"expectedResults":[
{
"timestamp" : "2013-08-31T01:02:33.000Z",
"result" : {
"minTime" : "2013-08-31T01:02:33.000Z",
"maxTime" : "2013-09-01T12:41:27.000Z"
}
}
]
},
{
"description": "timeseries, datasketch aggs, all",
"query":{
"queryType" : "timeseries",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
],
"filter":null,
"aggregations":[
{
"type": "HLLSketchMerge",
"name": "approxCountHLL",
"fieldName": "HLLSketchBuild",
"lgK": 12,
"tgtHllType": "HLL_4",
"round": true
},
{
"type":"thetaSketch",
"name":"approxCountTheta",
"fieldName":"thetaSketch",
"size":16384,
"shouldFinalize":true,
"isInputThetaSketch":false,
"errorBoundsStdDev":null
},
{
"type":"quantilesDoublesSketch",
"name":"quantilesSketch",
"fieldName":"quantilesDoublesSketch",
"k":128
}
]
},
"expectedResults":[
{
"timestamp" : "2013-08-31T00:00:00.000Z",
"result" : {
"quantilesSketch":10,
"approxCountTheta":5.0,
"approxCountHLL":5
}
}
]
},
{
"description":"having spec on post aggregation",
"query":{
"queryType":"groupBy",
"dataSource":"%%DATASOURCE%%",
"granularity":"day",
"dimensions":[
"page"
],
"filter":{
"type":"selector",
"dimension":"language",
"value":"zh"
},
"aggregations":[
{
"type":"count",
"name":"rows"
},
{
"type":"longSum",
"fieldName":"added",
"name":"added_count"
}
],
"postAggregations": [
{
"type":"arithmetic",
"name":"added_count_times_ten",
"fn":"*",
"fields":[
{"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"},
{"type":"constant", "name":"const", "value":10}
]
}
],
"having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000},
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
]
},
"expectedResults":[ {
"version" : "v1",
"timestamp" : "2013-08-31T00:00:00.000Z",
"event" : {
"added_count_times_ten" : 18100.0,
"page" : "Crimson Typhoon",
"added_count" : 1810,
"rows" : 1
}
} ]
},
{
"description": "timeseries, count aggs, all",
"query":{
"queryType" : "timeseries",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
],
"filter":null,
"aggregations":[
{
"type": "count",
"name": "rows"
}
]
},
"expectedResults":[
{
"timestamp" : "2013-08-31T00:00:00.000Z",
"result" : {
"rows":5
}
}
]
}
]


@@ -71,15 +71,17 @@
"filter" : "%%INPUT_SOURCE_FILTER%%",
"baseDir": "%%INPUT_SOURCE_BASE_DIR%%"
},
"appendToExisting": %%APPEND_TO_EXISTING%%,
"inputFormat": %%INPUT_FORMAT%%
},
"tuningConfig": {
"type": "index_parallel",
"maxNumConcurrentSubTasks": 10,
"maxNumConcurrentSubTasks": 4,
"splitHintSpec": {
"type": "maxSize",
"maxSplitSize": 1
},
"forceGuaranteedRollup": %%FORCE_GUARANTEED_ROLLUP%%,
"partitionsSpec": %%PARTITIONS_SPEC%%
}
}