Integration tests and docs for auto compaction with different partitioning (#10354)

* Working

* add test

* doc

* fix test

* split other integration test

* exclude other-index from other tests

* doc anchor fix

* adjust task slots and number of merge tasks

* spell check

* reduce maxNumConcurrentSubTasks to 1

* maxNumConcurrentSubtasks for range partitinoing

* reduce memory for historical

* change group name
This commit is contained in:
Jihoon Son 2020-09-15 11:28:09 -07:00 committed by GitHub
parent e465f05717
commit 8657b23ab2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 128 additions and 38 deletions

View File

@ -420,11 +420,19 @@ jobs:
script: *run_integration_test script: *run_integration_test
after_failure: *integration_test_diags after_failure: *integration_test_diags
- &integration_compaction_tests
name: "(Compile=openjdk8, Run=openjdk8) compaction integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=compaction' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags
- &integration_tests - &integration_tests
name: "(Compile=openjdk8, Run=openjdk8) other integration test" name: "(Compile=openjdk8, Run=openjdk8) other integration test"
jdk: openjdk8 jdk: openjdk8
services: *integration_test_services services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11' env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=11'
script: *run_integration_test script: *run_integration_test
after_failure: *integration_test_diags after_failure: *integration_test_diags
# END - Integration tests for Compile with Java 8 and Run with Java 8 # END - Integration tests for Compile with Java 8 and Run with Java 8
@ -475,10 +483,15 @@ jobs:
jdk: openjdk8 jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=11' env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
- <<: *integration_compaction_tests
name: "(Compile=openjdk8, Run=openjdk11) compaction integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=compaction' JVM_RUNTIME='-Djvm.runtime=11'
- <<: *integration_tests - <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test" name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8 jdk: openjdk8
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11' env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=11'
# END - Integration tests for Compile with Java 8 and Run with Java 11 # END - Integration tests for Compile with Java 8 and Run with Java 11
- name: "security vulnerabilities" - name: "security vulnerabilities"

View File

@ -830,16 +830,28 @@ If you see this problem, it's recommended to set `skipOffsetFromLatest` to some
###### Compaction TuningConfig ###### Compaction TuningConfig
Auto compaction supports a subset of the [tuningConfig for Parallel task](../ingestion/native-batch.md#tuningconfig).
The below is a list of the supported configurations for auto compaction.
|Property|Description|Required| |Property|Description|Required|
|--------|-----------|--------| |--------|-----------|--------|
|`maxRowsInMemory`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 1000000)| |type|The task type, this should always be `index_parallel`.|yes|
|`maxBytesInMemory`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (1/6 of max JVM memory)| |`maxRowsInMemory`|Used in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|no (default = 1000000)|
|`maxTotalRows`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 20000000)| |`maxBytesInMemory`|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is `maxBytesInMemory` * (2 + `maxPendingPersists`)|no (default = 1/6 of max JVM memory)|
|`splitHintSpec`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = null)| |`splitHintSpec`|Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of the input source. See [Split hint spec](../ingestion/native-batch.md#split-hint-spec) for more details.|no (default = size-based split hint spec)|
|`indexSpec`|See [IndexSpec](../ingestion/index.md#indexspec)|no| |`partitionsSpec`|Defines how to partition data in each time chunk, see [`PartitionsSpec`](../ingestion/native-batch.md#partitionsspec)|no (default = `dynamic`)|
|`maxPendingPersists`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 0 (meaning one persist can be running concurrently with ingestion, and none can be queued up))| |`indexSpec`|Defines segment storage format options to be used at indexing time, see [IndexSpec](../ingestion/index.md#indexspec)|no|
|`pushTimeout`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 0)| |`indexSpecForIntermediatePersists`|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](../ingestion/index.md#indexspec) for possible values.|no|
|`maxNumConcurrentSubTasks`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 1)| |`maxPendingPersists`|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with `maxRowsInMemory` * (2 + `maxPendingPersists`).|no (default = 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`pushTimeout`|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|no (default = 0)|
|`segmentWriteOutMediumFactory`|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](../ingestion/native-batch.md#segmentwriteoutmediumfactory).|no (default is the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used)|
|`maxNumConcurrentSubTasks`|Maximum number of worker tasks which can be run in parallel at the same time. The supervisor task would spawn worker tasks up to `maxNumConcurrentSubTasks` regardless of the current available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set to too large, too many worker tasks can be created which might block other ingestion. Check [Capacity Planning](../ingestion/native-batch.md#capacity-planning) for more details.|no (default = 1)|
|`maxRetry`|Maximum number of retries on task failures.|no (default = 3)|
|`maxNumSegmentsToMerge`|Max limit for the number of segments that a single task can merge at the same time in the second phase. Used only with `hashed` or `single_dim` partitionsSpec.|no (default = 100)|
|`totalNumMergeTasks`|Total number of tasks to merge segments in the merge phase when `partitionsSpec` is set to `hashed` or `single_dim`.|no (default = 10)|
|`taskStatusCheckPeriodMs`|Polling period in milliseconds to check running task statuses.|no (default = 1000)|
|`chatHandlerTimeout`|Timeout for reporting the pushed segments in worker tasks.|no (default = PT10S)|
|`chatHandlerNumRetries`|Retries for reporting the pushed segments in worker tasks.|no (default = 5)|
### Overlord ### Overlord

View File

@ -21,7 +21,7 @@ DRUID_SERVICE=historical
DRUID_LOG_PATH=/shared/logs/historical.log DRUID_LOG_PATH=/shared/logs/historical.log
# JAVA OPTS # JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx768m -Xms768m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007 SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007
# Druid configs # Druid configs
druid_processing_buffer_sizeBytes=25000000 druid_processing_buffer_sizeBytes=25000000

View File

@ -21,7 +21,7 @@ DRUID_SERVICE=historical-for-query-retry-test
DRUID_LOG_PATH=/shared/logs/historical-for-query-retry-test.log DRUID_LOG_PATH=/shared/logs/historical-for-query-retry-test.log
# JAVA OPTS # JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx768m -Xms768m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5010 SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5010
# Druid configs # Druid configs
druid_processing_buffer_sizeBytes=25000000 druid_processing_buffer_sizeBytes=25000000

View File

@ -36,4 +36,4 @@ druid_indexer_task_chathandler_type=announce
druid_auth_basic_common_cacheDirectory=/tmp/authCache/middleManager druid_auth_basic_common_cacheDirectory=/tmp/authCache/middleManager
druid_startup_logging_logProperties=true druid_startup_logging_logProperties=true
druid_server_https_crlPath=/tls/revocations.crl druid_server_https_crlPath=/tls/revocations.crl
druid_worker_capacity=20 druid_worker_capacity=10

View File

@ -41,7 +41,7 @@ public class TestNGGroup
public static final String KAFKA_DATA_FORMAT = "kafka-data-format"; public static final String KAFKA_DATA_FORMAT = "kafka-data-format";
public static final String OTHER_INDEX = "other-index"; public static final String COMPACTION = "compaction";
public static final String APPEND_INGESTION = "append-ingestion"; public static final String APPEND_INGESTION = "append-ingestion";

View File

@ -21,12 +21,17 @@ package org.apache.druid.tests.coordinator.duty;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.partitions.SecondaryPartitionType; import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CompactionResourceTestClient; import org.apache.druid.testing.clients.CompactionResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.guice.DruidTestModuleFactory;
@ -49,7 +54,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
@Test(groups = {TestNGGroup.OTHER_INDEX}) @Test(groups = {TestNGGroup.COMPACTION})
@Guice(moduleFactory = DruidTestModuleFactory.class) @Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITAutoCompactionTest extends AbstractIndexerTest public class ITAutoCompactionTest extends AbstractIndexerTest
{ {
@ -57,7 +62,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json"; private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000; private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
private static final Period SKIP_OFFSET_FROM_LATEST = Period.seconds(0); private static final Period NO_SKIP_OFFSET = Period.seconds(0);
@Inject @Inject
protected CompactionResourceTestClient compactionResource; protected CompactionResourceTestClient compactionResource;
@ -93,7 +98,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction); checkCompactionIntervals(intervalsBeforeCompaction);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST); submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
//...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (2 total) //...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (2 total)
forceTriggerAutoCompaction(2); forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE); verifyQuery(INDEX_QUERIES_RESOURCE);
@ -114,16 +119,41 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE); verifyQuery(INDEX_QUERIES_RESOURCE);
// Dummy compaction config which will be overwritten // Dummy compaction config which will be overwritten
submitCompactionConfig(10000, SKIP_OFFSET_FROM_LATEST); submitCompactionConfig(10000, NO_SKIP_OFFSET);
// New compaction config should overwrites the existing compaction config // New compaction config should overwrites the existing compaction config
submitCompactionConfig(1, SKIP_OFFSET_FROM_LATEST); submitCompactionConfig(1, NO_SKIP_OFFSET);
LOG.info("Auto compaction test with dynamic partitioning");
// Instead of merging segments, the updated config will split segments! // Instead of merging segments, the updated config will split segments!
//...compacted into 10 new segments across 2 days. 5 new segments each day (10 total) //...compacted into 10 new segments across 2 days. 5 new segments each day (10 total)
forceTriggerAutoCompaction(10); forceTriggerAutoCompaction(10);
verifyQuery(INDEX_QUERIES_RESOURCE); verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(10, 1); verifySegmentsCompacted(10, 1);
checkCompactionIntervals(intervalsBeforeCompaction);
LOG.info("Auto compaction test with hash partitioning");
final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null);
submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1);
// 2 segments published per day after compaction.
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(hashedPartitionsSpec, 4);
checkCompactionIntervals(intervalsBeforeCompaction);
LOG.info("Auto compaction test with range partitioning");
final SingleDimensionPartitionsSpec rangePartitionsSpec = new SingleDimensionPartitionsSpec(
5,
null,
"city",
false
);
submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 2);
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(rangePartitionsSpec, 2);
checkCompactionIntervals(intervalsBeforeCompaction); checkCompactionIntervals(intervalsBeforeCompaction);
} }
} }
@ -139,7 +169,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCount(4); verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE); verifyQuery(INDEX_QUERIES_RESOURCE);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST); submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
deleteCompactionConfig(); deleteCompactionConfig();
// ...should remains unchanged (4 total) // ...should remains unchanged (4 total)
@ -164,7 +194,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCount(4); verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE); verifyQuery(INDEX_QUERIES_RESOURCE);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST); submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
// ...should remains unchanged (4 total) // ...should remains unchanged (4 total)
forceTriggerAutoCompaction(4); forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE); verifyQuery(INDEX_QUERIES_RESOURCE);
@ -224,13 +254,42 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest) throws Exception private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest) throws Exception
{ {
DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(fullDatasourceName, submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1);
}
private void submitCompactionConfig(
PartitionsSpec partitionsSpec,
Period skipOffsetFromLatest,
int maxNumConcurrentSubTasks
) throws Exception
{
DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(
fullDatasourceName,
null,
null, null,
null, null,
maxRowsPerSegment,
skipOffsetFromLatest, skipOffsetFromLatest,
new UserCompactionTaskQueryTuningConfig(
null, null,
null); null,
null,
new MaxSizeSplitHintSpec(null, 1),
partitionsSpec,
null,
null,
null,
null,
null,
maxNumConcurrentSubTasks,
null,
null,
null,
null,
null,
1
),
null
);
compactionResource.submitCompactionConfig(compactionConfig); compactionResource.submitCompactionConfig(compactionConfig);
// Wait for compaction config to persist // Wait for compaction config to persist
@ -245,12 +304,14 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
} }
} }
Assert.assertNotNull(foundDataSourceCompactionConfig); Assert.assertNotNull(foundDataSourceCompactionConfig);
Assert.assertEquals(foundDataSourceCompactionConfig.getMaxRowsPerSegment(), maxRowsPerSegment); Assert.assertNotNull(foundDataSourceCompactionConfig.getTuningConfig());
Assert.assertEquals(foundDataSourceCompactionConfig.getTuningConfig().getPartitionsSpec(), partitionsSpec);
Assert.assertEquals(foundDataSourceCompactionConfig.getSkipOffsetFromLatest(), skipOffsetFromLatest); Assert.assertEquals(foundDataSourceCompactionConfig.getSkipOffsetFromLatest(), skipOffsetFromLatest);
foundDataSourceCompactionConfig = compactionResource.getDataSourceCompactionConfig(fullDatasourceName); foundDataSourceCompactionConfig = compactionResource.getDataSourceCompactionConfig(fullDatasourceName);
Assert.assertNotNull(foundDataSourceCompactionConfig); Assert.assertNotNull(foundDataSourceCompactionConfig);
Assert.assertEquals(foundDataSourceCompactionConfig.getMaxRowsPerSegment(), maxRowsPerSegment); Assert.assertNotNull(foundDataSourceCompactionConfig.getTuningConfig());
Assert.assertEquals(foundDataSourceCompactionConfig.getTuningConfig().getPartitionsSpec(), partitionsSpec);
Assert.assertEquals(foundDataSourceCompactionConfig.getSkipOffsetFromLatest(), skipOffsetFromLatest); Assert.assertEquals(foundDataSourceCompactionConfig.getSkipOffsetFromLatest(), skipOffsetFromLatest);
} }
@ -305,6 +366,14 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
} }
private void verifySegmentsCompacted(int expectedCompactedSegmentCount, Integer expectedMaxRowsPerSegment) private void verifySegmentsCompacted(int expectedCompactedSegmentCount, Integer expectedMaxRowsPerSegment)
{
verifySegmentsCompacted(
new DynamicPartitionsSpec(expectedMaxRowsPerSegment, Long.MAX_VALUE),
expectedCompactedSegmentCount
);
}
private void verifySegmentsCompacted(PartitionsSpec partitionsSpec, int expectedCompactedSegmentCount)
{ {
List<DataSegment> segments = coordinator.getFullSegmentsMetadata(fullDatasourceName); List<DataSegment> segments = coordinator.getFullSegmentsMetadata(fullDatasourceName);
List<DataSegment> foundCompactedSegments = new ArrayList<>(); List<DataSegment> foundCompactedSegments = new ArrayList<>();
@ -317,11 +386,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
for (DataSegment compactedSegment : foundCompactedSegments) { for (DataSegment compactedSegment : foundCompactedSegments) {
Assert.assertNotNull(compactedSegment.getLastCompactionState()); Assert.assertNotNull(compactedSegment.getLastCompactionState());
Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec()); Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec());
Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec().getMaxRowsPerSegment(), Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec(), partitionsSpec);
expectedMaxRowsPerSegment);
Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec().getType(),
SecondaryPartitionType.LINEAR
);
} }
} }

View File

@ -39,7 +39,7 @@ import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
@Test(groups = {TestNGGroup.OTHER_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE}) @Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE})
@Guice(moduleFactory = DruidTestModuleFactory.class) @Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITCompactionTaskTest extends AbstractIndexerTest public class ITCompactionTaskTest extends AbstractIndexerTest
{ {

View File

@ -306,6 +306,7 @@ numerics
parameterized parameterized
parseable parseable
partitioner partitioner
partitionsSpec
performant performant
plaintext plaintext
pluggable pluggable
@ -971,7 +972,6 @@ overwriteFiles
partitionDimension partitionDimension
partitionDimensions partitionDimensions
partitionSpec partitionSpec
partitionsSpec
pathFormat pathFormat
segmentOutputPath segmentOutputPath
segmentTable segmentTable