From 62c3e89266037da2814fb3504f1c22b34064ade1 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Thu, 28 Mar 2019 20:25:10 -0700
Subject: [PATCH] maxTotalRows should be checked in DataSourceCompactionConfig
 before setting targetCompactionSizeBytes (#7368)

* maxTotalRows should be checked in DataSourceCompactionConfig before setting targetCompactionSizeBytes

* remove unnecessary default values

* remove flaky test

* fix build

* Add comments
---
 .../druid/tests/hadoop/ITHadoopIndexTest.java |   2 +-
 .../indexer/AbstractITBatchIndexTest.java     |   6 +-
 .../AbstractITRealtimeIndexTaskTest.java      |   2 +-
 .../tests/indexer/AbstractIndexerTest.java    |   2 +-
 .../indexer/AbstractKafkaIndexerTest.java     |   2 +-
 .../tests/indexer/ITCompactionTaskTest.java   |   4 +-
 .../indexer/ITNestedQueryPushDownTest.java    |   2 +-
 .../druid/tests/indexer/ITUnionQueryTest.java |   2 +-
 .../client/indexing/ClientCompactQuery.java   |  13 ++
 .../ClientCompactQueryTuningConfig.java       |  64 ++++-----
 .../indexing/HttpIndexingServiceClient.java   |  29 ++--
 .../DataSourceCompactionConfig.java           |  59 ++++++--
 .../DruidCoordinatorSegmentCompactor.java     |   1 -
 .../DataSourceCompactionConfigTest.java       | 128 ++++++++++++++++++
 14 files changed, 250 insertions(+), 66 deletions(-)

diff --git a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
index 5e6d66b1213..4ed0bc2b75b 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
@@ -64,7 +64,7 @@ public class ITHadoopIndexTest extends AbstractIndexerTest
     try {
       LOG.info("indexerFile name: [%s]", BATCH_TASK);
-      indexerSpec = getTaskAsString(BATCH_TASK);
+      indexerSpec = getResourceAsString(BATCH_TASK);
       indexerSpec = StringUtils.replace(indexerSpec, "%%HADOOP_TEST_PATH%%", hadoopDir);
     }
     catch (Exception e) {
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index df95de927c1..0c6a64eb0aa 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -56,7 +56,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
   {
     final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
     final String taskSpec = StringUtils.replace(
-        getTaskAsString(indexTaskFilePath),
+        getResourceAsString(indexTaskFilePath),
         "%%DATASOURCE%%",
         fullDatasourceName
     );
@@ -98,7 +98,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
     final String fullReindexDatasourceName = reindexDataSource + config.getExtraDatasourceNameSuffix();
 
     String taskSpec = StringUtils.replace(
-        getTaskAsString(reindexTaskFilePath),
+        getResourceAsString(reindexTaskFilePath),
         "%%DATASOURCE%%",
         fullBaseDatasourceName
     );
@@ -148,7 +148,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
   {
     final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
     final String taskSpec = StringUtils.replace(
-        getTaskAsString(indexTaskFilePath),
+        getResourceAsString(indexTaskFilePath),
         "%%DATASOURCE%%",
         fullDatasourceName
     );
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
index 07ec6b66f4c..eefcba24223 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
@@ -88,7 +88,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTest
     try (final Closeable closeable = unloader(fullDatasourceName)) {
       // the task will run for 3 minutes and then shutdown itself
       String task = setShutOffTime(
-          getTaskAsString(getTaskResource()),
+          getResourceAsString(getTaskResource()),
           DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
       );
       task = StringUtils.replace(task, "%%DATASOURCE%%", fullDatasourceName);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index b739d79dddd..30b7439bb7c 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -112,7 +112,7 @@ public abstract class AbstractIndexerTest
     );
   }
 
-  protected String getTaskAsString(String file) throws IOException
+  protected String getResourceAsString(String file) throws IOException
   {
     final InputStream inputStream = ITRealtimeIndexTaskTest.class.getResourceAsStream(file);
     try {
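The rename from getTaskAsString to getResourceAsString matches what the helper actually does: it reads any classpath resource into a String, not only task specs. A minimal self-contained sketch of such a helper, assuming Java 9+ for InputStream#readAllBytes (the class name and resource path are illustrative, not Druid's exact code):

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ResourceLoader
{
  // Reads a classpath resource into a String, the way a getResourceAsString-style
  // helper typically works. This is a sketch, not Druid's implementation.
  static String getResourceAsString(String file) throws IOException
  {
    try (InputStream in = ResourceLoader.class.getResourceAsStream(file)) {
      if (in == null) {
        throw new IOException("Resource not found: " + file);
      }
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws IOException
  {
    // Hypothetical resource path, for illustration only.
    System.out.println(getResourceAsString("/indexer/wikipedia_index_task.json"));
  }
}
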
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
index fa9c64bf2dc..e01c6e6458d 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
@@ -138,7 +138,7 @@ public class AbstractKafkaIndexerTest extends AbstractIndexerTest
       consumerProperties.putAll(consumerConfigs);
       consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost());
 
-      spec = getTaskAsString(INDEXER_FILE);
+      spec = getResourceAsString(INDEXER_FILE);
       spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName);
       spec = StringUtils.replace(spec, "%%TOPIC%%", TOPIC_NAME);
       spec = StringUtils.replace(spec, "%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties));
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index db6ebff48ab..264edc296f0 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -131,7 +131,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
 
   private void loadData() throws Exception
   {
-    String taskSpec = getTaskAsString(INDEX_TASK);
+    String taskSpec = getResourceAsString(INDEX_TASK);
     taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
     final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
@@ -145,7 +145,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
 
   private void compactData(boolean keepSegmentGranularity) throws Exception
   {
-    final String template = getTaskAsString(COMPACTION_TASK);
+    final String template = getResourceAsString(COMPACTION_TASK);
     String taskSpec = StringUtils.replace(template, "${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(keepSegmentGranularity));
     taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
index 3cad6a36a3c..07630891640 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
@@ -96,7 +96,7 @@ public class ITNestedQueryPushDownTest extends AbstractIndexerTest
 
   private void loadData() throws Exception
   {
-    String taskSpec = getTaskAsString(WIKITICKER_INDEX_TASK);
+    String taskSpec = getResourceAsString(WIKITICKER_INDEX_TASK);
     taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
     final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
index 8b65c40db3d..b32a8832356 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
@@ -93,7 +93,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
     try {
       // Load 4 datasources with same dimensions
       String task = setShutOffTime(
-          getTaskAsString(UNION_TASK_RESOURCE),
+          getResourceAsString(UNION_TASK_RESOURCE),
           DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
       );
       List<String> taskIDs = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
index c87ff90d136..f32b11d2651 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
@@ -99,4 +99,17 @@ public class ClientCompactQuery implements ClientQuery
   {
     return context;
   }
+
+  @Override
+  public String toString()
+  {
+    return "ClientCompactQuery{" +
+           "dataSource='" + dataSource + '\'' +
+           ", segments=" + segments +
+           ", keepSegmentGranularity=" + keepSegmentGranularity +
+           ", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
+           ", tuningConfig=" + tuningConfig +
+           ", context=" + context +
+           '}';
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
index 9bae161c76a..068016952b4 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
@@ -29,20 +29,18 @@ import java.util.Objects;
 
 public class ClientCompactQueryTuningConfig
 {
-  // These default values should be synchronized with those of IndexTuningConfig
-  private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75_000;
-  private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
-  private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
-  private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
-  private static final long DEFAULT_PUSH_TIMEOUT = 0;
-
   @Nullable
   private final Integer maxRowsPerSegment;
-  private final int maxRowsInMemory;
-  private final int maxTotalRows;
+  @Nullable
+  private final Integer maxRowsInMemory;
+  @Nullable
+  private final Integer maxTotalRows;
+  @Nullable
   private final IndexSpec indexSpec;
-  private final int maxPendingPersists;
-  private final long pushTimeout;
+  @Nullable
+  private final Integer maxPendingPersists;
+  @Nullable
+  private final Long pushTimeout;
 
   public static ClientCompactQueryTuningConfig from(
       @Nullable UserCompactTuningConfig userCompactTuningConfig,
@@ -70,11 +68,11 @@ public class ClientCompactQueryTuningConfig
   )
   {
     this.maxRowsPerSegment = maxRowsPerSegment;
-    this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
-    this.maxTotalRows = maxTotalRows == null ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
-    this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
-    this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
-    this.pushTimeout = pushTimeout == null ? DEFAULT_PUSH_TIMEOUT : pushTimeout;
+    this.maxRowsInMemory = maxRowsInMemory;
+    this.maxTotalRows = maxTotalRows;
+    this.indexSpec = indexSpec;
+    this.maxPendingPersists = maxPendingPersists;
+    this.pushTimeout = pushTimeout;
   }
 
   @JsonProperty
@@ -91,31 +89,36 @@
   }
 
   @JsonProperty
-  public int getMaxRowsInMemory()
+  @Nullable
+  public Integer getMaxRowsInMemory()
   {
     return maxRowsInMemory;
   }
 
   @JsonProperty
-  public int getMaxTotalRows()
+  @Nullable
+  public Integer getMaxTotalRows()
   {
     return maxTotalRows;
   }
 
   @JsonProperty
+  @Nullable
   public IndexSpec getIndexSpec()
   {
     return indexSpec;
   }
 
   @JsonProperty
-  public int getMaxPendingPersists()
+  @Nullable
+  public Integer getMaxPendingPersists()
   {
     return maxPendingPersists;
   }
 
   @JsonProperty
-  public long getPushTimeout()
+  @Nullable
+  public Long getPushTimeout()
   {
     return pushTimeout;
   }
@@ -130,25 +133,18 @@
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
     ClientCompactQueryTuningConfig that = (ClientCompactQueryTuningConfig) o;
-    return maxRowsInMemory == that.maxRowsInMemory &&
-           maxTotalRows == that.maxTotalRows &&
-           maxPendingPersists == that.maxPendingPersists &&
-           pushTimeout == that.pushTimeout &&
-           Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
-           Objects.equals(indexSpec, that.indexSpec);
+    return Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
+           Objects.equals(maxRowsInMemory, that.maxRowsInMemory) &&
+           Objects.equals(maxTotalRows, that.maxTotalRows) &&
+           Objects.equals(indexSpec, that.indexSpec) &&
+           Objects.equals(maxPendingPersists, that.maxPendingPersists) &&
+           Objects.equals(pushTimeout, that.pushTimeout);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(
-        maxRowsPerSegment,
-        maxRowsInMemory,
-        maxTotalRows,
-        indexSpec,
-        maxPendingPersists,
-        pushTimeout
-    );
+    return Objects.hash(maxRowsPerSegment, maxRowsInMemory, maxTotalRows, indexSpec, maxPendingPersists, pushTimeout);
   }
 
   @Override
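Switching these fields from primitives with baked-in defaults to nullable boxed types lets "unset" survive a JSON round trip, so defaulting happens in one place on the server instead of via client-side constants that must be kept in sync with IndexTuningConfig. A minimal sketch of the idea using Jackson (the class and field here are illustrative, not Druid's):

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NullableSerdeDemo
{
  // A boxed Integer keeps "not set" distinct from every real value.
  static class TuningConfig
  {
    @JsonProperty
    Integer maxTotalRows;  // null means "let the server apply its default"
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // An empty JSON object round-trips to a null field, not to a baked-in default.
    TuningConfig unset = mapper.readValue("{}", TuningConfig.class);
    System.out.println(unset.maxTotalRows);                // null
    System.out.println(mapper.writeValueAsString(unset));  // {"maxTotalRows":null}

    // With a primitive int field, the same input would deserialize to 0, and the
    // old client-side default constants could silently drift from the server's.
    TuningConfig set = mapper.readValue("{\"maxTotalRows\":10000}", TuningConfig.class);
    System.out.println(set.maxTotalRows);                  // 10000
  }
}
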
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index ac4349b2b6d..c5c9531ffd8 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -22,6 +22,7 @@ package org.apache.druid.client.indexing;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.inject.Inject;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.indexer.TaskStatusPlus;
@@ -71,7 +72,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
       boolean keepSegmentGranularity,
       @Nullable Long targetCompactionSizeBytes,
       int compactionTaskPriority,
-      @Nullable ClientCompactQueryTuningConfig tuningConfig,
+      ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map<String, Object> context
   )
   {
@@ -103,14 +104,20 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   {
     try {
       final FullResponseHolder response = druidLeaderClient.go(
-          druidLeaderClient.makeRequest(
-              HttpMethod.POST,
-              "/druid/indexer/v1/task"
-          ).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskObject))
+          druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/task")
+                           .setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskObject))
       );
 
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
-        throw new ISE("Failed to post task[%s]", taskObject);
+        if (!Strings.isNullOrEmpty(response.getContent())) {
+          throw new ISE(
+              "Failed to post task[%s] with error[%s].",
+              taskObject,
+              response.getContent()
+          );
+        } else {
+          throw new ISE("Failed to post task[%s]. Please check overlord log", taskObject);
+        }
       }
 
       final Map<String, Object> resultMap = jsonMapper.readValue(
@@ -132,10 +139,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
       final FullResponseHolder response = druidLeaderClient.go(
           druidLeaderClient.makeRequest(
               HttpMethod.POST,
-              StringUtils.format(
-                  "/druid/indexer/v1/task/%s/shutdown",
-                  StringUtils.urlEncode(taskId)
-              )
+              StringUtils.format("/druid/indexer/v1/task/%s/shutdown", StringUtils.urlEncode(taskId))
           )
       );
 
@@ -267,7 +271,10 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   {
     try {
       final FullResponseHolder responseHolder = druidLeaderClient.go(
-          druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/task/%s", taskId))
+          druidLeaderClient.makeRequest(
+              HttpMethod.GET,
+              StringUtils.format("/druid/indexer/v1/task/%s", StringUtils.urlEncode(taskId))
+          )
      );
 
       return jsonMapper.readValue(
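Two client fixes ride along here: failed task POSTs now surface the overlord's response body instead of a bare "Failed to post task", and the GET path now URL-encodes the task ID the way the shutdown endpoint already did. Task IDs routinely embed timestamps and datasource names whose characters would otherwise corrupt the request path. A small sketch using the JDK's URLEncoder as a stand-in for Druid's StringUtils.urlEncode (an assumption: the two are not identical, e.g. in how spaces are encoded), requiring Java 10+ for the Charset overload:

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class TaskIdEncoding
{
  public static void main(String[] args)
  {
    // A hypothetical task ID; real ones often contain ':' from ISO timestamps.
    String taskId = "compact_twitter_2018-09-24T18:24:23.920Z";

    // Unencoded, the ':' characters land verbatim in the URL path.
    System.out.println("/druid/indexer/v1/task/" + taskId);

    // Encoded, the path segment is safe to send.
    String encoded = URLEncoder.encode(taskId, StandardCharsets.UTF_8);
    System.out.println("/druid/indexer/v1/task/" + encoded);
    // -> /druid/indexer/v1/task/compact_twitter_2018-09-24T18%3A24%3A23.920Z
  }
}
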
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index ef33fb024b6..867eef13cc5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -71,10 +71,6 @@
       @JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
   )
   {
-    Preconditions.checkArgument(
-        targetCompactionSizeBytes == null || maxRowsPerSegment == null,
-        "targetCompactionSizeBytes and maxRowsPerSegment in tuningConfig can't be used together"
-    );
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.keepSegmentGranularity = keepSegmentGranularity == null
                                   ? DEFAULT_KEEP_SEGMENT_GRANULARITY
@@ -85,11 +81,11 @@
     this.inputSegmentSizeBytes = inputSegmentSizeBytes == null
                                  ? DEFAULT_INPUT_SEGMENT_SIZE_BYTES
                                  : inputSegmentSizeBytes;
-    if (targetCompactionSizeBytes == null && maxRowsPerSegment == null) {
-      this.targetCompactionSizeBytes = DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
-    } else {
-      this.targetCompactionSizeBytes = targetCompactionSizeBytes;
-    }
+    this.targetCompactionSizeBytes = getValidTargetCompactionSizeBytes(
+        targetCompactionSizeBytes,
+        maxRowsPerSegment,
+        tuningConfig
+    );
     this.maxRowsPerSegment = maxRowsPerSegment;
     this.maxNumSegmentsToCompact = maxNumSegmentsToCompact == null
                                    ? DEFAULT_NUM_INPUT_SEGMENTS
@@ -104,6 +100,51 @@
     );
   }
 
+  /**
+   * This method is copied from {@code CompactionTask#getValidTargetCompactionSizeBytes}. The only difference is this
+   * method doesn't check 'numShards' which is not supported by {@link UserCompactTuningConfig}.
+   *
+   * Currently, we can't use the same method here because it's in a different module. Until we figure out how to reuse
+   * the same method, this method must be synced with {@code CompactionTask#getValidTargetCompactionSizeBytes}.
+   */
+  @Nullable
+  private static Long getValidTargetCompactionSizeBytes(
+      @Nullable Long targetCompactionSizeBytes,
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable UserCompactTuningConfig tuningConfig
+  )
+  {
+    if (targetCompactionSizeBytes != null) {
+      Preconditions.checkArgument(
+          !hasPartitionConfig(maxRowsPerSegment, tuningConfig),
+          "targetCompactionSizeBytes[%s] cannot be used with maxRowsPerSegment[%s] and maxTotalRows[%s]",
+          targetCompactionSizeBytes,
+          maxRowsPerSegment,
+          tuningConfig == null ? null : tuningConfig.getMaxTotalRows()
+      );
+      return targetCompactionSizeBytes;
+    } else {
+      return hasPartitionConfig(maxRowsPerSegment, tuningConfig) ? null : DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
+    }
+  }
+
+  /**
+   * This method is copied from {@code CompactionTask#hasPartitionConfig}. The two differences are
+   * 1) this method doesn't check 'numShards' which is not supported by {@link UserCompactTuningConfig}, and
+   * 2) this method accepts an additional 'maxRowsPerSegment' parameter since it's not supported by
+   * {@link UserCompactTuningConfig}.
+   *
+   * Currently, we can't use the same method here because it's in a different module. Until we figure out how to reuse
+   * the same method, this method must be synced with {@code CompactionTask#hasPartitionConfig}.
+   */
+  private static boolean hasPartitionConfig(
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable UserCompactTuningConfig tuningConfig
+  )
+  {
+    return maxRowsPerSegment != null || (tuningConfig != null && tuningConfig.getMaxTotalRows() != null);
+  }
+
   @JsonProperty
   public String getDataSource()
   {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index 7cbffa540d3..0258cd0069f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -175,7 +175,6 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
 
     for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots; numSubmittedTasks++) {
       final List<DataSegment> segmentsToCompact = iterator.next();
-
       final String dataSourceName = segmentsToCompact.get(0).getDataSource();
 
       if (segmentsToCompact.size() > 1) {
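The two helpers above are the heart of the fix: an explicit targetCompactionSizeBytes is now rejected when either row-based partition setting (maxRowsPerSegment, or maxTotalRows in the tuning config) is present, and the default size target is applied only when neither is set. A self-contained sketch of that decision logic, mirroring getValidTargetCompactionSizeBytes (names and the error message come from the patch; the standalone class and the default constant's value are illustrative placeholders):

public class TargetSizeValidation
{
  // Placeholder value; the real constant lives in DataSourceCompactionConfig.
  private static final Long DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 400L * 1024 * 1024;

  static Long getValidTargetCompactionSizeBytes(
      Long targetCompactionSizeBytes,
      Integer maxRowsPerSegment,
      Integer maxTotalRows
  )
  {
    boolean hasPartitionConfig = maxRowsPerSegment != null || maxTotalRows != null;
    if (targetCompactionSizeBytes != null) {
      // An explicit size target conflicts with row-based partitioning.
      if (hasPartitionConfig) {
        throw new IllegalArgumentException(String.format(
            "targetCompactionSizeBytes[%s] cannot be used with maxRowsPerSegment[%s] and maxTotalRows[%s]",
            targetCompactionSizeBytes, maxRowsPerSegment, maxTotalRows
        ));
      }
      return targetCompactionSizeBytes;
    }
    // No explicit target: fall back to the default only when no partition config is set.
    return hasPartitionConfig ? null : DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
  }

  public static void main(String[] args)
  {
    System.out.println(getValidTargetCompactionSizeBytes(null, null, null));   // default
    System.out.println(getValidTargetCompactionSizeBytes(null, null, 10000));  // null
    try {
      getValidTargetCompactionSizeBytes(10000L, null, 10000);                  // conflict
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
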
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index eae6043c214..84a3d0d48ea 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -25,7 +25,9 @@
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig.UserCompactTuningConfig;
 import org.joda.time.Period;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 
@@ -33,6 +35,9 @@ public class DataSourceCompactionConfigTest
 {
   private static final ObjectMapper objectMapper = new DefaultObjectMapper();
 
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testSerdeBasic() throws IOException
   {
@@ -56,6 +61,7 @@ public class DataSourceCompactionConfigTest
     Assert.assertEquals(25, fromJson.getTaskPriority());
     Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
     Assert.assertEquals(config.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
     Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
     Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
     Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
@@ -85,6 +91,7 @@ public class DataSourceCompactionConfigTest
     Assert.assertEquals(25, fromJson.getTaskPriority());
     Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
     Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
     Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
     Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
     Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
@@ -101,4 +108,125 @@
     final UserCompactTuningConfig fromJson = objectMapper.readValue(json, UserCompactTuningConfig.class);
     Assert.assertEquals(config, fromJson);
   }
+
+  @Test
+  public void testSerdeWithMaxTotalRows() throws IOException
+  {
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        null,
+        500L,
+        null,
+        null,
+        20,
+        new Period(3600),
+        new UserCompactTuningConfig(
+            null,
+            10000,
+            null,
+            null,
+            null
+        ),
+        ImmutableMap.of("key", "val")
+    );
+    final String json = objectMapper.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = objectMapper.readValue(json, DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertTrue(fromJson.isKeepSegmentGranularity());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+    Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
+    Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+  }
+
+  @Test
+  public void testTargetCompactionSizeBytesWithMaxRowsPerSegment()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "targetCompactionSizeBytes[10000] cannot be used with maxRowsPerSegment[1000] and maxTotalRows[null]"
+    );
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        null,
+        500L,
+        10000L,
+        1000,
+        20,
+        new Period(3600),
+        null,
+        ImmutableMap.of("key", "val")
+    );
+  }
+
+  @Test
+  public void testTargetCompactionSizeBytesWithMaxTotalRows()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "targetCompactionSizeBytes[10000] cannot be used with maxRowsPerSegment[null] and maxTotalRows[10000]"
+    );
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        null,
+        500L,
+        10000L,
+        null,
+        20,
+        new Period(3600),
+        new UserCompactTuningConfig(
+            null,
+            10000,
+            null,
+            null,
+            null
+        ),
+        ImmutableMap.of("key", "val")
+    );
+  }
+
+  @Test
+  public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException
+  {
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        null,
+        500L,
+        null,
+        10000,
+        20,
+        new Period(3600),
+        new UserCompactTuningConfig(
+            null,
+            10000,
+            null,
+            null,
+            null
+        ),
+        ImmutableMap.of("key", "val")
+    );
+
+    final String json = objectMapper.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = objectMapper.readValue(json, DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertTrue(fromJson.isKeepSegmentGranularity());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+    Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
+    Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+  }
 }
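The new tests lean on JUnit 4's ExpectedException rule: declare the rule, arm it with the expected exception type and a message substring, then invoke the code that should throw. A minimal self-contained sketch of that pattern, with a stand-in validator in place of the DataSourceCompactionConfig constructor (the rule API is real JUnit 4; the validator is hypothetical):

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class ExpectedExceptionPatternTest
{
  @Rule
  public final ExpectedException expectedException = ExpectedException.none();

  // Stand-in for the constructor-time check; not Druid's code.
  private static void validate(Long targetSizeBytes, Integer maxRowsPerSegment)
  {
    if (targetSizeBytes != null && maxRowsPerSegment != null) {
      throw new IllegalArgumentException("targetCompactionSizeBytes cannot be used with maxRowsPerSegment");
    }
  }

  @Test
  public void testConflictingSettingsRejected()
  {
    // Arm the rule before the throwing call; expectMessage matches a substring.
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage("cannot be used with");
    validate(10000L, 1000);
  }
}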