add config to optionally disable all compression in intermediate segment persists during ingestion (#7919)

* disable all compression in intermediate segment persists during ingestion

* more changes and build fix

* by default retain existing indexSpec for intermediate persisted segments

* document indexSpecForIntermediatePersists index tuning config

* fix build issues

* update serde tests
Himanshu 2019-07-10 12:22:24 -07:00 committed by GitHub
parent fcf56f2330
commit 14aec7fcec
60 changed files with 217 additions and 27 deletions

View File

@ -139,7 +139,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)|
|`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)|
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no|
|`indexSpec`|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no|
|`indexSpecForIntermediatePersists`|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into the final published segment. See [IndexSpec](#indexspec) for possible values.|no (default = same as `indexSpec`)|
|`reportParseExceptions`|Boolean|*DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|no (default == false)|
|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
|`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on the `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false, then ingestion for that particular partition will halt and manual intervention is required to correct the situation. See the `Reset Supervisor` API below.|no (default == false)|
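For illustration only, a minimal `tuningConfig` sketch combining both fields (values borrowed from the serde tests later in this commit; all other tuning fields elided): published segments keep the default LZ4 compression while intermediate persists skip compression to reduce merge-time memory.

```json
{
  "type": "kafka",
  "indexSpec": {
    "bitmap": { "type": "concise" },
    "dimensionCompression": "lz4",
    "metricCompression": "lz4",
    "longEncoding": "longs"
  },
  "indexSpecForIntermediatePersists": {
    "dimensionCompression": "uncompressed",
    "metricCompression": "NONE"
  }
}
```

If `indexSpecForIntermediatePersists` is omitted entirely, the constructors added in this commit fall back to the value of `indexSpec`, preserving pre-existing behavior.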

View File

@ -135,7 +135,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)|
|`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)|
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no|
|`indexSpec`|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no|
|`indexSpecForIntermediatePersists`|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into the final published segment. See [IndexSpec](#indexspec) for possible values.|no (default = same as `indexSpec`)|
|`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)|
|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
|`resetOffsetAutomatically`|Boolean|Whether to reset the consumer sequence numbers if the next sequence number that it is trying to fetch is less than the earliest available sequence number for that particular shard. The sequence number will be reset to either the earliest or latest sequence number depending on the `useEarliestOffset` property of `KinesisSupervisorIOConfig` (see below). This situation typically occurs when messages in Kinesis are no longer available for consumption and therefore won't be ingested into Druid. If set to false, then ingestion for that particular shard will halt and manual intervention is required to correct the situation. See the `Reset Supervisor` API below.|no (default == false)|

View File

@ -192,7 +192,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|combineText|Boolean|Use CombineTextInputFormat to combine multiple files into a file split. This can speed up Hadoop jobs when processing a large number of small files.|no (default == false)|
|useCombiner|Boolean|Use Hadoop combiner to merge rows at mapper if possible.|no (default == false)|
|jobProperties|Object|A map of properties to add to the Hadoop job configuration, see below for details.|no (default == null)|
|indexSpec|Object|Tune how data is indexed. See below for more information.|no|
|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no|
|indexSpecForIntermediatePersists|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into the final published segment. See [IndexSpec](#indexspec) for possible values.|no (default = same as indexSpec)|
|numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)|
|forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Hash-based partitioning always uses an extendable shardSpec. For single-dimension partitioning, this option should be set to true to use an extendable shardSpec. For partitioning, please check [Partitioning specification](#partitioning-specification). This option can be useful when you need to append more data to existing dataSource.|no (default = false)|
|useExplicitVersion|Boolean|Forces HadoopIndexTask to use version.|no (default = false)|

View File

@ -185,6 +185,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|maxTotalRows|Total number of rows in segments waiting to be pushed. Used in determining when intermediate pushing should occur.|20000000|no|
|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no|
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
|indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into the final published segment. See [IndexSpec](#indexspec) for possible values.|same as indexSpec|no|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
|pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no|
@ -375,6 +376,14 @@ An example of the result is
"metricCompression": "lz4",
"longEncoding": "longs"
},
"indexSpecForIntermediatePersists": {
"bitmap": {
"type": "concise"
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs"
},
"maxPendingPersists": 0,
"reportParseExceptions": false,
"pushTimeout": 0,
@ -555,6 +564,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no|
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
|indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into the final published segment. See [IndexSpec](#indexspec) for possible values.|same as indexSpec|no|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](../ingestion/index.html#roll-up-modes). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. If this is set to true, the index task will read the entire input data twice: one for finding the optimal number of partitions per time chunk and one for generating segments. Note that the result segments would be hash-partitioned. You can set `forceExtendableShardSpecs` if you plan to append more data to the same time range in the future. This flag cannot be used with `appendToExisting` of IOConfig. For more details, see the below __Segment pushing modes__ section.|false|no|
|reportParseExceptions|DEPRECATED. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|false|no|
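As a sketch of how the new field slots into a native batch spec (the `maxRowsPerSegment` value here is illustrative, not from this commit; compression values come from the serde tests below), a parallel-merge-friendly configuration could look like:

```json
"tuningConfig": {
  "type": "index",
  "maxRowsPerSegment": 5000000,
  "indexSpecForIntermediatePersists": {
    "dimensionCompression": "uncompressed",
    "metricCompression": "NONE"
  }
}
```

This trades larger on-disk intermediate persists (and therefore more page cache pressure) for lower heap usage during the final merge.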

View File

@ -179,6 +179,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
tuningConfig.getPartitionsSpec(),
tuningConfig.getShardSpecs(),
tuningConfig.getIndexSpec(),
tuningConfig.getIndexSpecForIntermediatePersists(),
tuningConfig.getRowFlushBoundary(),
tuningConfig.getMaxBytesInMemory(),
tuningConfig.isLeaveIntermediate(),

View File

@ -41,6 +41,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
@JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
@Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
@ -62,6 +63,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
basePersistDirectory,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
true,
reportParseExceptions,
handoffConditionTimeout,
@ -87,6 +89,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
dir,
getMaxPendingPersists(),
getIndexSpec(),
getIndexSpecForIntermediatePersists(),
true,
isReportParseExceptions(),
getHandoffConditionTimeout(),
@ -112,6 +115,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
", indexSpec=" + getIndexSpec() +
", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() +
", reportParseExceptions=" + isReportParseExceptions() +
", handoffConditionTimeout=" + getHandoffConditionTimeout() +
", resetOffsetAutomatically=" + isResetOffsetAutomatically() +

View File

@ -85,6 +85,7 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
null,
null,
null,
null,
null
),
ioConfig,

View File

@ -53,6 +53,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@ -80,6 +81,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
basePersistDirectory,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
true,
reportParseExceptions,
handoffConditionTimeout,
@ -186,6 +188,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
getBasePersistDirectory(),
getMaxPendingPersists(),
getIndexSpec(),
getIndexSpecForIntermediatePersists(),
true,
isReportParseExceptions(),
getHandoffConditionTimeout(),

View File

@ -2456,6 +2456,7 @@ public class KafkaIndexTaskTest
null,
null,
null,
null,
true,
reportParseExceptions,
handoffConditionTimeout,

View File

@ -25,6 +25,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.indexing.TuningConfig;
import org.joda.time.Period;
import org.junit.Assert;
@ -65,6 +66,7 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists());
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
}
@ -81,7 +83,9 @@ public class KafkaIndexTaskTuningConfigTest
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+ "}";
KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue(
@ -103,6 +107,8 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertEquals(true, config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec());
Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists());
}
@Test
@ -117,6 +123,7 @@ public class KafkaIndexTaskTuningConfigTest
new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
true,
true,
5L,
@ -159,6 +166,7 @@ public class KafkaIndexTaskTuningConfigTest
new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
true,
true,
5L,
@ -206,6 +214,7 @@ public class KafkaIndexTaskTuningConfigTest
new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
true,
true,
5L,
@ -252,6 +261,7 @@ public class KafkaIndexTaskTuningConfigTest
config.getBasePersistDirectory(),
0,
config.getIndexSpec(),
config.getIndexSpecForIntermediatePersists(),
true,
config.isReportParseExceptions(),
config.getHandoffConditionTimeout(),

View File

@ -213,6 +213,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
new File("/test"),
null,
null,
null,
true,
false,
null,
@ -3051,6 +3052,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
new File("/test"),
null,
null,
null,
true,
false,
null,

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.indexing.TuningConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -63,6 +64,7 @@ public class KafkaSupervisorTuningConfigTest
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists());
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertNull(config.getWorkerThreads());
@ -90,7 +92,9 @@ public class KafkaSupervisorTuningConfigTest
+ " \"chatRetries\": 14,\n"
+ " \"httpTimeout\": \"PT15S\",\n"
+ " \"shutdownTimeout\": \"PT95S\",\n"
+ " \"offsetFetchPeriod\": \"PT20S\"\n"
+ " \"offsetFetchPeriod\": \"PT20S\",\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+ "}";
KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue(
@ -116,6 +120,8 @@ public class KafkaSupervisorTuningConfigTest
Assert.assertEquals(Duration.standardSeconds(15), config.getHttpTimeout());
Assert.assertEquals(Duration.standardSeconds(95), config.getShutdownTimeout());
Assert.assertEquals(Duration.standardSeconds(20), config.getOffsetFetchPeriod());
Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec());
Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists());
}
}

View File

@ -45,6 +45,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
@JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
@Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
@ -67,6 +68,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
basePersistDirectory,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
true,
reportParseExceptions,
handoffConditionTimeout,

View File

@ -58,6 +58,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@ -85,6 +86,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
basePersistDirectory,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
true,
reportParseExceptions,
handoffConditionTimeout,
@ -160,6 +162,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
dir,
getMaxPendingPersists(),
getIndexSpec(),
getIndexSpecForIntermediatePersists(),
true,
isReportParseExceptions(),
getHandoffConditionTimeout(),

View File

@ -96,6 +96,7 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
null,
null,
null,
null,
null
),
ioConfig,

View File

@ -49,6 +49,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@ -81,6 +82,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
basePersistDirectory,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
buildV9Directly,
reportParseExceptions,
handoffConditionTimeout,
@ -191,6 +193,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
getBasePersistDirectory(),
getMaxPendingPersists(),
getIndexSpec(),
getIndexSpecForIntermediatePersists(),
true,
isReportParseExceptions(),
getHandoffConditionTimeout(),

View File

@ -2629,6 +2629,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
null,
null,
null,
null,
true,
reportParseExceptions,
handoffConditionTimeout,

View File

@ -144,6 +144,7 @@ public class KinesisIndexTaskTuningConfigTest
new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
true,
true,
5L,
@ -202,6 +203,7 @@ public class KinesisIndexTaskTuningConfigTest
new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
true,
true,
5L,
@ -288,6 +290,7 @@ public class KinesisIndexTaskTuningConfigTest
new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
true,
true,
5L,

View File

@ -168,6 +168,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
new File("/test"),
null,
null,
null,
true,
false,
null,
@ -3678,6 +3679,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
new File("/test"),
null,
null,
null,
true,
false,
null,

View File

@ -45,6 +45,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@ -73,6 +74,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
basePersistDirectory,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
buildV9Directly,
reportParseExceptions,
handoffConditionTimeout,
@ -104,6 +106,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
base.getBasePersistDirectory(),
base.getMaxPendingPersists(),
base.getIndexSpec(),
base.getIndexSpecForIntermediatePersists(),
base.getBuildV9Directly(),
base.isReportParseExceptions(),
base.getHandoffConditionTimeout(),

View File

@ -299,6 +299,11 @@ public class HadoopDruidIndexerConfig
return schema.getTuningConfig().getIndexSpec();
}
public IndexSpec getIndexSpecForIntermediatePersists()
{
return schema.getTuningConfig().getIndexSpecForIntermediatePersists();
}
public boolean isOverwriteFiles()
{
return schema.getTuningConfig().isOverwriteFiles();

View File

@ -55,6 +55,7 @@ public class HadoopTuningConfig implements TuningConfig
DEFAULT_PARTITIONS_SPEC,
DEFAULT_SHARD_SPECS,
DEFAULT_INDEX_SPEC,
DEFAULT_INDEX_SPEC,
DEFAULT_ROW_FLUSH_BOUNDARY,
0L,
false,
@ -81,6 +82,7 @@ public class HadoopTuningConfig implements TuningConfig
private final PartitionsSpec partitionsSpec;
private final Map<Long, List<HadoopyShardSpec>> shardSpecs;
private final IndexSpec indexSpec;
private final IndexSpec indexSpecForIntermediatePersists;
private final int rowFlushBoundary;
private final long maxBytesInMemory;
private final boolean leaveIntermediate;
@ -105,6 +107,7 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("shardSpecs") Map<Long, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("indexSpec") IndexSpec indexSpec,
final @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
final @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
final @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
@ -132,6 +135,8 @@ public class HadoopTuningConfig implements TuningConfig
this.partitionsSpec = partitionsSpec == null ? DEFAULT_PARTITIONS_SPEC : partitionsSpec;
this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
this.indexSpec : indexSpecForIntermediatePersists;
this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
? DEFAULT_ROW_FLUSH_BOUNDARY
: maxRowsInMemoryCOMPAT : maxRowsInMemory;
@ -199,6 +204,12 @@ public class HadoopTuningConfig implements TuningConfig
return indexSpec;
}
@JsonProperty
public IndexSpec getIndexSpecForIntermediatePersists()
{
return indexSpecForIntermediatePersists;
}
@JsonProperty("maxRowsInMemory")
public int getRowFlushBoundary()
{
@ -314,6 +325,7 @@ public class HadoopTuningConfig implements TuningConfig
partitionsSpec,
shardSpecs,
indexSpec,
indexSpecForIntermediatePersists,
rowFlushBoundary,
maxBytesInMemory,
leaveIntermediate,
@ -343,6 +355,7 @@ public class HadoopTuningConfig implements TuningConfig
partitionsSpec,
shardSpecs,
indexSpec,
indexSpecForIntermediatePersists,
rowFlushBoundary,
maxBytesInMemory,
leaveIntermediate,
@ -372,6 +385,7 @@ public class HadoopTuningConfig implements TuningConfig
partitionsSpec,
specs,
indexSpec,
indexSpecForIntermediatePersists,
rowFlushBoundary,
maxBytesInMemory,
leaveIntermediate,

View File

@ -605,7 +605,7 @@ public class IndexGeneratorJob implements Jobby
) throws IOException
{
return HadoopDruidIndexerConfig.INDEX_MERGER_V9
.persist(index, interval, file, config.getIndexSpec(), progressIndicator, null);
.persist(index, interval, file, config.getIndexSpecForIntermediatePersists(), progressIndicator, null);
}
protected File mergeQueryableIndex(

View File

@ -462,6 +462,7 @@ public class BatchDeltaIngestionTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -199,6 +199,7 @@ public class DetermineHashedPartitionsJobTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -262,6 +262,7 @@ public class DeterminePartitionsJobTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -86,6 +86,7 @@ public class HadoopDruidIndexerConfigTest
null,
null,
null,
null,
false,
false,
false,
@ -164,6 +165,7 @@ public class HadoopDruidIndexerConfigTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -43,6 +43,7 @@ public class HadoopTuningConfigTest
null,
null,
null,
null,
100,
null,
true,
@ -70,6 +71,7 @@ public class HadoopTuningConfigTest
Assert.assertNotNull(actual.getPartitionsSpec());
Assert.assertEquals(ImmutableMap.<Long, List<HadoopyShardSpec>>of(), actual.getShardSpecs());
Assert.assertEquals(new IndexSpec(), actual.getIndexSpec());
Assert.assertEquals(new IndexSpec(), actual.getIndexSpecForIntermediatePersists());
Assert.assertEquals(100, actual.getRowFlushBoundary());
Assert.assertEquals(true, actual.isLeaveIntermediate());
Assert.assertEquals(true, actual.isCleanupOnFailure());

View File

@ -521,6 +521,7 @@ public class IndexGeneratorJobTest
null,
null,
null,
null,
maxRowsInMemory,
maxBytesInMemory,
true,

View File

@ -114,6 +114,7 @@ public class JobHelperTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -62,6 +62,7 @@ public class GranularityPathSpecTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -63,6 +63,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
private final int maxPendingPersists;
private final ShardSpec shardSpec;
private final IndexSpec indexSpec;
private final IndexSpec indexSpecForIntermediatePersists;
private final boolean reportParseExceptions;
private final long publishAndHandoffTimeout;
private final long alertTimeout;
@ -84,6 +85,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("shardSpec") ShardSpec shardSpec,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout,
@JsonProperty("alertTimeout") Long alertTimeout,
@ -106,6 +108,8 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists;
this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec;
this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec;
this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
this.indexSpec : indexSpecForIntermediatePersists;
this.reportParseExceptions = reportParseExceptions == null
? defaultReportParseExceptions
: reportParseExceptions;
@ -196,6 +200,13 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
return indexSpec;
}
@JsonProperty
@Override
public IndexSpec getIndexSpecForIntermediatePersists()
{
return indexSpecForIntermediatePersists;
}
@Override
@JsonProperty
public boolean isReportParseExceptions()
@ -253,6 +264,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
maxPendingPersists,
shardSpec,
indexSpec,
indexSpecForIntermediatePersists,
reportParseExceptions,
publishAndHandoffTimeout,
alertTimeout,

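The constructor above (and each of the other tuning configs in this patch) applies the same defaulting rule: a missing `indexSpecForIntermediatePersists` falls back to `indexSpec`, so existing configs keep their behavior. A minimal standalone sketch of that rule (a stand-in class, not Druid's actual code; a plain `String` substitutes for `IndexSpec`):

```java
// Standalone sketch of the fallback used throughout this patch: when
// "indexSpecForIntermediatePersists" is not configured, intermediate
// persists reuse the main "indexSpec".
public class TuningConfigDefaulting {

    // Stand-in for IndexSpec; a String is enough to show the fallback.
    static String resolveIntermediateSpec(String indexSpec, String indexSpecForIntermediatePersists) {
        // Mirrors: indexSpecForIntermediatePersists == null ? this.indexSpec : indexSpecForIntermediatePersists
        return indexSpecForIntermediatePersists == null ? indexSpec : indexSpecForIntermediatePersists;
    }

    public static void main(String[] args) {
        // Explicitly configured: intermediate segments get their own spec.
        System.out.println(resolveIntermediateSpec("lz4", "uncompressed")); // prints "uncompressed"
        // Not configured: retain the existing indexSpec (pre-patch behavior).
        System.out.println(resolveIntermediateSpec("lz4", null)); // prints "lz4"
    }
}
```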

@ -236,7 +236,7 @@ public class YeOldePlumberSchool implements PlumberSchool
indexMergerV9.persist(
indexToPersist.getIndex(),
dirToPersist,
config.getIndexSpec(),
config.getIndexSpecForIntermediatePersists(),
config.getSegmentWriteOutMediumFactory()
);


@ -1270,6 +1270,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
private final Integer numShards;
private final List<String> partitionDimensions;
private final IndexSpec indexSpec;
private final IndexSpec indexSpecForIntermediatePersists;
private final File basePersistDirectory;
private final int maxPendingPersists;
@ -1305,6 +1306,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
@JsonProperty("numShards") @Nullable Integer numShards,
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
// This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
@ -1327,6 +1329,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
numShards,
partitionDimensions,
indexSpec,
indexSpecForIntermediatePersists,
maxPendingPersists,
forceGuaranteedRollup,
reportParseExceptions,
@ -1346,7 +1349,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
private IndexTuningConfig()
{
this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
}
private IndexTuningConfig(
@ -1357,6 +1360,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
@Nullable Integer numShards,
@Nullable List<String> partitionDimensions,
@Nullable IndexSpec indexSpec,
@Nullable IndexSpec indexSpecForIntermediatePersists,
@Nullable Integer maxPendingPersists,
@Nullable Boolean forceGuaranteedRollup,
@Nullable Boolean reportParseExceptions,
@ -1384,6 +1388,8 @@ public class IndexTask extends AbstractTask implements ChatHandler
this.numShards = numShards == null || numShards.equals(-1) ? null : numShards;
this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
this.indexSpec : indexSpecForIntermediatePersists;
this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
this.forceGuaranteedRollup = forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup;
this.reportParseExceptions = reportParseExceptions == null
@ -1420,6 +1426,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
numShards,
partitionDimensions,
indexSpec,
indexSpecForIntermediatePersists,
maxPendingPersists,
forceGuaranteedRollup,
reportParseExceptions,
@ -1442,6 +1449,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
numShards,
partitionDimensions,
indexSpec,
indexSpecForIntermediatePersists,
maxPendingPersists,
forceGuaranteedRollup,
reportParseExceptions,
@ -1464,6 +1472,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
numShards,
partitionDimensions,
indexSpec,
indexSpecForIntermediatePersists,
maxPendingPersists,
forceGuaranteedRollup,
reportParseExceptions,
@ -1533,6 +1542,13 @@ public class IndexTask extends AbstractTask implements ChatHandler
return indexSpec;
}
@JsonProperty
@Override
public IndexSpec getIndexSpecForIntermediatePersists()
{
return indexSpecForIntermediatePersists;
}
@Override
public File getBasePersistDirectory()
{
@ -1618,19 +1634,22 @@ public class IndexTask extends AbstractTask implements ChatHandler
}
IndexTuningConfig that = (IndexTuningConfig) o;
return maxRowsInMemory == that.maxRowsInMemory &&
Objects.equals(maxTotalRows, that.maxTotalRows) &&
maxBytesInMemory == that.maxBytesInMemory &&
maxPendingPersists == that.maxPendingPersists &&
forceGuaranteedRollup == that.forceGuaranteedRollup &&
reportParseExceptions == that.reportParseExceptions &&
pushTimeout == that.pushTimeout &&
Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
Objects.equals(numShards, that.numShards) &&
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) &&
logParseExceptions == that.logParseExceptions &&
maxParseExceptions == that.maxParseExceptions &&
maxSavedParseExceptions == that.maxSavedParseExceptions;
maxSavedParseExceptions == that.maxSavedParseExceptions &&
Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
Objects.equals(maxTotalRows, that.maxTotalRows) &&
Objects.equals(numShards, that.numShards) &&
Objects.equals(partitionDimensions, that.partitionDimensions) &&
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) &&
Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory);
}
@Override
@ -1639,18 +1658,21 @@ public class IndexTask extends AbstractTask implements ChatHandler
return Objects.hash(
maxRowsPerSegment,
maxRowsInMemory,
maxBytesInMemory,
maxTotalRows,
numShards,
partitionDimensions,
indexSpec,
indexSpecForIntermediatePersists,
basePersistDirectory,
maxPendingPersists,
forceGuaranteedRollup,
reportParseExceptions,
pushTimeout,
segmentWriteOutMediumFactory,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
maxSavedParseExceptions,
segmentWriteOutMediumFactory
);
}
@ -1663,7 +1685,9 @@ public class IndexTask extends AbstractTask implements ChatHandler
", maxBytesInMemory=" + maxBytesInMemory +
", maxTotalRows=" + maxTotalRows +
", numShards=" + numShards +
", partitionDimensions=" + partitionDimensions +
", indexSpec=" + indexSpec +
", indexSpecForIntermediatePersists=" + indexSpecForIntermediatePersists +
", basePersistDirectory=" + basePersistDirectory +
", maxPendingPersists=" + maxPendingPersists +
", forceGuaranteedRollup=" + forceGuaranteedRollup +


@ -333,6 +333,7 @@ public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHan
tuningConfig.getNumShards(),
null,
tuningConfig.getIndexSpec(),
tuningConfig.getIndexSpecForIntermediatePersists(),
tuningConfig.getMaxPendingPersists(),
true,
false,


@ -71,6 +71,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
null,
null,
null,
null,
null
);
}
@ -84,6 +85,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("numShards") @Nullable Integer numShards,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup,
@JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
@ -109,6 +111,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
numShards,
null,
indexSpec,
indexSpecForIntermediatePersists,
maxPendingPersists,
null,
forceGuaranteedRollup,
@ -188,7 +191,6 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
@Override
public int hashCode()
{
return Objects.hash(
super.hashCode(),
maxNumSubTasks,


@ -47,6 +47,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
@Deprecated
private final int maxPendingPersists;
private final IndexSpec indexSpec;
private final IndexSpec indexSpecForIntermediatePersists;
private final boolean reportParseExceptions;
private final long handoffConditionTimeout;
private final boolean resetOffsetAutomatically;
@ -68,6 +69,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
@Nullable File basePersistDirectory,
@Nullable Integer maxPendingPersists,
@Nullable IndexSpec indexSpec,
@Nullable IndexSpec indexSpecForIntermediatePersists,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@Deprecated @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
@Deprecated @Nullable Boolean reportParseExceptions,
@ -96,6 +98,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
this.basePersistDirectory = defaults.getBasePersistDirectory();
this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists;
this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
this.indexSpec : indexSpecForIntermediatePersists;
this.reportParseExceptions = reportParseExceptions == null
? defaults.isReportParseExceptions()
: reportParseExceptions;
@ -187,6 +191,13 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
return indexSpec;
}
@JsonProperty
@Override
public IndexSpec getIndexSpecForIntermediatePersists()
{
return indexSpecForIntermediatePersists;
}
/**
* Always returns true, doesn't affect the version being built.
*/
@ -281,6 +292,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) &&
Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) &&
Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) &&
Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod);
}
@ -297,6 +309,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
basePersistDirectory,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,


@ -1416,6 +1416,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
null,
null,
null,
null,
reportParseExceptions,
handoffTimeout,
null,


@ -288,6 +288,7 @@ public class CompactionTaskTest
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
5000,
true,
true,
@ -462,6 +463,7 @@ public class CompactionTaskTest
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
5000,
true,
true,
@ -522,6 +524,7 @@ public class CompactionTaskTest
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
5000,
true,
true,
@ -582,6 +585,7 @@ public class CompactionTaskTest
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
5000,
true,
true,
@ -846,6 +850,7 @@ public class CompactionTaskTest
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
5000,
true,
true,
@ -1034,6 +1039,7 @@ public class CompactionTaskTest
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
5000,
true,
true,


@ -1004,6 +1004,7 @@ public class IndexTaskTest
null,
indexSpec,
null,
null,
true,
true,
false,
@ -1126,6 +1127,7 @@ public class IndexTaskTest
null,
indexSpec,
null,
null,
true,
false,
false,
@ -1241,6 +1243,7 @@ public class IndexTaskTest
null,
indexSpec,
null,
null,
true,
true,
false,
@ -1706,6 +1709,7 @@ public class IndexTaskTest
partitionDimensions,
indexSpec,
null,
null,
true,
forceGuaranteedRollup,
reportParseException,


@ -835,6 +835,7 @@ public class RealtimeIndexTaskTest
null,
null,
null,
null,
true,
0,
0,


@ -202,6 +202,7 @@ public class TaskSerdeTest
null,
null,
indexSpec,
null,
3,
true,
false,
@ -284,6 +285,7 @@ public class TaskSerdeTest
null,
null,
indexSpec,
null,
3,
true,
false,
@ -393,6 +395,7 @@ public class TaskSerdeTest
NoneShardSpec.instance(),
indexSpec,
null,
null,
0,
0,
true,


@ -188,6 +188,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
null,
null,
null,
null,
numTotalSubTasks,
null,
null,


@ -430,6 +430,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
null,
null,
null,
null,
NUM_SUB_TASKS,
null,
null,


@ -138,6 +138,7 @@ public class ParallelIndexSupervisorTaskSerdeTest
null,
null,
null,
null,
2,
null,
null,


@ -250,6 +250,7 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
null,
null,
null,
null,
1,
null,
null,
@ -290,6 +291,7 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
null,
null,
null,
null,
2,
null,
null,


@ -71,6 +71,7 @@ public class ParallelIndexTuningConfigTest
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
new IndexSpec(),
1,
false,
true,


@ -690,6 +690,7 @@ public class TaskLifecycleTest
null,
null,
indexSpec,
null,
3,
true,
false,
@ -771,6 +772,7 @@ public class TaskLifecycleTest
null,
null,
indexSpec,
null,
3,
true,
false,
@ -1160,6 +1162,7 @@ public class TaskLifecycleTest
null,
indexSpec,
null,
null,
false,
null,
null,
@ -1290,6 +1293,7 @@ public class TaskLifecycleTest
null,
null,
null,
null,
0,
0,
null,


@ -661,6 +661,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
null,
null,
null,
null,
null
)
{


@ -82,6 +82,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
defaultMaxPendingPersists,
defaultShardSpec,
defaultIndexSpec,
defaultIndexSpec,
true,
0,
0,
@ -103,6 +104,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
private final int maxPendingPersists;
private final ShardSpec shardSpec;
private final IndexSpec indexSpec;
private final IndexSpec indexSpecForIntermediatePersists;
private final int persistThreadPriority;
private final int mergeThreadPriority;
private final boolean reportParseExceptions;
@ -125,6 +127,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("shardSpec") ShardSpec shardSpec,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("persistThreadPriority") int persistThreadPriority,
@ -152,6 +155,8 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists;
this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec;
this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec;
this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
this.indexSpec : indexSpecForIntermediatePersists;
this.mergeThreadPriority = mergeThreadPriority;
this.persistThreadPriority = persistThreadPriority;
this.reportParseExceptions = reportParseExceptions == null
@ -233,6 +238,13 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
return indexSpec;
}
@JsonProperty
@Override
public IndexSpec getIndexSpecForIntermediatePersists()
{
return indexSpecForIntermediatePersists;
}
/**
* Always returns true, doesn't affect the version being built.
*/
@ -302,6 +314,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
maxPendingPersists,
shardSpec,
indexSpec,
indexSpecForIntermediatePersists,
true,
persistThreadPriority,
mergeThreadPriority,
@ -326,6 +339,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
maxPendingPersists,
shardSpec,
indexSpec,
indexSpecForIntermediatePersists,
true,
persistThreadPriority,
mergeThreadPriority,


@ -67,6 +67,8 @@ public interface AppenderatorConfig
IndexSpec getIndexSpec();
IndexSpec getIndexSpecForIntermediatePersists();
File getBasePersistDirectory();
@Nullable


@ -61,7 +61,6 @@ import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
@ -1260,12 +1259,11 @@ public class AppenderatorImpl implements Appenderator
final File persistedFile;
final File persistDir = createPersistDirIfNeeded(identifier);
final IndexSpec indexSpec = tuningConfig.getIndexSpec();
persistedFile = indexMerger.persist(
indexToPersist.getIndex(),
identifier.getInterval(),
new File(persistDir, String.valueOf(indexToPersist.getCount())),
indexSpec,
tuningConfig.getIndexSpecForIntermediatePersists(),
tuningConfig.getSegmentWriteOutMediumFactory()
);


@ -53,7 +53,6 @@ import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
@ -955,14 +954,12 @@ public class RealtimePlumber implements Plumber
try {
int numRows = indexToPersist.getIndex().size();
final IndexSpec indexSpec = config.getIndexSpec();
indexToPersist.getIndex().getMetadata().putAll(metadataElems);
final File persistedFile = indexMerger.persist(
indexToPersist.getIndex(),
interval,
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
indexSpec,
config.getIndexSpecForIntermediatePersists(),
config.getSegmentWriteOutMediumFactory()
);


@ -22,6 +22,7 @@ package org.apache.druid.segment.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Period;
import org.junit.Assert;
@ -87,6 +88,7 @@ public class RealtimeTuningConfigTest
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(0, config.getAlertTimeout());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(new NumberedShardSpec(0, 1), config.getShardSpec());
Assert.assertEquals(0, config.getMaxPendingPersists());
@ -111,7 +113,9 @@ public class RealtimeTuningConfigTest
+ " \"mergeThreadPriority\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"alertTimeout\": 70\n"
+ " \"alertTimeout\": 70,\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+ "}";
ObjectMapper mapper = TestHelper.makeJsonMapper();
@ -128,7 +132,6 @@ public class RealtimeTuningConfigTest
Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(70, config.getAlertTimeout());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Assert.assertEquals(new NumberedShardSpec(0, 1), config.getShardSpec());
Assert.assertEquals(100, config.getMaxPendingPersists());
@ -137,5 +140,8 @@ public class RealtimeTuningConfigTest
Assert.assertEquals(100, config.getPersistThreadPriority());
Assert.assertEquals(new Period("PT1H"), config.getWindowPeriod());
Assert.assertEquals(true, config.isReportParseExceptions());
Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec());
Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists());
}
}
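The serde test above shows the new field being read from JSON. Putting it together, a user-facing tuningConfig might disable compression only for intermediate persisted segments while keeping LZ4 for the final published segment; the fragment below is an illustrative sketch (field values follow the `dimensionCompression`/`metricCompression` options seen in the test JSON, and the surrounding keys are assumptions, not taken from this patch):

```json
{
  "maxRowsInMemory": 100000,
  "intermediatePersistPeriod": "PT10M",
  "indexSpec": {
    "dimensionCompression": "lz4",
    "metricCompression": "lz4"
  },
  "indexSpecForIntermediatePersists": {
    "dimensionCompression": "uncompressed",
    "metricCompression": "none"
  }
}
```

As the PR description notes, this trades lower merge-time memory for potentially higher page-cache use while the uncompressed intermediate segments await final merge.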


@ -72,6 +72,7 @@ public class AppenderatorPlumberTest
null,
null,
null,
null,
true,
0,
0,


@ -160,6 +160,7 @@ public class AppenderatorTester implements AutoCloseable
null,
null,
null,
null,
0,
0,
null,


@ -144,6 +144,7 @@ public class DefaultOfflineAppenderatorFactoryTest
null,
null,
null,
null,
0,
0,
null,


@ -206,6 +206,7 @@ public class RealtimePlumberSchoolTest
null,
null,
null,
null,
true,
0,
0,


@ -75,6 +75,7 @@ public class SinkTest
null,
null,
null,
null,
0,
0,
null,
@ -229,6 +230,7 @@ public class SinkTest
null,
null,
null,
null,
0,
0,
null,


@ -166,6 +166,7 @@ public class DruidJsonValidatorTest
1,
NoneShardSpec.instance(),
new IndexSpec(),
new IndexSpec(),
null,
0,
0,