diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index c070e466517..ec1d046ee31 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -139,7 +139,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)| |`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)| |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)| -|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| +|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|no (default = same as indexSpec)| |`reportParseExceptions`|Boolean|*DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|no (default == false)| |`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)| |`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular partition will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)| diff --git a/docs/content/development/extensions-core/kinesis-ingestion.md b/docs/content/development/extensions-core/kinesis-ingestion.md index 0578dd23b3b..8c2ac335a1d 100644 --- a/docs/content/development/extensions-core/kinesis-ingestion.md +++ b/docs/content/development/extensions-core/kinesis-ingestion.md @@ -135,7 +135,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)| |`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)| |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)| -|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| +|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|no (default = same as indexSpec)| |`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)| |`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)| |`resetOffsetAutomatically`|Boolean|Whether to reset the consumer sequence numbers if the next sequence number that it is trying to fetch is less than the earliest available sequence number for that particular shard. The sequence number will be reset to either the earliest or latest sequence number depending on `useEarliestOffset` property of `KinesisSupervisorIOConfig` (see below). This situation typically occurs when messages in Kinesis are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular shard will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)| diff --git a/docs/content/ingestion/hadoop.md b/docs/content/ingestion/hadoop.md index b9a6d72e798..1c0bfbc58c6 100644 --- a/docs/content/ingestion/hadoop.md +++ b/docs/content/ingestion/hadoop.md @@ -192,7 +192,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |combineText|Boolean|Use CombineTextInputFormat to combine multiple files into a file split. This can speed up Hadoop jobs when processing a large number of small files.|no (default == false)| |useCombiner|Boolean|Use Hadoop combiner to merge rows at mapper if possible.|no (default == false)| |jobProperties|Object|A map of properties to add to the Hadoop job configuration, see below for details.|no (default == null)| -|indexSpec|Object|Tune how data is indexed. See below for more information.|no| +|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|no (default = same as indexSpec)| |numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)| |forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Hash-based partitioning always uses an extendable shardSpec. For single-dimension partitioning, this option should be set to true to use an extendable shardSpec. For partitioning, please check [Partitioning specification](#partitioning-specification). This option can be useful when you need to append more data to existing dataSource.|no (default = false)| |useExplicitVersion|Boolean|Forces HadoopIndexTask to use version.|no (default = false)| diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md index ad7cac91359..c5cd91bbec4 100644 --- a/docs/content/ingestion/native_tasks.md +++ b/docs/content/ingestion/native_tasks.md @@ -185,6 +185,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no| |numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as indexSpec|no| |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| |pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no| @@ -375,6 +376,14 @@ An example of the result is "metricCompression": "lz4", "longEncoding": "longs" }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "concise" + }, + "dimensionCompression": "lz4", + "metricCompression": "lz4", + "longEncoding": "longs" + }, "maxPendingPersists": 0, "reportParseExceptions": false, "pushTimeout": 0, @@ -555,6 +564,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no| |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as indexSpec|no| |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| |forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](../ingestion/index.html#roll-up-modes). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. If this is set to true, the index task will read the entire input data twice: one for finding the optimal number of partitions per time chunk and one for generating segments. Note that the result segments would be hash-partitioned. You can set `forceExtendableShardSpecs` if you plan to append more data to the same time range in the future. This flag cannot be used with `appendToExisting` of IOConfig. For more details, see the below __Segment pushing modes__ section.|false|no| |reportParseExceptions|DEPRECATED. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|false|no| diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index 4258fc9d909..874ae610651 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -179,6 +179,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec tuningConfig.getPartitionsSpec(), tuningConfig.getShardSpecs(), tuningConfig.getIndexSpec(), + tuningConfig.getIndexSpecForIntermediatePersists(), tuningConfig.getRowFlushBoundary(), tuningConfig.getMaxBytesInMemory(), tuningConfig.isLeaveIntermediate(), diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java index 2104759f296..f23ee7b0fce 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java @@ -41,6 +41,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @@ -62,6 +63,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, true, reportParseExceptions, handoffConditionTimeout, @@ -87,6 +89,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon dir, getMaxPendingPersists(), getIndexSpec(), + getIndexSpecForIntermediatePersists(), true, isReportParseExceptions(), getHandoffConditionTimeout(), @@ -112,6 +115,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon ", basePersistDirectory=" + getBasePersistDirectory() + ", maxPendingPersists=" + getMaxPendingPersists() + ", indexSpec=" + getIndexSpec() + + ", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() + ", reportParseExceptions=" + isReportParseExceptions() + ", handoffConditionTimeout=" + getHandoffConditionTimeout() + ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index ef6259f6412..60c187f0748 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -85,6 +85,7 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec null, null, null, + null, null ), ioConfig, diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 8e4c6e9f913..27e9b295edf 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -53,6 +53,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @@ -80,6 +81,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, true, reportParseExceptions, handoffConditionTimeout, @@ -186,6 +188,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig getBasePersistDirectory(), getMaxPendingPersists(), getIndexSpec(), + getIndexSpecForIntermediatePersists(), true, isReportParseExceptions(), getHandoffConditionTimeout(), diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index c9506d6ef20..edf4c22842a 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2456,6 +2456,7 @@ public class KafkaIndexTaskTest null, null, null, + null, true, reportParseExceptions, handoffConditionTimeout, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index 63b0e985e0b..57b4f372cf8 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -25,6 +25,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.indexing.TuningConfig; import org.joda.time.Period; import org.junit.Assert; @@ -65,6 +66,7 @@ public class KafkaIndexTaskTuningConfigTest Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(false, config.isReportParseExceptions()); Assert.assertEquals(0, config.getHandoffConditionTimeout()); } @@ -81,7 +83,9 @@ public class KafkaIndexTaskTuningConfigTest + " \"intermediatePersistPeriod\": \"PT1H\",\n" + " \"maxPendingPersists\": 100,\n" + " \"reportParseExceptions\": true,\n" - + " \"handoffConditionTimeout\": 100\n" + + " \"handoffConditionTimeout\": 100,\n" + + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" + + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n" + "}"; KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue( @@ -103,6 +107,8 @@ public class KafkaIndexTaskTuningConfigTest Assert.assertEquals(100, config.getMaxPendingPersists()); Assert.assertEquals(true, config.isReportParseExceptions()); Assert.assertEquals(100, config.getHandoffConditionTimeout()); + Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists()); } @Test @@ -117,6 +123,7 @@ public class KafkaIndexTaskTuningConfigTest new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, @@ -159,6 +166,7 @@ public class KafkaIndexTaskTuningConfigTest new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, @@ -206,6 +214,7 @@ public class KafkaIndexTaskTuningConfigTest new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, @@ -252,6 +261,7 @@ public class KafkaIndexTaskTuningConfigTest config.getBasePersistDirectory(), 0, config.getIndexSpec(), + config.getIndexSpecForIntermediatePersists(), true, config.isReportParseExceptions(), config.getHandoffConditionTimeout(), diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 05242da01a2..e8e46adaaa0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -213,6 +213,7 @@ public class KafkaSupervisorTest extends EasyMockSupport new File("/test"), null, null, + null, true, false, null, @@ -3051,6 +3052,7 @@ public class KafkaSupervisorTest extends EasyMockSupport new File("/test"), null, null, + null, true, false, null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java index 3312a10aa94..5859f9035b0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.indexing.TuningConfig; import org.joda.time.Duration; import org.joda.time.Period; @@ -63,6 +64,7 @@ public class KafkaSupervisorTuningConfigTest Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(false, config.isReportParseExceptions()); Assert.assertEquals(0, config.getHandoffConditionTimeout()); Assert.assertNull(config.getWorkerThreads()); @@ -90,7 +92,9 @@ public class KafkaSupervisorTuningConfigTest + " \"chatRetries\": 14,\n" + " \"httpTimeout\": \"PT15S\",\n" + " \"shutdownTimeout\": \"PT95S\",\n" - + " \"offsetFetchPeriod\": \"PT20S\"\n" + + " \"offsetFetchPeriod\": \"PT20S\",\n" + + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" + + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n" + "}"; KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue( @@ -116,6 +120,8 @@ public class KafkaSupervisorTuningConfigTest Assert.assertEquals(Duration.standardSeconds(15), config.getHttpTimeout()); Assert.assertEquals(Duration.standardSeconds(95), config.getShutdownTimeout()); Assert.assertEquals(Duration.standardSeconds(20), config.getOffsetFetchPeriod()); + Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists()); } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java index 3cc124f6afd..27e69e8e7e0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java @@ -45,6 +45,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @@ -67,6 +68,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, true, reportParseExceptions, handoffConditionTimeout, diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java index 534330e4d10..f033a6ddf5f 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java @@ -58,6 +58,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @@ -85,6 +86,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, true, reportParseExceptions, handoffConditionTimeout, @@ -160,6 +162,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC dir, getMaxPendingPersists(), getIndexSpec(), + getIndexSpecForIntermediatePersists(), true, isReportParseExceptions(), getHandoffConditionTimeout(), diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index ec72c7de6d7..e921fc98f8a 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -96,6 +96,7 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec null, null, null, + null, null ), ioConfig, diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 3c749be5cff..144bb80b5ad 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -49,6 +49,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @@ -81,6 +82,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, buildV9Directly, reportParseExceptions, handoffConditionTimeout, @@ -191,6 +193,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig getBasePersistDirectory(), getMaxPendingPersists(), getIndexSpec(), + getIndexSpecForIntermediatePersists(), true, isReportParseExceptions(), getHandoffConditionTimeout(), diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index d8bf83ce2a3..bb83c0878ad 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2629,6 +2629,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport null, null, null, + null, true, reportParseExceptions, handoffConditionTimeout, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 57ffebcb676..918cc2dd3b1 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -144,6 +144,7 @@ public class KinesisIndexTaskTuningConfigTest new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, @@ -202,6 +203,7 @@ public class KinesisIndexTaskTuningConfigTest new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, @@ -288,6 +290,7 @@ public class KinesisIndexTaskTuningConfigTest new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index e6bb39ca993..be5a87e6128 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -168,6 +168,7 @@ public class KinesisSupervisorTest extends EasyMockSupport new File("/test"), null, null, + null, true, false, null, @@ -3678,6 +3679,7 @@ public class KinesisSupervisorTest extends EasyMockSupport new File("/test"), null, null, + null, true, false, null, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java index 2485e977d60..e45168db59a 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java @@ -45,6 +45,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @@ -73,6 +74,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, buildV9Directly, reportParseExceptions, handoffConditionTimeout, @@ -104,6 +106,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu base.getBasePersistDirectory(), base.getMaxPendingPersists(), base.getIndexSpec(), + base.getIndexSpecForIntermediatePersists(), base.getBuildV9Directly(), base.isReportParseExceptions(), base.getHandoffConditionTimeout(), diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java index fc9a5d966a3..4aa8cd25d46 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java @@ -299,6 +299,11 @@ public class HadoopDruidIndexerConfig return schema.getTuningConfig().getIndexSpec(); } + public IndexSpec getIndexSpecForIntermediatePersists() + { + return schema.getTuningConfig().getIndexSpecForIntermediatePersists(); + } + public boolean isOverwriteFiles() { return schema.getTuningConfig().isOverwriteFiles(); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java index 5fd9b3d8f2f..e61f912e9e8 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java @@ -55,6 +55,7 @@ public class HadoopTuningConfig implements TuningConfig DEFAULT_PARTITIONS_SPEC, DEFAULT_SHARD_SPECS, DEFAULT_INDEX_SPEC, + DEFAULT_INDEX_SPEC, DEFAULT_ROW_FLUSH_BOUNDARY, 0L, false, @@ -81,6 +82,7 @@ public class HadoopTuningConfig implements TuningConfig private final PartitionsSpec partitionsSpec; private final Map> shardSpecs; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final int rowFlushBoundary; private final long maxBytesInMemory; private final boolean leaveIntermediate; @@ -105,6 +107,7 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, final @JsonProperty("shardSpecs") Map> shardSpecs, final @JsonProperty("indexSpec") IndexSpec indexSpec, + final @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, final @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, final @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, @@ -132,6 +135,8 @@ public class HadoopTuningConfig implements TuningConfig this.partitionsSpec = partitionsSpec == null ? DEFAULT_PARTITIONS_SPEC : partitionsSpec; this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null ? DEFAULT_ROW_FLUSH_BOUNDARY : maxRowsInMemoryCOMPAT : maxRowsInMemory; @@ -199,6 +204,12 @@ public class HadoopTuningConfig implements TuningConfig return indexSpec; } + @JsonProperty + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + @JsonProperty("maxRowsInMemory") public int getRowFlushBoundary() { @@ -314,6 +325,7 @@ public class HadoopTuningConfig implements TuningConfig partitionsSpec, shardSpecs, indexSpec, + indexSpecForIntermediatePersists, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, @@ -343,6 +355,7 @@ public class HadoopTuningConfig implements TuningConfig partitionsSpec, shardSpecs, indexSpec, + indexSpecForIntermediatePersists, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, @@ -372,6 +385,7 @@ public class HadoopTuningConfig implements TuningConfig partitionsSpec, specs, indexSpec, + indexSpecForIntermediatePersists, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index 6c032b06e7b..f4cad315270 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -605,7 +605,7 @@ public class IndexGeneratorJob implements Jobby ) throws IOException { return HadoopDruidIndexerConfig.INDEX_MERGER_V9 - .persist(index, interval, file, config.getIndexSpec(), progressIndicator, null); + .persist(index, interval, file, config.getIndexSpecForIntermediatePersists(), progressIndicator, null); } protected File mergeQueryableIndex( diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java index 3fcf50ad989..d0d7180a6e4 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java @@ -462,6 +462,7 @@ public class BatchDeltaIngestionTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java index 59453fce882..26498746c21 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -199,6 +199,7 @@ public class DetermineHashedPartitionsJobTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java index e9265bcabf8..3d8b06bab77 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java @@ -262,6 +262,7 @@ public class DeterminePartitionsJobTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java index 7573aeaa4d3..9c723ac9abd 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -86,6 +86,7 @@ public class HadoopDruidIndexerConfigTest null, null, null, + null, false, false, false, @@ -164,6 +165,7 @@ public class HadoopDruidIndexerConfigTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java index 25535832054..ef29cb738f8 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java @@ -43,6 +43,7 @@ public class HadoopTuningConfigTest null, null, null, + null, 100, null, true, @@ -70,6 +71,7 @@ public class HadoopTuningConfigTest Assert.assertNotNull(actual.getPartitionsSpec()); Assert.assertEquals(ImmutableMap.>of(), actual.getShardSpecs()); Assert.assertEquals(new IndexSpec(), actual.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), actual.getIndexSpecForIntermediatePersists()); Assert.assertEquals(100, actual.getRowFlushBoundary()); Assert.assertEquals(true, actual.isLeaveIntermediate()); Assert.assertEquals(true, actual.isCleanupOnFailure()); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java index 3925bcd730c..a7757ac3034 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java @@ -521,6 +521,7 @@ public class IndexGeneratorJobTest null, null, null, + null, maxRowsInMemory, maxBytesInMemory, true, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java index 3969e126a0b..ff5aed625ea 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java @@ -114,6 +114,7 @@ public class JobHelperTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java index 7f2ce22f401..4da9ee41b77 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java @@ -62,6 +62,7 @@ public class GranularityPathSpecTest null, null, null, + null, false, false, false, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index 9d2bed80ead..1676a2fa5c9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -63,6 +63,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera private final int maxPendingPersists; private final ShardSpec shardSpec; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final boolean reportParseExceptions; private final long publishAndHandoffTimeout; private final long alertTimeout; @@ -84,6 +85,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("shardSpec") ShardSpec shardSpec, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout, @JsonProperty("alertTimeout") Long alertTimeout, @@ -106,6 +108,8 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists; this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.reportParseExceptions = reportParseExceptions == null ? defaultReportParseExceptions : reportParseExceptions; @@ -196,6 +200,13 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + @Override @JsonProperty public boolean isReportParseExceptions() @@ -253,6 +264,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera maxPendingPersists, shardSpec, indexSpec, + indexSpecForIntermediatePersists, reportParseExceptions, publishAndHandoffTimeout, alertTimeout, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java index 6e0e1d861d1..f872042049a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java @@ -236,7 +236,7 @@ public class YeOldePlumberSchool implements PlumberSchool indexMergerV9.persist( indexToPersist.getIndex(), dirToPersist, - config.getIndexSpec(), + config.getIndexSpecForIntermediatePersists(), config.getSegmentWriteOutMediumFactory() ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 27df20a6a47..e766effb412 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1270,6 +1270,7 @@ public class IndexTask extends AbstractTask implements ChatHandler private final Integer numShards; private final List partitionDimensions; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final File basePersistDirectory; private final int maxPendingPersists; @@ -1305,6 +1306,7 @@ public class IndexTask extends AbstractTask implements ChatHandler @JsonProperty("numShards") @Nullable Integer numShards, @JsonProperty("partitionDimensions") @Nullable List partitionDimensions, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, // This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @@ -1327,6 +1329,7 @@ public class IndexTask extends AbstractTask implements ChatHandler numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1346,7 +1349,7 @@ public class IndexTask extends AbstractTask implements ChatHandler private IndexTuningConfig() { - this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); } private IndexTuningConfig( @@ -1357,6 +1360,7 @@ public class IndexTask extends AbstractTask implements ChatHandler @Nullable Integer numShards, @Nullable List partitionDimensions, @Nullable IndexSpec indexSpec, + @Nullable IndexSpec indexSpecForIntermediatePersists, @Nullable Integer maxPendingPersists, @Nullable Boolean forceGuaranteedRollup, @Nullable Boolean reportParseExceptions, @@ -1384,6 +1388,8 @@ public class IndexTask extends AbstractTask implements ChatHandler this.numShards = numShards == null || numShards.equals(-1) ? null : numShards; this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists; this.forceGuaranteedRollup = forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup; this.reportParseExceptions = reportParseExceptions == null @@ -1420,6 +1426,7 @@ public class IndexTask extends AbstractTask implements ChatHandler numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1442,6 +1449,7 @@ public class IndexTask extends AbstractTask implements ChatHandler numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1464,6 +1472,7 @@ public class IndexTask extends AbstractTask implements ChatHandler numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1533,6 +1542,13 @@ public class IndexTask extends AbstractTask implements ChatHandler return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + @Override public File getBasePersistDirectory() { @@ -1618,19 +1634,22 @@ public class IndexTask extends AbstractTask implements ChatHandler } IndexTuningConfig that = (IndexTuningConfig) o; return maxRowsInMemory == that.maxRowsInMemory && - Objects.equals(maxTotalRows, that.maxTotalRows) && + maxBytesInMemory == that.maxBytesInMemory && maxPendingPersists == that.maxPendingPersists && forceGuaranteedRollup == that.forceGuaranteedRollup && reportParseExceptions == that.reportParseExceptions && pushTimeout == that.pushTimeout && - Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) && - Objects.equals(numShards, that.numShards) && - Objects.equals(indexSpec, that.indexSpec) && - Objects.equals(basePersistDirectory, that.basePersistDirectory) && - Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && logParseExceptions == that.logParseExceptions && maxParseExceptions == that.maxParseExceptions && - maxSavedParseExceptions == that.maxSavedParseExceptions; + maxSavedParseExceptions == that.maxSavedParseExceptions && + Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) && + Objects.equals(maxTotalRows, that.maxTotalRows) && + Objects.equals(numShards, that.numShards) && + Objects.equals(partitionDimensions, that.partitionDimensions) && + Objects.equals(indexSpec, that.indexSpec) && + Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && + Objects.equals(basePersistDirectory, that.basePersistDirectory) && + Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory); } @Override @@ -1639,18 +1658,21 @@ public class IndexTask extends AbstractTask implements ChatHandler return Objects.hash( maxRowsPerSegment, maxRowsInMemory, + maxBytesInMemory, maxTotalRows, numShards, + partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, basePersistDirectory, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, pushTimeout, - segmentWriteOutMediumFactory, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + segmentWriteOutMediumFactory ); } @@ -1663,7 +1685,9 @@ public class IndexTask extends AbstractTask implements ChatHandler ", maxBytesInMemory=" + maxBytesInMemory + ", maxTotalRows=" + maxTotalRows + ", numShards=" + numShards + + ", partitionDimensions=" + partitionDimensions + ", indexSpec=" + indexSpec + + ", indexSpecForIntermediatePersists=" + indexSpecForIntermediatePersists + ", basePersistDirectory=" + basePersistDirectory + ", maxPendingPersists=" + maxPendingPersists + ", forceGuaranteedRollup=" + forceGuaranteedRollup + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 2a29726965c..924be846d9c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -333,6 +333,7 @@ public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHan tuningConfig.getNumShards(), null, tuningConfig.getIndexSpec(), + tuningConfig.getIndexSpecForIntermediatePersists(), tuningConfig.getMaxPendingPersists(), true, false, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 2c415492277..9c480d26550 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -71,6 +71,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig null, null, null, + null, null ); } @@ -84,6 +85,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, @JsonProperty("numShards") @Nullable Integer numShards, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, @JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup, @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @@ -109,6 +111,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig numShards, null, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, null, forceGuaranteedRollup, @@ -188,7 +191,6 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig @Override public int hashCode() { - return Objects.hash( super.hashCode(), maxNumSubTasks, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index b594e42a66c..1d48ad9c8c8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -47,6 +47,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi @Deprecated private final int maxPendingPersists; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final boolean reportParseExceptions; private final long handoffConditionTimeout; private final boolean resetOffsetAutomatically; @@ -68,6 +69,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi @Nullable File basePersistDirectory, @Nullable Integer maxPendingPersists, @Nullable IndexSpec indexSpec, + @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @Deprecated @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @Deprecated @Nullable Boolean reportParseExceptions, @@ -96,6 +98,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi this.basePersistDirectory = defaults.getBasePersistDirectory(); this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists; this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.reportParseExceptions = reportParseExceptions == null ? defaults.isReportParseExceptions() : reportParseExceptions; @@ -187,6 +191,13 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + /** * Always returns true, doesn't affect the version being built. */ @@ -281,6 +292,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && Objects.equals(indexSpec, that.indexSpec) && + Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod); } @@ -297,6 +309,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index c1d682e1e28..d675069b102 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1416,6 +1416,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest null, null, null, + null, reportParseExceptions, handoffTimeout, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 69f10f6be40..bb6993848c9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -288,6 +288,7 @@ public class CompactionTaskTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -462,6 +463,7 @@ public class CompactionTaskTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -522,6 +524,7 @@ public class CompactionTaskTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -582,6 +585,7 @@ public class CompactionTaskTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -846,6 +850,7 @@ public class CompactionTaskTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -1034,6 +1039,7 @@ public class CompactionTaskTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 97279f94b64..4bae4c8ed75 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -1004,6 +1004,7 @@ public class IndexTaskTest null, indexSpec, null, + null, true, true, false, @@ -1126,6 +1127,7 @@ public class IndexTaskTest null, indexSpec, null, + null, true, false, false, @@ -1241,6 +1243,7 @@ public class IndexTaskTest null, indexSpec, null, + null, true, true, false, @@ -1706,6 +1709,7 @@ public class IndexTaskTest partitionDimensions, indexSpec, null, + null, true, forceGuaranteedRollup, reportParseException, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 6901543de1c..daeeaab63bb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -835,6 +835,7 @@ public class RealtimeIndexTaskTest null, null, null, + null, true, 0, 0, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index a09dd65d95a..a58b2951e00 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -202,6 +202,7 @@ public class TaskSerdeTest null, null, indexSpec, + null, 3, true, false, @@ -284,6 +285,7 @@ public class TaskSerdeTest null, null, indexSpec, + null, 3, true, false, @@ -393,6 +395,7 @@ public class TaskSerdeTest NoneShardSpec.instance(), indexSpec, null, + null, 0, 0, true, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index 6f7c3b9e713..c7c8d0b897e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -188,6 +188,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu null, null, null, + null, numTotalSubTasks, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 704c5fa4500..04aa5a7a1b0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -430,6 +430,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd null, null, null, + null, NUM_SUB_TASKS, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index dd9096310d8..0fa747f6604 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -138,6 +138,7 @@ public class ParallelIndexSupervisorTaskSerdeTest null, null, null, + null, 2, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index cfd02ac18ce..cf7dd372bfd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -250,6 +250,7 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv null, null, null, + null, 1, null, null, @@ -290,6 +291,7 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv null, null, null, + null, 2, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java index d587c6c0591..2d393020b3a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java @@ -71,6 +71,7 @@ public class ParallelIndexTuningConfigTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + new IndexSpec(), 1, false, true, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 6bdfbe4371b..d8c0af320a9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -690,6 +690,7 @@ public class TaskLifecycleTest null, null, indexSpec, + null, 3, true, false, @@ -771,6 +772,7 @@ public class TaskLifecycleTest null, null, indexSpec, + null, 3, true, false, @@ -1160,6 +1162,7 @@ public class TaskLifecycleTest null, indexSpec, null, + null, false, null, null, @@ -1290,6 +1293,7 @@ public class TaskLifecycleTest null, null, null, + null, 0, 0, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index ae55fffc637..6381d0fb1f5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -661,6 +661,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport null, null, null, + null, null ) { diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java index 43be6e080a4..84edad16506 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java @@ -82,6 +82,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig defaultMaxPendingPersists, defaultShardSpec, defaultIndexSpec, + defaultIndexSpec, true, 0, 0, @@ -103,6 +104,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig private final int maxPendingPersists; private final ShardSpec shardSpec; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final int persistThreadPriority; private final int mergeThreadPriority; private final boolean reportParseExceptions; @@ -125,6 +127,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("shardSpec") ShardSpec shardSpec, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("persistThreadPriority") int persistThreadPriority, @@ -152,6 +155,8 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists; this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.mergeThreadPriority = mergeThreadPriority; this.persistThreadPriority = persistThreadPriority; this.reportParseExceptions = reportParseExceptions == null @@ -233,6 +238,13 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + /** * Always returns true, doesn't affect the version being built. */ @@ -302,6 +314,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig maxPendingPersists, shardSpec, indexSpec, + indexSpecForIntermediatePersists, true, persistThreadPriority, mergeThreadPriority, @@ -326,6 +339,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig maxPendingPersists, shardSpec, indexSpec, + indexSpecForIntermediatePersists, true, persistThreadPriority, mergeThreadPriority, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index 1e81792c430..2889a988a4d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -67,6 +67,8 @@ public interface AppenderatorConfig IndexSpec getIndexSpec(); + IndexSpec getIndexSpecForIntermediatePersists(); + File getBasePersistDirectory(); @Nullable diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 647340bbb50..136b24e8443 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -61,7 +61,6 @@ import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; -import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.Segment; @@ -1260,12 +1259,11 @@ public class AppenderatorImpl implements Appenderator final File persistedFile; final File persistDir = createPersistDirIfNeeded(identifier); - final IndexSpec indexSpec = tuningConfig.getIndexSpec(); persistedFile = indexMerger.persist( indexToPersist.getIndex(), identifier.getInterval(), new File(persistDir, String.valueOf(indexToPersist.getCount())), - indexSpec, + tuningConfig.getIndexSpecForIntermediatePersists(), tuningConfig.getSegmentWriteOutMediumFactory() ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index 8e0ad2b999f..7aa59f6d8cc 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -53,7 +53,6 @@ import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; -import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.Metadata; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; @@ -955,14 +954,12 @@ public class RealtimePlumber implements Plumber try { int numRows = indexToPersist.getIndex().size(); - final IndexSpec indexSpec = config.getIndexSpec(); - indexToPersist.getIndex().getMetadata().putAll(metadataElems); final File persistedFile = indexMerger.persist( indexToPersist.getIndex(), interval, new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), - indexSpec, + config.getIndexSpecForIntermediatePersists(), config.getSegmentWriteOutMediumFactory() ); diff --git a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java index cf57072fe1d..373c4975242 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java @@ -22,6 +22,7 @@ package org.apache.druid.segment.indexing; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.joda.time.Period; import org.junit.Assert; @@ -87,6 +88,7 @@ public class RealtimeTuningConfigTest Assert.assertEquals(0, config.getHandoffConditionTimeout()); Assert.assertEquals(0, config.getAlertTimeout()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(new NumberedShardSpec(0, 1), config.getShardSpec()); Assert.assertEquals(0, config.getMaxPendingPersists()); @@ -111,7 +113,9 @@ public class RealtimeTuningConfigTest + " \"mergeThreadPriority\": 100,\n" + " \"reportParseExceptions\": true,\n" + " \"handoffConditionTimeout\": 100,\n" - + " \"alertTimeout\": 70\n" + + " \"alertTimeout\": 70,\n" + + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" + + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n" + "}"; ObjectMapper mapper = TestHelper.makeJsonMapper(); @@ -128,7 +132,6 @@ public class RealtimeTuningConfigTest Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString()); Assert.assertEquals(100, config.getHandoffConditionTimeout()); Assert.assertEquals(70, config.getAlertTimeout()); - Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); Assert.assertEquals(new NumberedShardSpec(0, 1), config.getShardSpec()); Assert.assertEquals(100, config.getMaxPendingPersists()); @@ -137,5 +140,8 @@ public class RealtimeTuningConfigTest Assert.assertEquals(100, config.getPersistThreadPriority()); Assert.assertEquals(new Period("PT1H"), config.getWindowPeriod()); Assert.assertEquals(true, config.isReportParseExceptions()); + Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists()); + } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index 15a650b38ce..ee79dc1f228 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -72,6 +72,7 @@ public class AppenderatorPlumberTest null, null, null, + null, true, 0, 0, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java index af706dd5340..0d6cac57cca 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -160,6 +160,7 @@ public class AppenderatorTester implements AutoCloseable null, null, null, + null, 0, 0, null, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index e02ca6d657b..b92bd121042 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -144,6 +144,7 @@ public class DefaultOfflineAppenderatorFactoryTest null, null, null, + null, 0, 0, null, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 4052cc678f4..31790305808 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -206,6 +206,7 @@ public class RealtimePlumberSchoolTest null, null, null, + null, true, 0, 0, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java index de55360a61d..c1a4066c20f 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java @@ -75,6 +75,7 @@ public class SinkTest null, null, null, + null, 0, 0, null, @@ -229,6 +230,7 @@ public class SinkTest null, null, null, + null, 0, 0, null, diff --git a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java index f9159fbcadb..bb759b6ddf7 100644 --- a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java @@ -166,6 +166,7 @@ public class DruidJsonValidatorTest 1, NoneShardSpec.instance(), new IndexSpec(), + new IndexSpec(), null, 0, 0,